Skip to content

Commit 082f9a8

Browse files
committed
chore: Update to DataFusion 46.0.0, update for API changes
Signed-off-by: Andrew Lamb <[email protected]>
1 parent 49ed2c8 commit 082f9a8

File tree

13 files changed

+129
-128
lines changed

13 files changed

+129
-128
lines changed

.github/workflows/build.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
uses: actions-rs/toolchain@v1
2121
with:
2222
profile: default
23-
toolchain: '1.81'
23+
toolchain: '1.82'
2424
override: true
2525

2626
- name: Format
@@ -42,7 +42,7 @@ jobs:
4242
uses: actions-rs/toolchain@v1
4343
with:
4444
profile: default
45-
toolchain: '1.81'
45+
toolchain: '1.82'
4646
override: true
4747

4848
- name: build and lint with clippy
@@ -79,7 +79,7 @@ jobs:
7979
uses: actions-rs/toolchain@v1
8080
with:
8181
profile: default
82-
toolchain: '1.81'
82+
toolchain: '1.82'
8383
override: true
8484

8585
- name: Run tests
@@ -114,7 +114,7 @@ jobs:
114114
uses: actions-rs/toolchain@v1
115115
with:
116116
profile: default
117-
toolchain: '1.81'
117+
toolchain: '1.82'
118118
override: true
119119

120120
# Install Java and Hadoop for HDFS integration tests
@@ -160,7 +160,7 @@ jobs:
160160
uses: actions-rs/toolchain@v1
161161
with:
162162
profile: default
163-
toolchain: '1.81'
163+
toolchain: '1.82'
164164
override: true
165165

166166
- name: Download Lakectl

.github/workflows/codecov.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
uses: actions-rs/toolchain@v1
2121
with:
2222
profile: default
23-
toolchain: '1.81'
23+
toolchain: '1.82'
2424
override: true
2525
- name: Install cargo-llvm-cov
2626
uses: taiki-e/install-action@cargo-llvm-cov

Cargo.toml

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ resolver = "2"
55

66
[workspace.package]
77
authors = ["Qingping Hou <[email protected]>"]
8-
rust-version = "1.81"
8+
rust-version = "1.82"
99
keywords = ["deltalake", "delta", "datalake"]
1010
readme = "README.md"
1111
edition = "2021"
@@ -44,17 +44,16 @@ arrow-select = { version = "54" }
4444
object_store = { version = "0.11.2" , features = ["cloud"]}
4545
parquet = { version = "54" }
4646

47-
# datafusion
48-
datafusion = "45"
49-
datafusion-expr = "45"
50-
datafusion-common = "45"
51-
datafusion-ffi = "45"
52-
datafusion-functions = "45"
53-
datafusion-functions-aggregate = "45"
54-
datafusion-physical-expr = "45"
55-
datafusion-physical-plan = "45"
56-
datafusion-proto = "45"
57-
datafusion-sql = "45"
47+
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
48+
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
49+
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
50+
datafusion-ffi = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
51+
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
52+
datafusion-functions-aggregate = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
53+
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
54+
datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
55+
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
56+
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-46" }
5857

5958
# serde
6059
serde = { version = "1.0.194", features = ["derive"] }
@@ -77,4 +76,3 @@ async-trait = { version = "0.1" }
7776
futures = { version = "0.3" }
7877
tokio = { version = "1" }
7978
num_cpus = { version = "1" }
80-

crates/core/src/delta_datafusion/expr.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ use datafusion_common::Result as DFResult;
3434
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
3535
use datafusion_expr::expr::InList;
3636
use datafusion_expr::planner::ExprPlanner;
37-
use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource};
37+
use datafusion_expr::{
38+
AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, ScalarFunctionArgs, TableSource,
39+
};
3840
// Needed for MakeParquetArray
3941
use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature};
4042
use datafusion_functions::core::planner::CoreFunctionPlanner;
@@ -99,13 +101,13 @@ impl ScalarUDFImpl for MakeParquetArray {
99101
r_type
100102
}
101103

102-
fn invoke_batch(&self, args: &[ColumnarValue], number_rows: usize) -> Result<ColumnarValue> {
104+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
103105
let mut data_type = DataType::Null;
104-
for arg in args {
106+
for arg in &args.args {
105107
data_type = arg.data_type();
106108
}
107109

108-
match self.actual.invoke_batch(args, number_rows)? {
110+
match self.actual.invoke_with_args(args)? {
109111
ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
110112
let field = Arc::new(Field::new("element", data_type, true));
111113
let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new(

crates/core/src/delta_datafusion/mod.rs

Lines changed: 53 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ use async_trait::async_trait;
3939
use chrono::{DateTime, TimeZone, Utc};
4040
use datafusion::catalog::{Session, TableProviderFactory};
4141
use datafusion::config::TableParquetOptions;
42-
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
4342
use datafusion::datasource::physical_plan::{
44-
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
43+
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, ParquetSource,
4544
};
4645
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
4746
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
@@ -648,36 +647,39 @@ impl<'a> DeltaScanBuilder<'a> {
648647
..Default::default()
649648
};
650649

651-
let mut exec_plan_builder = ParquetExecBuilder::new(
652-
FileScanConfig::new(self.log_store.object_store_url(), file_schema)
653-
.with_file_groups(
654-
// If all files were filtered out, we still need to emit at least one partition to
655-
// pass datafusion sanity checks.
656-
//
657-
// See https://github.com/apache/datafusion/issues/11322
658-
if file_groups.is_empty() {
659-
vec![vec![]]
660-
} else {
661-
file_groups.into_values().collect()
662-
},
663-
)
664-
.with_statistics(stats)
665-
.with_projection(self.projection.cloned())
666-
.with_limit(self.limit)
667-
.with_table_partition_cols(table_partition_cols),
668-
)
669-
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}))
670-
.with_table_parquet_options(parquet_options);
650+
let mut file_source = ParquetSource::new(parquet_options)
651+
.with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));
671652

