
Commit 02f817c

andygrove authored and alamb committed
chore: Upgrade to DataFusion 44.0.0-rc2 (apache#1154)
* move aggregate expressions to spark-expr crate
* move more expressions
* move benchmark
* normalize_nan
* bitwise not
* comet scalar funcs
* update bench imports
* save
* save
* save
* remove unused imports
* clippy
* implement more hashers
* implement Hash and PartialEq
* implement Hash and PartialEq
* implement Hash and PartialEq
* benches
* fix ScalarUDFImpl.return_type failure
* exclude test from miri
* ignore correct test
* ignore another test
* remove miri checks
* use return_type_from_exprs
* Revert "use return_type_from_exprs" (this reverts commit febc1f1)
* use DF main branch
* hacky workaround for regression in ScalarUDFImpl.return_type
* fix repo url
* pin to revision
* bump to latest rev
* bump to latest DF rev
* bump DF to rev 9f530dd
* add Cargo.lock
* bump DF version
* no default features
* Revert "remove miri checks" (this reverts commit 4638fe3)
* Update pin to DataFusion e99e02b9b9093ceb0c13a2dd32a2a89beba47930
* update pin
* Update Cargo.toml: bump to 44.0.0-rc2
* update cargo lock
* revert miri change

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 2dc6d3f · commit 02f817c

35 files changed: 715 additions, 1035 deletions

native/Cargo.lock

Lines changed: 292 additions & 306 deletions
Some generated files are not rendered by default.

native/Cargo.toml

Lines changed: 15 additions & 14 deletions
@@ -33,20 +33,21 @@ edition = "2021"
 rust-version = "1.79"
 
 [workspace.dependencies]
-arrow = { version = "53.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
-arrow-array = { version = "53.2.0" }
-arrow-buffer = { version = "53.2.0" }
-arrow-data = { version = "53.2.0" }
-arrow-schema = { version = "53.2.0" }
-parquet = { version = "53.2.0", default-features = false, features = ["experimental"] }
-datafusion = { version = "43.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
-datafusion-common = { version = "43.0.0" }
-datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] }
-datafusion-functions-nested = { version = "43.0.0", default-features = false }
-datafusion-expr = { version = "43.0.0", default-features = false }
-datafusion-execution = { version = "43.0.0", default-features = false }
-datafusion-physical-plan = { version = "43.0.0", default-features = false }
-datafusion-physical-expr = { version = "43.0.0", default-features = false }
+arrow = { version = "53.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow-array = { version = "53.3.0" }
+arrow-buffer = { version = "53.3.0" }
+arrow-data = { version = "53.3.0" }
+arrow-schema = { version = "53.3.0" }
+parquet = { version = "53.3.0", default-features = false, features = ["experimental"] }
+datafusion = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false, features = ["unicode_expressions", "crypto_expressions"] }
+datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false, features = ["crypto_expressions"] }
+datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
+datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "44.0.0-rc2", default-features = false }
 datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
 datafusion-comet-proto = { path = "proto", version = "0.5.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }

native/core/src/execution/expressions/bloom_filter_might_contain.rs

Lines changed: 19 additions & 27 deletions
@@ -19,26 +19,37 @@ use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data
 use arrow::record_batch::RecordBatch;
 use arrow_array::cast::as_primitive_array;
 use arrow_schema::{DataType, Schema};
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion::physical_plan::ColumnarValue;
 use datafusion_common::{internal_err, Result, ScalarValue};
 use datafusion_physical_expr::PhysicalExpr;
-use std::{
-    any::Any,
-    fmt::Display,
-    hash::{Hash, Hasher},
-    sync::Arc,
-};
+use std::hash::Hash;
+use std::{any::Any, fmt::Display, sync::Arc};
 
 /// A physical expression that checks if a value might be in a bloom filter. It corresponds to the
 /// Spark's `BloomFilterMightContain` expression.
-#[derive(Debug, Hash)]
+#[derive(Debug, Eq)]
 pub struct BloomFilterMightContain {
     pub bloom_filter_expr: Arc<dyn PhysicalExpr>,
     pub value_expr: Arc<dyn PhysicalExpr>,
     bloom_filter: Option<SparkBloomFilter>,
 }
 
+impl Hash for BloomFilterMightContain {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.bloom_filter_expr.hash(state);
+        self.value_expr.hash(state);
+        self.bloom_filter.hash(state);
+    }
+}
+
+impl PartialEq for BloomFilterMightContain {
+    fn eq(&self, other: &Self) -> bool {
+        self.bloom_filter_expr.eq(&other.bloom_filter_expr)
+            && self.value_expr.eq(&other.value_expr)
+            && self.bloom_filter.eq(&other.bloom_filter)
+    }
+}
+
 impl Display for BloomFilterMightContain {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(
@@ -49,18 +60,6 @@ impl Display for BloomFilterMightContain {
     }
 }
 
-impl PartialEq<dyn Any> for BloomFilterMightContain {
-    fn eq(&self, _other: &dyn Any) -> bool {
-        down_cast_any_ref(_other)
-            .downcast_ref::<Self>()
-            .map(|other| {
-                self.bloom_filter_expr.eq(&other.bloom_filter_expr)
-                    && self.value_expr.eq(&other.value_expr)
-            })
-            .unwrap_or(false)
-    }
-}
-
 fn evaluate_bloom_filter(
     bloom_filter_expr: &Arc<dyn PhysicalExpr>,
 ) -> Result<Option<SparkBloomFilter>> {
@@ -141,11 +140,4 @@ impl PhysicalExpr for BloomFilterMightContain {
             Arc::clone(&children[1]),
         )?))
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.bloom_filter_expr.hash(&mut s);
-        self.value_expr.hash(&mut s);
-        self.hash(&mut s);
-    }
 }
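
The change above is the DataFusion 44 migration pattern for custom physical expressions: down_cast_any_ref and PhysicalExpr::dyn_hash are gone, and expressions implement the standard Hash and PartialEq/Eq traits instead, which DataFusion's new DynEq/DynHash supertraits bridge to dyn PhysicalExpr. A minimal self-contained sketch of the pattern, assuming the 44.0.0-rc2 trait bounds (the type name and fields are illustrative, not the Comet source):

    use std::hash::{Hash, Hasher};
    use std::sync::Arc;

    use datafusion_physical_expr::PhysicalExpr;

    // Eq can be derived because dyn PhysicalExpr implements Eq in DataFusion 44.
    #[derive(Debug, Eq)]
    pub struct MightContainSketch {
        pub filter_expr: Arc<dyn PhysicalExpr>,
        pub value_expr: Arc<dyn PhysicalExpr>,
    }

    impl Hash for MightContainSketch {
        fn hash<H: Hasher>(&self, state: &mut H) {
            // dyn PhysicalExpr implements Hash, so children hash through the Arc.
            self.filter_expr.hash(state);
            self.value_expr.hash(state);
        }
    }

    impl PartialEq for MightContainSketch {
        fn eq(&self, other: &Self) -> bool {
            self.filter_expr.eq(&other.filter_expr) && self.value_expr.eq(&other.value_expr)
        }
    }

When every field already implements the traits, the impls can simply be derived, as Subquery does in the next file.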

native/core/src/execution/expressions/subquery.rs

Lines changed: 2 additions & 21 deletions
@@ -22,7 +22,6 @@ use crate::{
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Schema, TimeUnit};
 use datafusion::logical_expr::ColumnarValue;
-use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{internal_err, ScalarValue};
 use datafusion_physical_expr::PhysicalExpr;
 use jni::{
@@ -32,11 +31,11 @@ use jni::{
 use std::{
     any::Any,
     fmt::{Display, Formatter},
-    hash::{Hash, Hasher},
+    hash::Hash,
     sync::Arc,
 };
 
-#[derive(Debug, Hash)]
+#[derive(Debug, Hash, PartialEq, Eq)]
 pub struct Subquery {
     /// The ID of the execution context that owns this subquery. We use this ID to retrieve the
     /// subquery result.
@@ -63,19 +62,6 @@ impl Display for Subquery {
     }
 }
 
-impl PartialEq<dyn Any> for Subquery {
-    fn eq(&self, other: &dyn Any) -> bool {
-        down_cast_any_ref(other)
-            .downcast_ref::<Self>()
-            .map(|x| {
-                self.id.eq(&x.id)
-                    && self.data_type.eq(&x.data_type)
-                    && self.exec_context_id.eq(&x.exec_context_id)
-            })
-            .unwrap_or(false)
-    }
-}
-
 impl PhysicalExpr for Subquery {
     fn as_any(&self) -> &dyn Any {
         self
@@ -209,9 +195,4 @@ impl PhysicalExpr for Subquery {
     ) -> datafusion_common::Result<Arc<dyn PhysicalExpr>> {
         Ok(self)
     }
-
-    fn dyn_hash(&self, state: &mut dyn Hasher) {
-        let mut s = state;
-        self.hash(&mut s)
-    }
 }

native/core/src/execution/jni_api.rs

Lines changed: 4 additions & 5 deletions
@@ -20,10 +20,7 @@
 use arrow::datatypes::DataType as ArrowDataType;
 use arrow_array::RecordBatch;
 use datafusion::{
-    execution::{
-        disk_manager::DiskManagerConfig,
-        runtime_env::{RuntimeConfig, RuntimeEnv},
-    },
+    execution::{disk_manager::DiskManagerConfig, runtime_env::RuntimeEnv},
     physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
     prelude::{SessionConfig, SessionContext},
 };
@@ -52,6 +49,7 @@ use crate::{
 };
 use datafusion_comet_proto::spark_operator::Operator;
 use datafusion_common::ScalarValue;
+use datafusion_execution::runtime_env::RuntimeEnvBuilder;
 use futures::stream::StreamExt;
 use jni::{
     objects::GlobalRef,
@@ -188,7 +186,7 @@ fn prepare_datafusion_session_context(
     memory_fraction: f64,
     comet_task_memory_manager: Arc<GlobalRef>,
 ) -> CometResult<SessionContext> {
-    let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
+    let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);
 
     // Check if we are using unified memory manager integrated with Spark.
     if use_unified_memory_manager {
@@ -216,6 +214,7 @@ fn prepare_datafusion_session_context(
         &ScalarValue::Float64(Some(1.1)),
     );
 
+    #[allow(deprecated)]
     let runtime = RuntimeEnv::try_new(rt_config)?;
 
     let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
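
The commit keeps the now-deprecated RuntimeEnv::try_new call behind #[allow(deprecated)]; in DataFusion 44, RuntimeConfig is a deprecated alias for RuntimeEnvBuilder. A sketch of the fully non-deprecated path, assuming RuntimeEnvBuilder::build from 44.0.0-rc2 (illustrative only; the actual Comet code additionally wires the memory pool to Spark's task memory manager):

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::execution::disk_manager::DiskManagerConfig;
    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
    use datafusion::prelude::{SessionConfig, SessionContext};

    fn session_with_os_spill() -> Result<SessionContext> {
        // Build the RuntimeEnv directly from the builder instead of passing it
        // (via the RuntimeConfig alias) to the deprecated RuntimeEnv::try_new.
        let runtime = RuntimeEnvBuilder::new()
            .with_disk_manager(DiskManagerConfig::NewOs) // spill to OS temp dirs
            .build()?;
        Ok(SessionContext::new_with_config_rt(
            SessionConfig::new(),
            Arc::new(runtime),
        ))
    }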

native/core/src/execution/operators/copy.rs

Lines changed: 3 additions & 1 deletion
@@ -30,6 +30,7 @@ use arrow_array::{
 use arrow_data::transform::MutableArrayData;
 use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaRef};
 
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
 use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
@@ -78,7 +79,8 @@ impl CopyExec {
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&schema)),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Final,
+            Boundedness::Bounded,
         );
 
         Self {
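
DataFusion 44 splits the old ExecutionMode into two orthogonal plan properties: EmissionType (when an operator produces output) and Boundedness (whether the stream is finite), so this commit maps ExecutionMode::Bounded to EmissionType::Final plus Boundedness::Bounded. A sketch of the new four-argument PlanProperties::new, mirroring the CopyExec change above (illustrative helper, not the Comet source):

    use std::sync::Arc;

    use arrow_schema::Schema;
    use datafusion::physical_expr::EquivalenceProperties;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::{Partitioning, PlanProperties};

    fn bounded_final_properties(schema: Arc<Schema>) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Final,  // results are emitted after the input is consumed
            Boundedness::Bounded, // the input stream is finite
        )
    }

The same substitution appears in expand.rs, filter.rs, and scan.rs below.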

native/core/src/execution/operators/expand.rs

Lines changed: 4 additions & 2 deletions
@@ -17,10 +17,11 @@
 
 use arrow_array::{RecordBatch, RecordBatchOptions};
 use arrow_schema::SchemaRef;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::{
     execution::TaskContext,
     physical_plan::{
-        DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
         RecordBatchStream, SendableRecordBatchStream,
     },
 };
@@ -54,7 +55,8 @@ impl ExpandExec {
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&schema)),
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Final,
+            Boundedness::Bounded,
         );
 
         Self {

native/core/src/execution/operators/filter.rs

Lines changed: 2 additions & 1 deletion
@@ -210,7 +210,8 @@ impl FilterExec {
         Ok(PlanProperties::new(
             eq_properties,
             input.output_partitioning().clone(), // Output Partitioning
-            input.execution_mode(), // Execution Mode
+            input.pipeline_behavior(),
+            input.boundedness(),
         ))
     }
 }
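
The accessor side of the same split, as used by FilterExec above: operators that inherit behavior from their child forward pipeline_behavior() and boundedness() in place of the removed execution_mode(). A sketch, assuming DataFusion 44's ExecutionPlanProperties trait supplies both accessors (illustrative helper, not the Comet source):

    use std::sync::Arc;

    use datafusion::physical_expr::EquivalenceProperties;
    use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, PlanProperties};

    fn inherit_from_child(
        input: &Arc<dyn ExecutionPlan>,
        eq_properties: EquivalenceProperties,
    ) -> PlanProperties {
        PlanProperties::new(
            eq_properties,
            input.output_partitioning().clone(), // same partitioning as the child
            input.pipeline_behavior(),           // the child's EmissionType
            input.boundedness(),                 // the child's Boundedness
        )
    }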

native/core/src/execution/operators/scan.rs

Lines changed: 3 additions & 1 deletion
@@ -28,6 +28,7 @@ use arrow_data::ffi::FFI_ArrowArray;
 use arrow_data::ArrayData;
 use arrow_schema::ffi::FFI_ArrowSchema;
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
 };
@@ -122,7 +123,8 @@ impl ScanExec {
             // The partitioning is not important because we are not using DataFusion's
             // query planner or optimizer
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Final,
+            Boundedness::Bounded,
         );
 
         Ok(Self {
