46 changes: 23 additions & 23 deletions Cargo.toml
@@ -26,35 +26,35 @@ debug = true
 debug = "line-tables-only"

 [workspace.dependencies]
-delta_kernel = { version = "=0.6.0", features = ["default-engine"] }
+delta_kernel = { version = "=0.6.1", features = ["default-engine"] }
 #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }

 # arrow
-arrow = { version = "53" }
-arrow-arith = { version = "53" }
-arrow-array = { version = "53", features = ["chrono-tz"] }
-arrow-buffer = { version = "53" }
-arrow-cast = { version = "53" }
-arrow-ipc = { version = "53" }
-arrow-json = { version = "53" }
-arrow-ord = { version = "53" }
-arrow-row = { version = "53" }
-arrow-schema = { version = "53" }
-arrow-select = { version = "53" }
+arrow = { version = "54" }
+arrow-arith = { version = "54" }
+arrow-array = { version = "54", features = ["chrono-tz"] }
+arrow-buffer = { version = "54" }
+arrow-cast = { version = "54" }
+arrow-ipc = { version = "54" }
+arrow-json = { version = "54" }
+arrow-ord = { version = "54" }
+arrow-row = { version = "54" }
+arrow-schema = { version = "54" }
+arrow-select = { version = "54" }
 object_store = { version = "0.11.2" , features = ["cloud"]}
-parquet = { version = "53" }
+parquet = { version = "54" }

 # datafusion
-datafusion = { version = "44" }
-datafusion-expr = { version = "44" }
-datafusion-common = { version = "44" }
-datafusion-ffi = { version = "44" }
-datafusion-functions = { version = "44" }
-datafusion-functions-aggregate = { version = "44" }
-datafusion-physical-expr = { version = "44" }
-datafusion-physical-plan = { version = "44" }
-datafusion-proto = { version = "44" }
-datafusion-sql = { version = "44" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-ffi = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-functions-aggregate = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }
+datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", branch = "branch-45" }

 # serde
 serde = { version = "1.0.194", features = ["derive"] }
2 changes: 1 addition & 1 deletion crates/core/src/data_catalog/storage/mod.rs
@@ -149,7 +149,7 @@ mod tests {
     use super::*;
     use datafusion::assert_batches_sorted_eq;
     use datafusion::catalog::CatalogProvider;
-    use datafusion::catalog_common::MemoryCatalogProvider;
+    use datafusion::catalog::MemoryCatalogProvider;
     use datafusion::execution::context::SessionContext;

     #[test]
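Note on this change: the in-memory catalog types now live under `datafusion::catalog` rather than `datafusion::catalog_common`, so only the import path moves. A minimal sketch of the new path, assuming the DataFusion 45 re-exports (`MemorySchemaProvider`, `register_catalog`):

```rust
use std::sync::Arc;

use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider};
use datafusion::execution::context::SessionContext;

fn register_in_memory_catalog(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let catalog = MemoryCatalogProvider::new();
    // A real caller would register Delta table providers on this schema.
    catalog.register_schema("delta", Arc::new(MemorySchemaProvider::new()))?;
    ctx.register_catalog("memory", Arc::new(catalog));
    Ok(())
}
```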
5 changes: 3 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
@@ -51,8 +51,8 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_common::{
-    config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
-    TableReference, ToDFSchema,
+    config::ConfigOptions, Column, Constraints, DFSchema, DataFusionError,
+    Result as DataFusionResult, TableReference, ToDFSchema,
 };
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::logical_plan::CreateExternalTable;
@@ -659,6 +659,7 @@ impl<'a> DeltaScanBuilder<'a> {
             } else {
                 file_groups.into_values().collect()
             },
+            constraints: Constraints::default(),
             statistics: stats,
             projection: self.projection.cloned(),
             limit: self.limit,
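Note on this change: `FileScanConfig` gains a `constraints` field in DataFusion 45, so every literal construction must now populate it. Delta derives no primary-key or uniqueness constraints from the log, hence the empty default here and in the three CDF scans further down. A minimal sketch under those assumptions (the schema and helper name are hypothetical):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion_common::Constraints;

// Hypothetical helper showing where the new field slots in.
fn delta_like_scan_config(files: Vec<PartitionedFile>) -> FileScanConfig {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
    let mut config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
        .with_file_group(files)
        .with_limit(None);
    // No table constraints are known for a bare Delta scan.
    config.constraints = Constraints::default();
    config
}
```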
2 changes: 2 additions & 0 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -638,6 +638,7 @@ mod datafusion {
                 null_count,
                 max_value,
                 min_value,
+                sum_value: Precision::Absent,
                 distinct_count: Precision::Absent,
             })
         }
@@ -653,6 +654,7 @@
             null_count: self.null_count.add(&other.null_count),
             max_value: self.max_value.max(&other.max_value),
             min_value: self.min_value.min(&other.min_value),
+            sum_value: Precision::Absent,
             distinct_count: self.distinct_count.add(&other.distinct_count),
         }
     }
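Note on this change: DataFusion 45's `ColumnStatistics` likewise gains a `sum_value` field. The Delta log records per-file `nullCount`/`minValues`/`maxValues` but no column sums, so both the construction and the merge above report `Precision::Absent`. A minimal sketch, assuming the DataFusion 45 struct layout:

```rust
use datafusion_common::stats::Precision;
use datafusion_common::ColumnStatistics;

fn stats_from_delta_log(null_count: usize) -> ColumnStatistics {
    ColumnStatistics {
        null_count: Precision::Exact(null_count),
        max_value: Precision::Absent,
        min_value: Precision::Absent,
        // Not derivable from Delta file statistics.
        sum_value: Precision::Absent,
        distinct_count: Precision::Absent,
    }
}
```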
5 changes: 4 additions & 1 deletion crates/core/src/operations/load_cdf.rs
@@ -10,7 +10,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::physical_plan::FileScanConfig;
 use datafusion::prelude::SessionContext;
-use datafusion_common::{ScalarValue, Statistics};
+use datafusion_common::{Constraints, ScalarValue, Statistics};
 use datafusion_physical_expr::expressions;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::projection::ProjectionExec;
@@ -389,6 +389,7 @@ impl CdfLoadBuilder {
             object_store_url: self.log_store.object_store_url(),
             file_schema: cdc_file_schema.clone(),
             file_groups: cdc_file_groups.into_values().collect(),
+            constraints: Constraints::default(),
             statistics: Statistics::new_unknown(&cdc_file_schema),
             projection: None,
             limit: None,
@@ -406,6 +407,7 @@
             object_store_url: self.log_store.object_store_url(),
             file_schema: add_remove_file_schema.clone(),
             file_groups: add_file_groups.into_values().collect(),
+            constraints: Constraints::default(),
             statistics: Statistics::new_unknown(&add_remove_file_schema.clone()),
             projection: None,
             limit: None,
@@ -423,6 +425,7 @@
             object_store_url: self.log_store.object_store_url(),
             file_schema: add_remove_file_schema.clone(),
             file_groups: remove_file_groups.into_values().collect(),
+            constraints: Constraints::default(),
             statistics: Statistics::new_unknown(&add_remove_file_schema),
             projection: None,
             limit: None,
2 changes: 2 additions & 0 deletions crates/core/src/writer/json.rs
@@ -349,6 +349,8 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
                     ParquetError::IndexOutOfBound(u.to_owned(), v.to_owned())
                 }
                 ParquetError::NYI(msg) => ParquetError::NYI(msg.to_owned()),
+                // ParquetError is non exhaustive, so have a fallback
+                e => ParquetError::General(e.to_string()),
             },
             skipped_values: partial_writes,
         }
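Note on this change: the fallback arm exists because `ParquetError` is `#[non_exhaustive]`, so a `match` in a downstream crate must accept variants added by future parquet releases or it stops compiling on upgrade. A self-contained sketch of the pattern (the `clone_parquet_error` helper is hypothetical):

```rust
use parquet::errors::ParquetError;

// ParquetError does not implement Clone; rebuilding it by hand requires a
// catch-all arm because the enum is #[non_exhaustive].
fn clone_parquet_error(err: &ParquetError) -> ParquetError {
    match err {
        ParquetError::General(msg) => ParquetError::General(msg.clone()),
        ParquetError::NYI(msg) => ParquetError::NYI(msg.clone()),
        // Variants added in later parquet releases fall through here.
        other => ParquetError::General(other.to_string()),
    }
}
```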
4 changes: 2 additions & 2 deletions python/Cargo.toml
@@ -60,8 +60,8 @@ jemallocator = { version = "0.5", features = ["disable_initial_exec_tls", "backg
 jemallocator = { version = "0.5", features = ["disable_initial_exec_tls"] }

 [dependencies.pyo3]
-version = "0.22.6"
-features = ["extension-module", "abi3", "abi3-py39", "gil-refs"]
+version = "0.23.4"
+features = ["extension-module", "abi3", "abi3-py39"]

 [dependencies.deltalake]
 path = "../crates/deltalake"
14 changes: 9 additions & 5 deletions python/src/filesystem.rs
@@ -4,6 +4,7 @@ use crate::RawDeltaTable;
 use deltalake::storage::object_store::{MultipartUpload, PutPayloadMut};
 use deltalake::storage::{DynObjectStore, ListResult, ObjectStoreError, Path};
 use deltalake::DeltaTableBuilder;
+use parking_lot::Mutex;
 use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
 use pyo3::prelude::*;
 use pyo3::types::{IntoPyDict, PyBytes, PyType};
@@ -519,7 +520,9 @@ impl ObjectInputFile {
 // TODO the C++ implementation track an internal lock on all random access files, DO we need this here?
 #[pyclass(weakref, module = "deltalake._internal")]
 pub struct ObjectOutputStream {
-    upload: Box<dyn MultipartUpload>,
+    // wrap in mutex as rustc says `MultipartUpload` can't be
+    // shared across threads (it isn't sync)
+    upload: Mutex<Box<dyn MultipartUpload>>,
     pos: i64,
     #[pyo3(get)]
     closed: bool,
@@ -537,7 +540,7 @@
     ) -> Result<Self, ObjectStoreError> {
         let upload = store.put_multipart(&path).await?;
         Ok(Self {
-            upload,
+            upload: Mutex::new(upload),
             pos: 0,
             closed: false,
             mode: "wb".into(),
@@ -555,14 +558,15 @@
     }

     fn abort(&mut self) -> PyResult<()> {
-        rt().block_on(self.upload.abort())
+        rt().block_on(self.upload.lock().abort())
             .map_err(PythonError::from)?;
         Ok(())
     }

     fn upload_buffer(&mut self) -> PyResult<()> {
         let payload = std::mem::take(&mut self.buffer).freeze();
-        match rt().block_on(self.upload.put_part(payload)) {
+        let res = rt().block_on(self.upload.lock().put_part(payload));
+        match res {
             Ok(_) => Ok(()),
             Err(err) => {
                 self.abort()?;
@@ -580,7 +584,7 @@
         if !self.buffer.is_empty() {
             self.upload_buffer()?;
         }
-        match rt().block_on(self.upload.complete()) {
+        match rt().block_on(self.upload.lock().complete()) {
             Ok(_) => Ok(()),
             Err(err) => Err(PyIOError::new_err(err.to_string())),
         }
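Note on this change: pyo3 0.23 requires `#[pyclass]` types to be `Sync`, and `dyn MultipartUpload` is only `Send`, as the inline comment notes. Wrapping it in a `Mutex` restores `Sync`, since `Mutex<T>` is `Sync` whenever `T: Send`. A minimal sketch of that reasoning, with a stand-in `Uploader` trait for `MultipartUpload`:

```rust
use parking_lot::Mutex;

// Stand-in for `dyn MultipartUpload`: Send, but not Sync.
trait Uploader: Send {
    fn put_part(&mut self, data: Vec<u8>);
}

// `Mutex<T>` is Sync for any `T: Send`, so the wrapper is Sync even though
// the boxed trait object on its own is not.
struct OutputStream {
    upload: Mutex<Box<dyn Uploader>>,
}

fn assert_sync<T: Sync>() {}

fn sync_check() {
    assert_sync::<OutputStream>(); // compiles only because of the Mutex
}
```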
6 changes: 4 additions & 2 deletions python/src/lib.rs
@@ -1563,10 +1563,12 @@ impl RawDeltaTable {
         &self,
         py: Python<'py>,
     ) -> PyResult<Bound<'py, PyCapsule>> {
+        // tokio runtime handle?
+        let handle = None;
         let name = CString::new("datafusion_table_provider").unwrap();

         let table = self.with_table(|t| Ok(Arc::new(t.clone())))?;
-        let provider = FFI_TableProvider::new(table, false);
+        let provider = FFI_TableProvider::new(table, false, handle);

         PyCapsule::new_bound(py, provider, Some(name.clone()))
     }
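Note on this change: `FFI_TableProvider::new` now takes a third argument, an optional tokio runtime handle for driving async work across the FFI boundary; the binding passes `None` (with a question-mark comment) to fall back to the ambient runtime. A hedged sketch, assuming the datafusion-ffi 45 signature:

```rust
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion_ffi::table_provider::FFI_TableProvider;
use tokio::runtime::Handle;

// Assumed signature: (provider, supports_filter_pushdown, runtime handle).
fn export_provider(
    table: Arc<dyn TableProvider + Send>,
    runtime: Option<Handle>,
) -> FFI_TableProvider {
    FFI_TableProvider::new(table, false, runtime)
}
```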
@@ -1820,7 +1822,7 @@ fn filestats_to_expression_next<'py>(
             })?
             .data_type()
             .clone();
-        let column_type = PyArrowType(column_type).into_py(py);
+        let column_type = PyArrowType(column_type).into_pyobject(py)?;
         pa.call_method1("scalar", (value,))?
             .call_method1("cast", (column_type,))
     };
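Note on this change: the last hunk is the pyo3 0.23 conversion migration: `IntoPy::into_py` gives way to the fallible `IntoPyObject::into_pyobject`, which returns a GIL-bound `Bound<'py, _>` and surfaces conversion errors through `?`. A minimal sketch, assuming pyo3 0.23:

```rust
use pyo3::prelude::*;

// into_pyobject is fallible and returns a Bound smart pointer tied to the
// GIL lifetime, rather than an unbound PyObject as into_py did.
fn to_py_int<'py>(py: Python<'py>, value: i64) -> PyResult<Bound<'py, PyAny>> {
    Ok(value.into_pyobject(py)?.into_any())
}
```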