Commit 24f5209

chore: upgrade to DataFusion 50.0.0, Arrow 56.1.0, Parquet 56.0.0 among others (#2286)
1 parent 0e7c26b commit 24f5209

File tree

24 files changed (+317, -191 lines)


native/Cargo.lock

Lines changed: 198 additions & 144 deletions
(Generated lockfile; diff not rendered.)

native/Cargo.toml

Lines changed: 5 additions & 5 deletions
@@ -31,15 +31,15 @@ license = "Apache-2.0"
 edition = "2021"
 
 # Comet uses the same minimum Rust version as DataFusion
-rust-version = "1.85"
+rust-version = "1.86"
 
 [workspace.dependencies]
-arrow = { version = "55.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow = { version = "56.0.0", features = ["prettyprint", "ffi", "chrono-tz"] }
 async-trait = { version = "0.1" }
 bytes = { version = "1.10.0" }
-parquet = { version = "55.2.0", default-features = false, features = ["experimental"] }
-datafusion = { version = "49.0.2", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
-datafusion-spark = { version = "49.0.2" }
+parquet = { version = "=56.0.0", default-features = false, features = ["experimental"] }
+datafusion = { version = "50.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
+datafusion-spark = { version = "50.0.0" }
 datafusion-comet-spark-expr = { path = "spark-expr" }
 datafusion-comet-proto = { path = "proto" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }

native/core/Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -36,7 +36,7 @@ publish = false
 
 [dependencies]
 arrow = { workspace = true }
-parquet = { workspace = true, default-features = false, features = ["experimental"] }
+parquet = { workspace = true, default-features = false, features = ["experimental", "arrow"] }
 futures = { workspace = true }
 mimalloc = { version = "*", default-features = false, optional = true }
 tikv-jemallocator = { version = "0.6.0", optional = true, features = ["disable_initial_exec_tls"] }

@@ -91,7 +91,7 @@ jni = { version = "0.21", features = ["invocation"] }
 lazy_static = "1.4"
 assertables = "9"
 hex = "0.4.3"
-datafusion-functions-nested = { version = "49.0.2" }
+datafusion-functions-nested = { version = "50.0.0" }
 
 [features]
 default = []
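
The notable change in this hunk is the new "arrow" feature on the parquet dependency: because the crate is pulled in with default-features = false, the parquet::arrow bridge (ArrowWriter, the record-batch readers, and so on) is only compiled when that feature is named explicitly. A minimal sketch of what the feature gates; the path, schema, and function name are illustrative, not from this commit:

use std::fs::File;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
// Only available when the parquet crate's "arrow" feature is enabled; with
// default-features = false and only "experimental", this import fails to resolve.
use parquet::arrow::ArrowWriter;

fn write_sample(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    // Write an Arrow batch out as a Parquet file via the feature-gated bridge.
    let mut writer = ArrowWriter::try_new(File::create(path)?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}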

native/core/src/execution/jni_api.rs

Lines changed: 4 additions & 5 deletions
@@ -35,14 +35,14 @@ use datafusion::execution::memory_pool::MemoryPool;
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::logical_expr::ScalarUDF;
 use datafusion::{
-    execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv},
+    execution::disk_manager::DiskManagerBuilder,
     physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
     prelude::{SessionConfig, SessionContext},
 };
 use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_spark::function::hash::sha2::SparkSha2;
 use datafusion_spark::function::math::expm1::SparkExpm1;
-use datafusion_spark::function::string::char::SparkChar;
+use datafusion_spark::function::string::char::CharFunc;
 use futures::poll;
 use futures::stream::StreamExt;
 use jni::objects::JByteBuffer;

@@ -291,8 +291,7 @@ fn prepare_datafusion_session_context(
        &ScalarValue::Float64(Some(1.1)),
    );
 
-    #[allow(deprecated)]
-    let runtime = RuntimeEnv::try_new(rt_config)?;
+    let runtime = rt_config.build()?;
 
    let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
 

@@ -301,7 +300,7 @@ fn prepare_datafusion_session_context(
    // register UDFs from datafusion-spark crate
    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default()));
    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha2::default()));
-    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkChar::default()));
+    session_ctx.register_udf(ScalarUDF::new_from_impl(CharFunc::default()));
 
    // Must be the last one to override existing functions with the same name
    datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?;
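
Two API shifts are visible in this file: the deprecated RuntimeEnv::try_new(rt_config) is replaced by consuming the builder directly, and datafusion-spark renamed SparkChar to CharFunc. A minimal sketch of the new construction path, assuming an illustrative memory limit (in Comet the real configuration arrives through the JNI call):

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::logical_expr::ScalarUDF;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_spark::function::string::char::CharFunc;

fn build_session() -> Result<SessionContext> {
    // DataFusion 50: build the RuntimeEnv straight from the builder instead of
    // going through the deprecated RuntimeEnv::try_new.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(512 * 1024 * 1024, 1.0) // illustrative 512 MiB pool
        .build()?;

    let session_ctx =
        SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime));

    // Renamed in datafusion-spark 50: SparkChar -> CharFunc.
    session_ctx.register_udf(ScalarUDF::new_from_impl(CharFunc::default()));
    Ok(session_ctx)
}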

native/core/src/execution/planner.rs

Lines changed: 34 additions & 8 deletions
@@ -40,6 +40,7 @@ use datafusion::physical_plan::InputOrderMode;
 use datafusion::{
     arrow::{compute::SortOptions, datatypes::SchemaRef},
     common::DataFusionError,
+    config::ConfigOptions,
     execution::FunctionRegistry,
     functions_aggregate::first_last::{FirstValue, LastValue},
     logical_expr::Operator as DataFusionOperator,

@@ -623,8 +624,13 @@ impl PhysicalPlanner {
         let args = vec![child];
         let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
         let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
-        let expr: ScalarFunctionExpr =
-            ScalarFunctionExpr::new("hour", comet_hour, args, field_ref);
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "hour",
+            comet_hour,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
 
         Ok(Arc::new(expr))
     }

@@ -635,8 +641,13 @@ impl PhysicalPlanner {
         let args = vec![child];
         let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
         let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
-        let expr: ScalarFunctionExpr =
-            ScalarFunctionExpr::new("minute", comet_minute, args, field_ref);
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "minute",
+            comet_minute,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
 
         Ok(Arc::new(expr))
     }

@@ -647,8 +658,13 @@ impl PhysicalPlanner {
         let args = vec![child];
         let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
         let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
-        let expr: ScalarFunctionExpr =
-            ScalarFunctionExpr::new("second", comet_second, args, field_ref);
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "second",
+            comet_second,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
 
         Ok(Arc::new(expr))
     }

@@ -870,8 +886,13 @@ impl PhysicalPlanner {
             ScalarUDF::new_from_impl(BloomFilterMightContain::try_new(bloom_filter_expr)?);
 
         let field_ref = Arc::new(Field::new("might_contain", DataType::Boolean, true));
-        let expr: ScalarFunctionExpr =
-            ScalarFunctionExpr::new("might_contain", Arc::new(udf), args, field_ref);
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "might_contain",
+            Arc::new(udf),
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
         Ok(Arc::new(expr))
     }
     ExprStruct::CreateNamedStruct(expr) => {

@@ -1090,6 +1111,7 @@ impl PhysicalPlanner {
             fun_expr,
             vec![left, right],
             Arc::new(Field::new(func_name, data_type, true)),
+            Arc::new(ConfigOptions::default()),
         )))
     }
     _ => {

@@ -1115,6 +1137,7 @@ impl PhysicalPlanner {
             fun_expr,
             vec![left, right],
             Arc::new(Field::new(op_str, data_type, true)),
+            Arc::new(ConfigOptions::default()),
         )))
     } else {
         Ok(Arc::new(BinaryExpr::new(left, op, right)))

@@ -2354,6 +2377,8 @@ impl PhysicalPlanner {
             window_frame.into(),
             input_schema.as_ref(),
             false, // TODO: Ignore nulls
+            false, // TODO: Spark does not support DISTINCT ... OVER
+            None,
         )
         .map_err(|e| ExecutionError::DataFusionError(e.to_string()))
     }

@@ -2533,6 +2558,7 @@ impl PhysicalPlanner {
             fun_expr,
             args.to_vec(),
             Arc::new(Field::new(fun_name, data_type, true)),
+            Arc::new(ConfigOptions::default()),
         ));
 
         Ok(scalar_expr)
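
The mechanical change behind most of these hunks: in DataFusion 50, ScalarFunctionExpr::new takes the session's ConfigOptions as a fifth argument, and this commit passes ConfigOptions::default() at every call site. A condensed sketch of the pattern; the helper name is ours, not the commit's:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::config::ConfigOptions;
use datafusion::logical_expr::ScalarUDF;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::PhysicalExpr;

// Hypothetical helper mirroring the hour/minute/second call sites above.
fn time_part_expr(
    name: &str,
    udf: Arc<ScalarUDF>,
    child: Arc<dyn PhysicalExpr>,
) -> Arc<dyn PhysicalExpr> {
    Arc::new(ScalarFunctionExpr::new(
        name,
        udf,
        vec![child],
        Arc::new(Field::new(name, DataType::Int32, true)),
        // New fifth argument in DataFusion 50. A planner with access to the
        // session state could forward the real session options instead of
        // defaults, as Comet may choose to do later.
        Arc::new(ConfigOptions::default()),
    ))
}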

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 1 addition & 1 deletion
@@ -1350,7 +1350,7 @@ mod test {
     #[tokio::test]
     async fn shuffle_repartitioner_memory() {
         let batch = create_batch(900);
-        assert_eq!(8376, batch.get_array_memory_size());
+        assert_eq!(8316, batch.get_array_memory_size()); // Not stable across Arrow versions
 
         let memory_limit = 512 * 1024;
         let num_partitions = 2;
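
get_array_memory_size() reports the capacity of the underlying Arrow buffers, and allocation and rounding details shift between Arrow releases, which is why the expected constant moved from 8376 to 8316 here. A small sketch for inspecting the value locally; the column shape is illustrative, not Comet's create_batch:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn inspect_batch_size() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from_iter_values(0..900))],
    )
    .unwrap();
    // Counts allocated buffer capacity, so the exact number depends on the
    // Arrow version; avoid hard-coding it outside of pinned-dependency tests.
    println!("{}", batch.get_array_memory_size());
}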

native/spark-expr/src/agg_funcs/avg.rs

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ use datafusion::logical_expr::Volatility::Immutable;
 use DataType::*;
 
 /// AVG aggregate expression
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct Avg {
     name: String,
     signature: Signature,
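
The new derives here (and in avg_decimal.rs, correlation.rs, and covariance.rs below) appear to satisfy DataFusion 50's equality and hashing hooks on user-defined aggregates. A hedged sketch of how derived PartialEq and Hash can back such hooks via downcasting; both helpers are hypothetical glue, not code from this commit or a confirmed DataFusion API:

use std::any::Any;
use std::hash::{DefaultHasher, Hash, Hasher};

// Compare two UDAF impls for equality by downcasting to the concrete type
// and deferring to the derived PartialEq.
fn equals_via_derive<T: PartialEq + 'static>(me: &T, other: &dyn Any) -> bool {
    other.downcast_ref::<T>().is_some_and(|o| me == o)
}

// Produce a stable hash for a UDAF impl by deferring to the derived Hash.
fn hash_via_derive<T: Hash>(me: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    me.hash(&mut hasher);
    hasher.finish()
}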

native/spark-expr/src/agg_funcs/avg_decimal.rs

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ use num::{integer::div_ceil, Integer};
 use DataType::*;
 
 /// AVG aggregate expression
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct AvgDecimal {
     signature: Signature,
     sum_data_type: DataType,

native/spark-expr/src/agg_funcs/correlation.rs

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ use datafusion::physical_expr::expressions::StatsType;
 /// we have our own implementation is that DataFusion has UInt64 for state_field `count`,
 /// while Spark has Double for count. Also we have added `null_on_divide_by_zero`
 /// to be consistent with Spark's implementation.
-#[derive(Debug)]
+#[derive(Debug, PartialEq, Eq, Hash)]
 pub struct Correlation {
     name: String,
     signature: Signature,

native/spark-expr/src/agg_funcs/covariance.rs

Lines changed: 10 additions & 1 deletion
@@ -38,14 +38,23 @@ use std::sync::Arc;
 /// The implementation mostly is the same as the DataFusion's implementation. The reason
 /// we have our own implementation is that DataFusion has UInt64 for state_field count,
 /// while Spark has Double for count.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Covariance {
     name: String,
     signature: Signature,
     stats_type: StatsType,
     null_on_divide_by_zero: bool,
 }
 
+impl std::hash::Hash for Covariance {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.name.hash(state);
+        self.signature.hash(state);
+        (self.stats_type as u8).hash(state);
+        self.null_on_divide_by_zero.hash(state);
+    }
+}
+
 impl Covariance {
     /// Create a new COVAR aggregate function
     pub fn new(
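
Unlike the other aggregates, Covariance cannot simply derive Hash: its stats_type field is DataFusion's StatsType, which presumably does not implement Hash in this version, so the commit hashes the enum's discriminant by hand. The same pattern in isolation, with a stand-in enum rather than the real StatsType:

use std::hash::{Hash, Hasher};

// Stand-in for an upstream fieldless enum that lacks a Hash impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatsKind {
    Population,
    Sample,
}

#[derive(Debug, PartialEq, Eq)]
struct Stat {
    name: String,
    kind: StatsKind,
}

impl Hash for Stat {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.name.hash(state);
        // A fieldless enum casts to its discriminant, which can be hashed
        // even though the enum itself has no Hash impl.
        (self.kind as u8).hash(state);
    }
}

Keeping the manual Hash consistent with the derived PartialEq preserves the Eq/Hash contract that a full derive would otherwise guarantee.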
