@@ -28,11 +28,9 @@ use crate::datasource::physical_plan::{
2828} ;
2929use crate :: datasource:: schema_adapter:: SchemaAdapterFactory ;
3030use crate :: physical_optimizer:: pruning:: PruningPredicate ;
31- use arrow_schema:: { ArrowError , Schema , SchemaRef } ;
31+ use arrow_schema:: { ArrowError , SchemaRef } ;
3232use datafusion_common:: { exec_err, Result } ;
33- use datafusion_physical_expr:: expressions:: Column ;
3433use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
35- use datafusion_physical_plan:: filter:: batch_filter;
3634use datafusion_physical_plan:: metrics:: ExecutionPlanMetricsSet ;
3735use futures:: { StreamExt , TryStreamExt } ;
3836use log:: debug;
@@ -127,9 +125,7 @@ impl FileOpener for ParquetOpener {
127125 ) ;
128126
129127 // Filter pushdown: evaluate predicates during scan
130- if let Some ( predicate) =
131- pushdown_filters. then_some ( predicate. clone ( ) ) . flatten ( )
132- {
128+ if let Some ( predicate) = pushdown_filters. then_some ( predicate) . flatten ( ) {
133129 let row_filter = row_filter:: build_row_filter (
134130 & predicate,
135131 & file_schema,
@@ -157,7 +153,7 @@ impl FileOpener for ParquetOpener {
157153 // Determine which row groups to actually read. The idea is to skip
158154 // as many row groups as possible based on the metadata and query
159155 let file_metadata = builder. metadata ( ) . clone ( ) ;
160- let pruning_predicate = pruning_predicate. as_ref ( ) . map ( |p| p. as_ref ( ) ) ;
156+ let predicate = pruning_predicate. as_ref ( ) . map ( |p| p. as_ref ( ) ) ;
161157 let rg_metadata = file_metadata. row_groups ( ) ;
162158 // track which row groups to actually read
163159 let access_plan =
@@ -168,12 +164,12 @@ impl FileOpener for ParquetOpener {
168164 row_groups. prune_by_range ( rg_metadata, range) ;
169165 }
170166 // If there is a predicate that can be evaluated against the metadata
171- if let Some ( pruning_predicate ) = pruning_predicate . as_ref ( ) {
167+ if let Some ( predicate ) = predicate . as_ref ( ) {
172168 row_groups. prune_by_statistics (
173169 & file_schema,
174170 builder. parquet_schema ( ) ,
175171 rg_metadata,
176- pruning_predicate ,
172+ predicate ,
177173 & file_metrics,
178174 ) ;
179175
@@ -182,7 +178,7 @@ impl FileOpener for ParquetOpener {
182178 . prune_by_bloom_filters (
183179 & file_schema,
184180 & mut builder,
185- pruning_predicate ,
181+ predicate ,
186182 & file_metrics,
187183 )
188184 . await ;
@@ -226,21 +222,8 @@ impl FileOpener for ParquetOpener {
226222 let adapted = stream
227223 . map_err ( |e| ArrowError :: ExternalError ( Box :: new ( e) ) )
228224 . map ( move |maybe_batch| {
229- maybe_batch. and_then ( |b| {
230- if !pushdown_filters {
231- if let Some ( predicate) = predicate. clone ( ) {
232- let updated_predicate = update_predicate_index (
233- Arc :: clone ( & predicate) ,
234- b. schema_ref ( ) ,
235- ) ?;
236-
237- let b = batch_filter ( & b, & updated_predicate) ?;
238-
239- return schema_mapping. map_batch ( b) . map_err ( Into :: into) ;
240- }
241- }
242- schema_mapping. map_batch ( b) . map_err ( Into :: into)
243- } )
225+ maybe_batch
226+ . and_then ( |b| schema_mapping. map_batch ( b) . map_err ( Into :: into) )
244227 } ) ;
245228
246229 Ok ( adapted. boxed ( ) )
@@ -280,40 +263,3 @@ fn create_initial_plan(
280263 // default to scanning all row groups
281264 Ok ( ParquetAccessPlan :: new_all ( row_group_count) )
282265}
283-
284- /// Return the predicate with updated column indexes
285- ///
286- /// The indexes of Column PhysicalExpr in predicate are based on
287- /// file_schema / table_schema, which might be different from the
288- /// RecordBatch schema returned by Parquet Reader
289- ///
290- /// Recursively find all Column PhysicalExpr
291- /// and update with indexes of RecordBatch Schema
292- ///
293- /// Returns an error if failed to update Column index
294- fn update_predicate_index (
295- predicate : Arc < dyn PhysicalExpr > ,
296- schema : & Arc < Schema > ,
297- ) -> std:: result:: Result < Arc < dyn PhysicalExpr > , ArrowError > {
298- let children = predicate. children ( ) ;
299-
300- if children. len ( ) == 0 {
301- if let Some ( column) = predicate. as_any ( ) . downcast_ref :: < Column > ( ) {
302- let name = column. name ( ) ;
303- let new_index = schema. index_of ( name) ?;
304- let new_column = Column :: new ( name, new_index) ;
305- return Ok ( Arc :: new ( new_column) ) ;
306- }
307- return Ok ( Arc :: clone ( & predicate) ) ;
308- }
309-
310- let mut new_children: Vec < Arc < dyn PhysicalExpr > > = Vec :: new ( ) ;
311- for child in children {
312- let updated_child = update_predicate_index ( Arc :: clone ( child) , schema) ?;
313- new_children. push ( updated_child) ;
314- }
315-
316- predicate
317- . with_new_children ( new_children)
318- . map_err ( Into :: into)
319- }
0 commit comments