From 9be5597dd392947058bd0a27f474a24612f730c8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 1 Feb 2025 12:39:33 -0500 Subject: [PATCH 1/4] chore: Update to DataFusion 45.0.0 Signed-off-by: Andrew Lamb --- Cargo.toml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7721b485f9..4e7af7f0d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,16 +45,16 @@ object_store = { version = "0.11.2" , features = ["cloud"]} parquet = { version = "53" } # datafusion -datafusion = { version = "44" } -datafusion-expr = { version = "44" } -datafusion-common = { version = "44" } -datafusion-ffi = { version = "44" } -datafusion-functions = { version = "44" } -datafusion-functions-aggregate = { version = "44" } -datafusion-physical-expr = { version = "44" } -datafusion-physical-plan = { version = "44" } -datafusion-proto = { version = "44" } -datafusion-sql = { version = "44" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-ffi = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-functions-aggregate = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } +datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } # serde serde = { 
version = "1.0.194", features = ["derive"] } From 4cbc7b601ba3f884915d2f02985000d266fd4fda Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 1 Feb 2025 12:45:37 -0500 Subject: [PATCH 2/4] chore: update arrow/parquet and pyo3 Signed-off-by: Andrew Lamb --- Cargo.toml | 24 ++++++++++++------------ python/Cargo.toml | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4e7af7f0d4..4ec3bb7ba1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,19 +30,19 @@ delta_kernel = { version = "=0.6.0", features = ["default-engine"] } #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow -arrow = { version = "53" } -arrow-arith = { version = "53" } -arrow-array = { version = "53", features = ["chrono-tz"] } -arrow-buffer = { version = "53" } -arrow-cast = { version = "53" } -arrow-ipc = { version = "53" } -arrow-json = { version = "53" } -arrow-ord = { version = "53" } -arrow-row = { version = "53" } -arrow-schema = { version = "53" } -arrow-select = { version = "53" } +arrow = { version = "54" } +arrow-arith = { version = "54" } +arrow-array = { version = "54", features = ["chrono-tz"] } +arrow-buffer = { version = "54" } +arrow-cast = { version = "54" } +arrow-ipc = { version = "54" } +arrow-json = { version = "54" } +arrow-ord = { version = "54" } +arrow-row = { version = "54" } +arrow-schema = { version = "54" } +arrow-select = { version = "54" } object_store = { version = "0.11.2" , features = ["cloud"]} -parquet = { version = "53" } +parquet = { version = "54" } # datafusion datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" } diff --git a/python/Cargo.toml b/python/Cargo.toml index 52c22a18e8..35acc8b7a2 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -60,8 +60,8 @@ jemallocator = { version = "0.5", features = ["disable_initial_exec_tls", "backg jemallocator = { version = "0.5", features = ["disable_initial_exec_tls"] } 
[dependencies.pyo3] -version = "0.22.6" -features = ["extension-module", "abi3", "abi3-py39", "gil-refs"] +version = "0.23.4" +features = ["extension-module", "abi3", "abi3-py39"] [dependencies.deltalake] path = "../crates/deltalake" From 5e25f880684ec6608f9770bd16683d17fe8ee957 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 1 Feb 2025 12:58:41 -0500 Subject: [PATCH 3/4] chore: update delta-kernel Signed-off-by: Andrew Lamb --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4ec3bb7ba1..cdb421ca61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "=0.6.0", features = ["default-engine"] } +delta_kernel = { version = "=0.6.1", features = ["default-engine"] } #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] } # arrow From 2a9c99160cdd7bac4bcb02bee019ac22d99d562a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 1 Feb 2025 13:08:35 -0500 Subject: [PATCH 4/4] fix: make it compile Signed-off-by: Andrew Lamb --- crates/core/src/data_catalog/storage/mod.rs | 2 +- crates/core/src/delta_datafusion/mod.rs | 5 +++-- crates/core/src/kernel/snapshot/log_data.rs | 2 ++ crates/core/src/operations/load_cdf.rs | 5 ++++- crates/core/src/writer/json.rs | 2 ++ python/src/filesystem.rs | 14 +++++++++----- python/src/lib.rs | 6 ++++-- 7 files changed, 25 insertions(+), 11 deletions(-) diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs index 236caf79a8..38e5bdc9c6 100644 --- a/crates/core/src/data_catalog/storage/mod.rs +++ b/crates/core/src/data_catalog/storage/mod.rs @@ -149,7 +149,7 @@ mod tests { use super::*; use datafusion::assert_batches_sorted_eq; use datafusion::catalog::CatalogProvider; - use datafusion::catalog_common::MemoryCatalogProvider; + use datafusion::catalog::MemoryCatalogProvider; use 
datafusion::execution::context::SessionContext; #[test] diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 3726da7725..dfa79c09b4 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -51,8 +51,8 @@ use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ - config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult, - TableReference, ToDFSchema, + config::ConfigOptions, Column, Constraints, DFSchema, DataFusionError, + Result as DataFusionResult, TableReference, ToDFSchema, }; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::logical_plan::CreateExternalTable; @@ -659,6 +659,7 @@ impl<'a> DeltaScanBuilder<'a> { } else { file_groups.into_values().collect() }, + constraints: Constraints::default(), statistics: stats, projection: self.projection.cloned(), limit: self.limit, diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 7ef65783e2..7f0415dbd3 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -638,6 +638,7 @@ mod datafusion { null_count, max_value, min_value, + sum_value: Precision::Absent, distinct_count: Precision::Absent, }) } @@ -653,6 +654,7 @@ mod datafusion { null_count: self.null_count.add(&other.null_count), max_value: self.max_value.max(&other.max_value), min_value: self.min_value.min(&other.min_value), + sum_value: Precision::Absent, distinct_count: self.distinct_count.add(&other.distinct_count), } } diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index 9718c3eda1..7f4c8c910d 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -10,7 +10,7 @@ use 
datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::prelude::SessionContext; -use datafusion_common::{ScalarValue, Statistics}; +use datafusion_common::{Constraints, ScalarValue, Statistics}; use datafusion_physical_expr::expressions; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::projection::ProjectionExec; @@ -389,6 +389,7 @@ impl CdfLoadBuilder { object_store_url: self.log_store.object_store_url(), file_schema: cdc_file_schema.clone(), file_groups: cdc_file_groups.into_values().collect(), + constraints: Constraints::default(), statistics: Statistics::new_unknown(&cdc_file_schema), projection: None, limit: None, @@ -406,6 +407,7 @@ impl CdfLoadBuilder { object_store_url: self.log_store.object_store_url(), file_schema: add_remove_file_schema.clone(), file_groups: add_file_groups.into_values().collect(), + constraints: Constraints::default(), statistics: Statistics::new_unknown(&add_remove_file_schema.clone()), projection: None, limit: None, @@ -423,6 +425,7 @@ impl CdfLoadBuilder { object_store_url: self.log_store.object_store_url(), file_schema: add_remove_file_schema.clone(), file_groups: remove_file_groups.into_values().collect(), + constraints: Constraints::default(), statistics: Statistics::new_unknown(&add_remove_file_schema), projection: None, limit: None, diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index 19b6c6d493..8b45e9c30a 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -349,6 +349,8 @@ impl DeltaWriter> for JsonWriter { ParquetError::IndexOutOfBound(u.to_owned(), v.to_owned()) } ParquetError::NYI(msg) => ParquetError::NYI(msg.to_owned()), + // ParquetError is non exhaustive, so have a fallback + e => ParquetError::General(e.to_string()), }, skipped_values: partial_writes, } diff --git a/python/src/filesystem.rs 
b/python/src/filesystem.rs index e2594e4a18..b20c2ea31d 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -4,6 +4,7 @@ use crate::RawDeltaTable; use deltalake::storage::object_store::{MultipartUpload, PutPayloadMut}; use deltalake::storage::{DynObjectStore, ListResult, ObjectStoreError, Path}; use deltalake::DeltaTableBuilder; +use parking_lot::Mutex; use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError}; use pyo3::prelude::*; use pyo3::types::{IntoPyDict, PyBytes, PyType}; @@ -519,7 +520,9 @@ impl ObjectInputFile { // TODO the C++ implementation track an internal lock on all random access files, DO we need this here? #[pyclass(weakref, module = "deltalake._internal")] pub struct ObjectOutputStream { - upload: Box, + // wrap in mutex as rustc says `MultipartUpload` can't be + // shared across threads (it isn't sync) + upload: Mutex>, pos: i64, #[pyo3(get)] closed: bool, @@ -537,7 +540,7 @@ impl ObjectOutputStream { ) -> Result { let upload = store.put_multipart(&path).await?; Ok(Self { - upload, + upload: Mutex::new(upload), pos: 0, closed: false, mode: "wb".into(), @@ -555,14 +558,15 @@ impl ObjectOutputStream { } fn abort(&mut self) -> PyResult<()> { - rt().block_on(self.upload.abort()) + rt().block_on(self.upload.lock().abort()) .map_err(PythonError::from)?; Ok(()) } fn upload_buffer(&mut self) -> PyResult<()> { let payload = std::mem::take(&mut self.buffer).freeze(); - match rt().block_on(self.upload.put_part(payload)) { + let res = rt().block_on(self.upload.lock().put_part(payload)); + match res { Ok(_) => Ok(()), Err(err) => { self.abort()?; @@ -580,7 +584,7 @@ impl ObjectOutputStream { if !self.buffer.is_empty() { self.upload_buffer()?; } - match rt().block_on(self.upload.complete()) { + match rt().block_on(self.upload.lock().complete()) { Ok(_) => Ok(()), Err(err) => Err(PyIOError::new_err(err.to_string())), } diff --git a/python/src/lib.rs b/python/src/lib.rs index 3c36da9ddf..248ad3d2fc 100644 --- a/python/src/lib.rs 
+++ b/python/src/lib.rs @@ -1563,10 +1563,12 @@ impl RawDeltaTable { &self, py: Python<'py>, ) -> PyResult> { + // tokio runtime handle? + let handle = None; let name = CString::new("datafusion_table_provider").unwrap(); let table = self.with_table(|t| Ok(Arc::new(t.clone())))?; - let provider = FFI_TableProvider::new(table, false); + let provider = FFI_TableProvider::new(table, false, handle); PyCapsule::new_bound(py, provider, Some(name.clone())) } @@ -1820,7 +1822,7 @@ fn filestats_to_expression_next<'py>( })? .data_type() .clone(); - let column_type = PyArrowType(column_type).into_py(py); + let column_type = PyArrowType(column_type).into_pyobject(py)?; pa.call_method1("scalar", (value,))? .call_method1("cast", (column_type,)) };