Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 132 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

29 changes: 14 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/lakehq/sail"
# Define Minimum Supported Rust Version (MSRV) the same as DataFusion
rust-version = "1.82.0"
rust-version = "1.85.1"

[workspace.lints.clippy]
unwrap_used = "deny"
Expand Down Expand Up @@ -66,7 +66,7 @@ aws-credential-types = "1.2.4"
aws-smithy-runtime-api = "1.8.5"
aws-smithy-types = "1.3.2"
aws-smithy-async = "1.2.5"
clap = { version = "4.5.41", features = ["derive"] }
clap = { version = "4.5.42", features = ["derive"] }
num_enum = "0.7.4"
num-traits = "0.2.19"
log = "0.4.27"
Expand Down Expand Up @@ -113,19 +113,18 @@ tonic-types = "0.12.3"
# https://github.com/hyperium/tonic/blob/vRELEASE/tonic/Cargo.toml
prost-build = "0.13.5"
prost = "0.13.5"

datafusion = { version = "48.0.1", features = ["serde", "avro"] }
datafusion-common = { version = "48.0.1", features = ["object_store", "avro"] }
datafusion-expr = { version = "48.0.1" }
datafusion-expr-common = { version = "48.0.1" }
datafusion-execution = { version = "48.0.1" }
datafusion-physical-optimizer = { version = "48.0.1" }
datafusion-proto = { version = "48.0.1" }
datafusion-functions = { version = "48.0.1" }
datafusion-functions-nested = { version = "48.0.1" }
datafusion-physical-expr = { version = "48.0.1" }
datafusion-spark = { version = "48.0.1" }
datafusion-functions-json = { git = "https://github.com/lakehq/datafusion-functions-json.git", rev = "bdf221e" }
datafusion = { version = "49.0.0", features = ["serde", "avro"] }
datafusion-common = { version = "49.0.0", features = ["object_store", "avro"] }
datafusion-expr = { version = "49.0.0" }
datafusion-expr-common = { version = "49.0.0" }
datafusion-execution = { version = "49.0.0" }
datafusion-physical-optimizer = { version = "49.0.0" }
datafusion-proto = { version = "49.0.0" }
datafusion-functions = { version = "49.0.0" }
datafusion-functions-nested = { version = "49.0.0" }
datafusion-physical-expr = { version = "49.0.0" }
datafusion-spark = { version = "49.0.0" }
datafusion-functions-json = { git = "https://github.com/lakehq/datafusion-functions-json.git", rev = "d199e09" }
# The `pyo3` version must match the one used in `arrow-pyarrow` (replace `RELEASE` with the release we are using):
# https://github.com/apache/arrow-rs/blob/RELEASE/arrow-pyarrow/Cargo.toml
# auto-initialize: Changes [`Python::with_gil`] to automatically initialize the Python interpreter if needed.
Expand Down
12 changes: 6 additions & 6 deletions crates/sail-delta-lake/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,21 @@ mod schema_adapter;
/// Convert DeltaTableError to DataFusionError
pub fn delta_to_datafusion_error(err: DeltaTableError) -> DataFusionError {
match err {
DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None),
DeltaTableError::Arrow { source } => DataFusionError::ArrowError(Box::new(source), None),
DeltaTableError::Io { source } => DataFusionError::IoError(source),
DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source),
DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(Box::new(source)),
DeltaTableError::Parquet { source } => DataFusionError::ParquetError(Box::new(source)),
_ => DataFusionError::External(Box::new(err)),
}
}