672653
// Sometimes (i.e Merge) we want to prune files that don't make the
673654
// filter and read the entire contents for files that do match the
674655
// filter
675656
if let Some(predicate) = logical_filter {
676657
if config.enable_parquet_pushdown {
677-
exec_plan_builder = exec_plan_builder.with_predicate(predicate);
658+
file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
678659
}
679660
};
680661

662+
let file_scan_config = FileScanConfig::new(
663+
self.log_store.object_store_url(),
664+
file_schema,
665+
Arc::new(file_source),
666+
)
667+
.with_file_groups(
668+
// If all files were filtered out, we still need to emit at least one partition to
669+
// pass datafusion sanity checks.
670+
//
671+
// See https://github.com/apache/datafusion/issues/11322
672+
if file_groups.is_empty() {
673+
vec![vec![]]
674+
} else {
675+
file_groups.into_values().collect()
676+
},
677+
)
678+
.with_statistics(stats)
679+
.with_projection(self.projection.cloned())
680+
.with_limit(self.limit)
681+
.with_table_partition_cols(table_partition_cols);
682+
681683
let metrics = ExecutionPlanMetricsSet::new();
682684
MetricBuilder::new(&metrics)
683685
.global_counter("files_scanned")
@@ -688,7 +690,7 @@ impl<'a> DeltaScanBuilder<'a> {
688690

689691
Ok(DeltaScan {
690692
table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
691-
parquet_scan: exec_plan_builder.build_arc(),
693+
parquet_scan: file_scan_config.build(),
692694
config,
693695
logical_schema,
694696
metrics,
@@ -1960,7 +1962,7 @@ mod tests {
19601962
use bytes::Bytes;
19611963
use chrono::{TimeZone, Utc};
19621964
use datafusion::assert_batches_sorted_eq;
1963-
use datafusion::datasource::physical_plan::ParquetExec;
1965+
use datafusion::datasource::source::DataSourceExec;
19641966
use datafusion::physical_plan::empty::EmptyExec;
19651967
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
19661968
use datafusion_expr::lit;
@@ -2713,7 +2715,7 @@ mod tests {
27132715
.await
27142716
.unwrap();
27152717

2716-
let mut visitor = ParquetPredicateVisitor::default();
2718+
let mut visitor = ParquetVisitor::default();
27172719
visit_execution_plan(&scan, &mut visitor).unwrap();
27182720

27192721
assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
@@ -2748,7 +2750,7 @@ mod tests {
27482750
.await
27492751
.unwrap();
27502752

2751-
let mut visitor = ParquetPredicateVisitor::default();
2753+
let mut visitor = ParquetVisitor::default();
27522754
visit_execution_plan(&scan, &mut visitor).unwrap();
27532755

27542756
assert!(visitor.predicate.is_none());
@@ -2777,42 +2779,46 @@ mod tests {
27772779
.await
27782780
.unwrap();
27792781

2780-
let mut visitor = ParquetOptionsVisitor::default();
2782+
let mut visitor = ParquetVisitor::default();
27812783
visit_execution_plan(&scan, &mut visitor).unwrap();
27822784

27832785
assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
27842786
}
27852787

2788+
/// Extracts fields from the parquet scan
27862789
#[derive(Default)]
2787-
struct ParquetPredicateVisitor {
2790+
struct ParquetVisitor {
27882791
predicate: Option<Arc<dyn PhysicalExpr>>,
27892792
pruning_predicate: Option<Arc<PruningPredicate>>,
2793+
options: Option<TableParquetOptions>,
27902794
}
27912795

2792-
impl ExecutionPlanVisitor for ParquetPredicateVisitor {
2796+
impl ExecutionPlanVisitor for ParquetVisitor {
27932797
type Error = DataFusionError;
27942798

27952799
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
2796-
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
2797-
self.predicate = parquet_exec.predicate().cloned();
2798-
self.pruning_predicate = parquet_exec.pruning_predicate().cloned();
2799-
}
2800-
Ok(true)
2801-
}
2802-
}
2803-
2804-
#[derive(Default)]
2805-
struct ParquetOptionsVisitor {
2806-
options: Option<TableParquetOptions>,
2807-
}
2800+
let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
2801+
return Ok(true);
2802+
};
28082803

2809-
impl ExecutionPlanVisitor for ParquetOptionsVisitor {
2810-
type Error = DataFusionError;
2804+
let Some(scan_config) = datasource_exec
2805+
.data_source()
2806+
.as_any()
2807+
.downcast_ref::<FileScanConfig>()
2808+
else {
2809+
return Ok(true);
2810+
};
28112811

2812-
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
2813-
if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
2814-
self.options = Some(parquet_exec.table_parquet_options().clone())
2812+
if let Some(parquet_source) = scan_config
2813+
.file_source
2814+
.as_any()
2815+
.downcast_ref::<ParquetSource>()
2816+
{
2817+
self.options = Some(parquet_source.table_parquet_options().clone());
2818+
self.predicate = parquet_source.predicate().cloned();
2819+
self.pruning_predicate = parquet_source.pruning_predicate().cloned();
28152820
}
2821+
28162822
Ok(true)
28172823
}
28182824
}

crates/core/src/operations/load_cdf.rs

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@ use std::time::SystemTime;
1616
use arrow_array::RecordBatch;
1717
use arrow_schema::{ArrowError, Field, Schema};
1818
use chrono::{DateTime, Utc};
19-
use datafusion::datasource::file_format::parquet::ParquetFormat;
20-
use datafusion::datasource::file_format::FileFormat;
21-
use datafusion::datasource::physical_plan::FileScanConfig;
19+
use datafusion::datasource::physical_plan::{FileScanConfig, FileSource, ParquetSource};
2220
use datafusion::execution::SessionState;
2321
use datafusion::prelude::SessionContext;
22+
use datafusion_common::config::TableParquetOptions;
2423
use datafusion_common::ScalarValue;
2524
use datafusion_physical_expr::{expressions, PhysicalExpr};
2625
use datafusion_physical_plan::projection::ProjectionExec;
@@ -369,38 +368,38 @@ impl CdfLoadBuilder {
369368
)?;
370369

371370
// Create the parquet scans for each associated type of file.
372-
let cdc_scan = ParquetFormat::new()
373-
.create_physical_plan(
374-
session_sate,
375-
FileScanConfig::new(self.log_store.object_store_url(), cdc_file_schema)
376-
.with_file_groups(cdc_file_groups.into_values().collect())
377-
.with_table_partition_cols(cdc_partition_cols),
378-
filters,
379-
)
380-
.await?;
381-
382-
let add_scan = ParquetFormat::new()
383-
.create_physical_plan(
384-
session_sate,
385-
FileScanConfig::new(
386-
self.log_store.object_store_url(),
387-
add_remove_file_schema.clone(),
388-
)
389-
.with_file_groups(add_file_groups.into_values().collect())
390-
.with_table_partition_cols(add_remove_partition_cols.clone()),
391-
filters,
392-
)
393-
.await?;
394-
395-
let remove_scan = ParquetFormat::new()
396-
.create_physical_plan(
397-
session_sate,
398-
FileScanConfig::new(self.log_store.object_store_url(), add_remove_file_schema)
399-
.with_file_groups(remove_file_groups.into_values().collect())
400-
.with_table_partition_cols(add_remove_partition_cols),
401-
filters,
402-
)
403-
.await?;
371+
let mut parquet_source = ParquetSource::new(TableParquetOptions::new());
372+
if let Some(filters) = filters {
373+
parquet_source =
374+
parquet_source.with_predicate(Arc::clone(&cdc_file_schema), Arc::clone(filters));
375+
}
376+
let parquet_source: Arc<dyn FileSource> = Arc::new(parquet_source);
377+
let cdc_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
378+
self.log_store.object_store_url(),
379+
Arc::clone(&cdc_file_schema),
380+
Arc::clone(&parquet_source),
381+
)
382+
.with_file_groups(cdc_file_groups.into_values().collect())
383+
.with_table_partition_cols(cdc_partition_cols)
384+
.build();
385+
386+
let add_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
387+
self.log_store.object_store_url(),
388+
Arc::clone(&add_remove_file_schema),
389+
Arc::clone(&parquet_source),
390+
)
391+
.with_file_groups(add_file_groups.into_values().collect())
392+
.with_table_partition_cols(add_remove_partition_cols.clone())
393+
.build();
394+
395+
let remove_scan: Arc<dyn ExecutionPlan> = FileScanConfig::new(
396+
self.log_store.object_store_url(),
397+
Arc::clone(&add_remove_file_schema),
398+
parquet_source,
399+
)
400+
.with_file_groups(remove_file_groups.into_values().collect())
401+
.with_table_partition_cols(add_remove_partition_cols)
402+
.build();
404403

405404
// The output batches are then unioned to create a single output. Coalesce partitions is only here for the time
406405
// being for development. I plan to parallelize the reads once the base idea is correct.

0 commit comments

Comments
 (0)