@@ -21,7 +21,7 @@ use datafusion::datasource::file_format::FileFormat;
2121use datafusion:: datasource:: physical_plan:: FileScanConfig ;
2222use datafusion:: execution:: SessionState ;
2323use datafusion:: prelude:: SessionContext ;
24- use datafusion_common:: { Constraints , ScalarValue , Statistics } ;
24+ use datafusion_common:: ScalarValue ;
2525use datafusion_physical_expr:: { expressions, PhysicalExpr } ;
2626use datafusion_physical_plan:: projection:: ProjectionExec ;
2727use datafusion_physical_plan:: union:: UnionExec ;
@@ -378,53 +378,32 @@ impl CdfLoadBuilder {
378378 let cdc_scan = ParquetFormat :: new ( )
379379 . create_physical_plan (
380380 session_sate,
381- FileScanConfig {
382- object_store_url : self . log_store . object_store_url ( ) ,
383- file_schema : cdc_file_schema. clone ( ) ,
384- file_groups : cdc_file_groups. into_values ( ) . collect ( ) ,
385- constraints : Constraints :: default ( ) ,
386- statistics : Statistics :: new_unknown ( & cdc_file_schema) ,
387- projection : None ,
388- limit : None ,
389- table_partition_cols : cdc_partition_cols,
390- output_ordering : vec ! [ ] ,
391- } ,
381+ FileScanConfig :: new ( self . log_store . object_store_url ( ) , cdc_file_schema)
382+ . with_file_groups ( cdc_file_groups. into_values ( ) . collect ( ) )
383+ . with_table_partition_cols ( cdc_partition_cols) ,
392384 filters,
393385 )
394386 . await ?;
395387
396388 let add_scan = ParquetFormat :: new ( )
397389 . create_physical_plan (
398390 session_sate,
399- FileScanConfig {
400- object_store_url : self . log_store . object_store_url ( ) ,
401- file_schema : add_remove_file_schema. clone ( ) ,
402- file_groups : add_file_groups. into_values ( ) . collect ( ) ,
403- constraints : Constraints :: default ( ) ,
404- statistics : Statistics :: new_unknown ( & add_remove_file_schema. clone ( ) ) ,
405- projection : None ,
406- limit : None ,
407- table_partition_cols : add_remove_partition_cols. clone ( ) ,
408- output_ordering : vec ! [ ] ,
409- } ,
391+ FileScanConfig :: new (
392+ self . log_store . object_store_url ( ) ,
393+ add_remove_file_schema. clone ( ) ,
394+ )
395+ . with_file_groups ( add_file_groups. into_values ( ) . collect ( ) )
396+ . with_table_partition_cols ( add_remove_partition_cols. clone ( ) ) ,
410397 filters,
411398 )
412399 . await ?;
413400
414401 let remove_scan = ParquetFormat :: new ( )
415402 . create_physical_plan (
416403 session_sate,
417- FileScanConfig {
418- object_store_url : self . log_store . object_store_url ( ) ,
419- file_schema : add_remove_file_schema. clone ( ) ,
420- file_groups : remove_file_groups. into_values ( ) . collect ( ) ,
421- constraints : Constraints :: default ( ) ,
422- statistics : Statistics :: new_unknown ( & add_remove_file_schema) ,
423- projection : None ,
424- limit : None ,
425- table_partition_cols : add_remove_partition_cols,
426- output_ordering : vec ! [ ] ,
427- } ,
404+ FileScanConfig :: new ( self . log_store . object_store_url ( ) , add_remove_file_schema)
405+ . with_file_groups ( remove_file_groups. into_values ( ) . collect ( ) )
406+ . with_table_partition_cols ( add_remove_partition_cols) ,
428407 filters,
429408 )
430409 . await ?;
@@ -435,7 +414,7 @@ impl CdfLoadBuilder {
435414 Arc :: new ( UnionExec :: new ( vec ! [ cdc_scan, add_scan, remove_scan] ) ) ;
436415
437416 // We project the union in the order of the input_schema + cdc cols at the end
438- // This is to ensure the DeltaCdfTableProvider uses the correct schema consturction .
417+ // This is to ensure the DeltaCdfTableProvider uses the correct schema construction .
439418 let mut fields = schema. fields ( ) . to_vec ( ) ;
440419 for f in ADD_PARTITION_SCHEMA . clone ( ) {
441420 fields. push ( f. into ( ) ) ;
0 commit comments