/// Convert DataFusionError to DeltaTableError
pub fn datafusion_to_delta_error(err: DataFusionError) -> DeltaTableError {
match err {
DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source },
DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source: *source },
DataFusionError::IoError(source) => DeltaTableError::Io { source },
DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source },
DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source: *source },
DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source: *source },
_ => DeltaTableError::Generic(err.to_string()),
}
}
Expand Down
22 changes: 19 additions & 3 deletions crates/sail-execution/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,19 @@ impl PhysicalExtensionCodec for RemoteExecutionCodec {
nulls_first: opt.nulls_first,
})
.collect();
let null_equality = if null_equals_null {
datafusion::common::NullEquality::NullEqualsNull
} else {
datafusion::common::NullEquality::NullEqualsNothing
};
Ok(Arc::new(SortMergeJoinExec::try_new(
left,
right,
on,
filter,
join_type,
sort_options,
null_equals_null,
null_equality,
)?))
}
// TODO: StreamingTableExec?
Expand Down Expand Up @@ -554,14 +559,18 @@ impl PhysicalExtensionCodec for RemoteExecutionCodec {
nulls_first: x.nulls_first,
})
.collect();
let null_equals_null = match sort_merge_join.null_equality() {
datafusion::common::NullEquality::NullEqualsNull => true,
datafusion::common::NullEquality::NullEqualsNothing => false,
};
NodeKind::SortMergeJoin(gen::SortMergeJoinExecNode {
left,
right,
on,
filter,
join_type,
sort_options,
null_equals_null: sort_merge_join.null_equals_null(),
null_equals_null,
})
} else if let Some(partial_sort) = node.as_any().downcast_ref::<PartialSortExec>() {
let expr = Some(self.try_encode_lex_ordering(partial_sort.expr())?);
Expand Down Expand Up @@ -1339,7 +1348,14 @@ impl RemoteExecutionCodec {
.iter()
.map(|x| self.try_decode_message(x))
.collect::<Result<_>>()?;
parse_physical_sort_exprs(&lex_ordering, registry, schema, self)
let lex_ordering = LexOrdering::new(
parse_physical_sort_exprs(&lex_ordering, registry, schema, self)
.map_err(|e| plan_datafusion_err!("failed to decode lex ordering: {e}"))?,
);
match lex_ordering {
Some(lex_ordering) => Ok(lex_ordering),
None => plan_err!("failed to decode lex ordering: invalid sort expressions"),
}
}

fn try_encode_lex_ordering(&self, lex_ordering: &LexOrdering) -> Result<gen::LexOrdering> {
Expand Down
8 changes: 4 additions & 4 deletions crates/sail-plan/src/extension/function/max_min_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl AggregateUDFImpl for MaxByFunction {
}
fn simplify(&self) -> Option<function::AggregateFunctionSimplification> {
let simplify = |mut aggr_func: AggregateFunction, _: &dyn SimplifyInfo| {
let mut order_by = aggr_func.params.order_by.unwrap_or_default();
let mut order_by = aggr_func.params.order_by;
let (second_arg, first_arg) = (
aggr_func.params.args.remove(1),
aggr_func.params.args.remove(0),
Expand All @@ -88,7 +88,7 @@ impl AggregateUDFImpl for MaxByFunction {
vec![first_arg],
aggr_func.params.distinct,
aggr_func.params.filter,
Some(order_by),
order_by,
aggr_func.params.null_treatment,
)))
};
Expand Down Expand Up @@ -154,7 +154,7 @@ impl AggregateUDFImpl for MinByFunction {

fn simplify(&self) -> Option<function::AggregateFunctionSimplification> {
let simplify = |mut aggr_func: AggregateFunction, _: &dyn SimplifyInfo| {
let mut order_by = aggr_func.params.order_by.unwrap_or_default();
let mut order_by = aggr_func.params.order_by;
let (second_arg, first_arg) = (
aggr_func.params.args.remove(1),
aggr_func.params.args.remove(0),
Expand All @@ -167,7 +167,7 @@ impl AggregateUDFImpl for MinByFunction {
vec![first_arg],
aggr_func.params.distinct,
aggr_func.params.filter,
Some(order_by),
order_by,
aggr_func.params.null_treatment,
)))
};
Expand Down
23 changes: 14 additions & 9 deletions crates/sail-plan/src/extension/physical/file_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub async fn create_file_write_physical_plan(
let sort_order = if sort_by.is_empty() {
None
} else {
Some(create_sort_order(sort_by, &physical_input.schema())?)
create_sort_order(sort_by, &physical_input.schema())?
};
let info = SinkInfo {
input: physical_input,
Expand All @@ -64,8 +64,8 @@ pub async fn create_file_write_physical_plan(
.await
}

fn create_sort_order(sort_by: Vec<Sort>, schema: &Schema) -> Result<LexRequirement> {
let mut ordering = LexOrdering::default();
fn create_sort_order(sort_by: Vec<Sort>, schema: &Schema) -> Result<Option<LexRequirement>> {
let mut ordering = Vec::with_capacity(sort_by.len());
for sort in sort_by.iter() {
match &sort.expr {
Expr::Column(c) => {
Expand All @@ -81,10 +81,15 @@ fn create_sort_order(sort_by: Vec<Sort>, schema: &Schema) -> Result<LexRequireme
_ => return plan_err!("expected column expression in sort order: {sort_by:?}"),
}
}
Ok(LexRequirement::new(
ordering
.into_iter()
.map(PhysicalSortRequirement::from)
.collect::<Vec<_>>(),
))
let ordering = LexOrdering::new(ordering);
if let Some(ordering) = ordering {
Ok(LexRequirement::new(
ordering
.into_iter()
.map(PhysicalSortRequirement::from)
.collect::<Vec<_>>(),
))
} else {
Ok(None)
}
}
8 changes: 6 additions & 2 deletions crates/sail-plan/src/extension/physical/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use async_trait::async_trait;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{create_physical_sort_exprs, ExtensionPlanner, PhysicalPlanner};
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::{internal_err, Result};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_expr::{create_physical_sort_exprs, LexOrdering};

use crate::extension::logical::{
FileWriteNode, MapPartitionsNode, RangeNode, SchemaPivotNode, ShowStringNode,
Expand Down Expand Up @@ -59,7 +60,10 @@ impl ExtensionPlanner for ExtensionPhysicalPlanner {
node.schema(),
session_state.execution_props(),
)?;
let sort = SortExec::new(expr, physical_inputs.one()?)
let Some(ordering) = LexOrdering::new(expr) else {
return internal_err!("SortExec requires at least one sort expression");
};
let sort = SortExec::new(ordering, physical_inputs.one()?)
.with_fetch(node.fetch())
.with_preserve_partitioning(true);
Arc::new(sort)
Expand Down
2 changes: 1 addition & 1 deletion crates/sail-plan/src/function/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ fn listagg(input: AggFunctionInput) -> PlanResult<expr::Expr> {
args: vec![agg_col.clone()],
distinct: input.distinct,
order_by: if input.distinct {
Some(vec![agg_col.clone().sort(true, true)])
vec![agg_col.clone().sort(true, true)]
} else {
input.order_by
},
Expand Down
2 changes: 1 addition & 1 deletion crates/sail-plan/src/function/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pub struct AggFunctionInput<'a> {
pub distinct: bool,
pub ignore_nulls: Option<bool>,
pub filter: Option<Box<expr::Expr>>,
pub order_by: Option<Vec<expr::Sort>>,
pub order_by: Vec<expr::Sort>,
pub function_context: FunctionContextInput<'a>,
}

Expand Down
17 changes: 9 additions & 8 deletions crates/sail-plan/src/resolver/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use datafusion::functions::core::expr_ext::FieldAccessor;
use datafusion::functions::core::get_field;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_common::{Column, DFSchemaRef, DataFusionError, TableReference};
use datafusion_expr::expr::{ScalarFunction, WindowFunctionParams};
use datafusion_expr::expr::{FieldMetadata, ScalarFunction, WindowFunctionParams};
use datafusion_expr::{
col, expr, expr_fn, lit, window_frame, AggregateUDF, BinaryExpr, ExprSchemable, Operator,
ScalarUDF,
Expand Down Expand Up @@ -71,7 +71,8 @@ impl NamedExpr {
.metadata
.as_ref()
.map(|x| {
x.iter()
x.inner()
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
})
Expand All @@ -92,7 +93,7 @@ impl NamedExpr {
expr: *alias.expr,
metadata: alias
.metadata
.map(|x| x.into_iter().collect())
.map(|x| x.to_hashmap().into_iter().collect())
.unwrap_or(vec![]),
}),
_ => Err(PlanError::invalid(
Expand Down Expand Up @@ -958,8 +959,8 @@ impl PlanResolver<'_> {
None => None,
};
let order_by = match order_by {
Some(x) => Some(self.resolve_sort_orders(x, true, schema, state).await?),
None => None,
Some(x) => self.resolve_sort_orders(x, true, schema, state).await?,
None => vec![],
};
let input = AggFunctionInput {
arguments,
Expand Down Expand Up @@ -1001,9 +1002,9 @@ impl PlanResolver<'_> {
let name = name.into_iter().map(|x| x.into()).collect::<Vec<String>>();
let expr = if let [n] = name.as_slice() {
if let Some(metadata) = metadata {
let metadata_map: Option<HashMap<String, String>> =
Some(metadata.into_iter().collect());
expr.alias_with_metadata(n, metadata_map)
let metadata_map: HashMap<String, String> = metadata.into_iter().collect();
let field_metadata = Some(FieldMetadata::from(metadata_map));
expr.alias_with_metadata(n, field_metadata)
} else {
expr.alias(n)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sail-plan/src/resolver/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl PlanResolver<'_> {
args: arguments,
distinct,
filter: None,
order_by: None,
order_by: vec![],
null_treatment: None,
},
}))
Expand Down
Loading