Skip to content

Commit cc426c8

Browse files
authored
Bump DataFusion to 50 and arrow to 56 (vortex-data#4577)
Just made sure there are no breaking changes coming our way. Includes support for the two new Arrow decimal types (32 and 64 bits), but without converting from a Vortex dtype into them. CodSpeed improvements are due to a new inline hint on arrow's `BitIterator::next` (found by @0ax1 🥳).

Open issues:
- [ ] apache/datafusion#17489 - mostly affects whether, and how much, support we want to provide for decimals, or whether we need to provide some other solution for DataFusion.
- [ ] Issue with follow-ups - vortex-data#4668

---------

Signed-off-by: Adam Gutglick <[email protected]>
1 parent 23edc3d commit cc426c8

File tree

10 files changed

+384
-146
lines changed

10 files changed

+384
-146
lines changed

Cargo.lock

Lines changed: 120 additions & 95 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,19 @@ anyhow = "1.0.95"
5959
arbitrary = "1.3.2"
6060
arcref = "0.2.0"
6161
arrayref = "0.3.7"
62-
arrow-arith = "55.2.0"
63-
arrow-array = "55.2.0"
64-
arrow-buffer = "55.2.0"
65-
arrow-cast = "55.2.0"
66-
arrow-data = "55.2.0"
67-
arrow-ipc = "55.2.0"
68-
arrow-ord = "55.2.0"
69-
arrow-schema = "55.2.0"
70-
arrow-select = "55.2.0"
71-
arrow-string = "55.2.0"
62+
arrow-arith = "56"
63+
arrow-array = "56"
64+
arrow-buffer = "56"
65+
arrow-cast = "56"
66+
arrow-data = "56"
67+
arrow-ipc = "56"
68+
arrow-ord = "56"
69+
arrow-schema = "56"
70+
arrow-select = "56"
71+
arrow-string = "56"
7272
async-compat = "0.2.5"
7373
async-stream = "0.3.6"
74-
async-trait = "0.1.88"
74+
async-trait = "0.1.89"
7575
bindgen = "0.72.0"
7676
bit-vec = "0.8.0"
7777
bitvec = "1.0.1"
@@ -86,15 +86,17 @@ crossbeam-deque = "0.8.6"
8686
crossbeam-queue = "0.3.12"
8787
crossterm = "0.29"
8888
dashmap = "6.1.0"
89-
datafusion = { version = "49", default-features = false }
90-
datafusion-catalog = { version = "49" }
91-
datafusion-common = { version = "49" }
92-
datafusion-common-runtime = { version = "49" }
93-
datafusion-datasource = { version = "49", default-features = false }
94-
datafusion-execution = { version = "49" }
95-
datafusion-expr = { version = "49" }
96-
datafusion-physical-expr = { version = "49" }
97-
datafusion-physical-plan = { version = "49" }
89+
datafusion = { version = "50", default-features = false }
90+
datafusion-catalog = { version = "50" }
91+
datafusion-common = { version = "50" }
92+
datafusion-common-runtime = { version = "50" }
93+
datafusion-datasource = { version = "50", default-features = false }
94+
datafusion-execution = { version = "50" }
95+
datafusion-expr = { version = "50" }
96+
datafusion-physical-expr = { version = "50" }
97+
datafusion-physical-expr-adapter = { version = "50" }
98+
datafusion-physical-expr-common = { version = "50" }
99+
datafusion-physical-plan = { version = "50" }
98100
dirs = "6.0.0"
99101
divan = { package = "codspeed-divan-compat", version = "3.0" }
100102
dyn-hash = "0.2.0"
@@ -135,7 +137,7 @@ opentelemetry = "0.30.0"
135137
opentelemetry-otlp = "0.30.0"
136138
opentelemetry_sdk = "0.30.0"
137139
parking_lot = { version = "0.12.3", features = ["nightly"] }
138-
parquet = "55.2.0"
140+
parquet = "56"
139141
paste = "1.0.15"
140142
pco = "0.4.4"
141143
pin-project = "1.1.5"
@@ -181,17 +183,16 @@ target-lexicon = "0.13"
181183
tempfile = "3"
182184
termtree = { version = "0.5" }
183185
thiserror = "2.0.3"
184-
tokio = { version = "1.46" }
186+
tokio = { version = "1.47" }
185187
tokio-stream = "0.1.17"
186188
tokio-util = { version = "0.7.16" }
187-
# replace these with releases
188-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "d849ff430cd52250f6891ed4d5e3adad77bb2698" }
189-
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "d849ff430cd52250f6891ed4d5e3adad77bb2698" }
189+
tpchgen = { version = "2" }
190+
tpchgen-arrow = { version = "2" }
190191
tracing = { version = "0.1.41" }
191192
tracing-perfetto = "0.1.5"
192193
tracing-subscriber = "0.3.20"
193-
url = "2.5.4"
194-
uuid = { version = "1.17", features = ["js"] }
194+
url = "2.5.7"
195+
uuid = { version = "1.18", features = ["js"] }
195196
walkdir = "2.5.0"
196197
wasm-bindgen-futures = "0.4.39"
197198
witchcraft-metrics = "1.0.1"

bench-vortex/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ datafusion = { workspace = true, features = [
2929
"parquet",
3030
"datetime_expressions",
3131
"nested_expressions",
32+
"unicode_expressions",
3233
] }
3334
datafusion-common = { workspace = true }
3435
datafusion-physical-plan = { workspace = true }

vortex-array/src/arrow/compute/to_arrow/canonical.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use arrow_array::types::{
1010
};
1111
use arrow_array::{
1212
Array, ArrayRef as ArrowArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray,
13+
Decimal32Array as ArrowDecimal32Array, Decimal64Array as ArrowDecimal64Array,
1314
Decimal128Array as ArrowDecimal128Array, Decimal256Array as ArrowDecimal256Array,
1415
FixedSizeListArray as ArrowFixedSizeListArray, GenericByteArray, GenericByteViewArray,
1516
GenericListArray, NullArray as ArrowNullArray, OffsetSizeTrait,
@@ -116,6 +117,34 @@ impl Kernel for ToArrowCanonical {
116117
{
117118
to_arrow_primitive::<Float64Type>(array)
118119
}
120+
(Canonical::Decimal(array), DataType::Decimal32(precision, scale)) => {
121+
if array.decimal_dtype().precision() != *precision
122+
|| array.decimal_dtype().scale() != *scale
123+
{
124+
vortex_bail!(
125+
"ToArrowCanonical: target precision/scale {}/{} does not match array precision/scale {}/{}",
126+
precision,
127+
scale,
128+
array.decimal_dtype().precision(),
129+
array.decimal_dtype().scale()
130+
);
131+
}
132+
to_arrow_decimal32(array)
133+
}
134+
(Canonical::Decimal(array), DataType::Decimal64(precision, scale)) => {
135+
if array.decimal_dtype().precision() != *precision
136+
|| array.decimal_dtype().scale() != *scale
137+
{
138+
vortex_bail!(
139+
"ToArrowCanonical: target precision/scale {}/{} does not match array precision/scale {}/{}",
140+
precision,
141+
scale,
142+
array.decimal_dtype().precision(),
143+
array.decimal_dtype().scale()
144+
);
145+
}
146+
to_arrow_decimal64(array)
147+
}
119148
(Canonical::Decimal(array), DataType::Decimal128(precision, scale)) => {
120149
if array.decimal_dtype().precision() != *precision
121150
|| array.decimal_dtype().scale() != *scale
@@ -223,6 +252,91 @@ fn to_arrow_primitive<T: ArrowPrimitiveType>(array: PrimitiveArray) -> VortexRes
223252
)))
224253
}
225254

255+
fn to_arrow_decimal32(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
256+
let null_buffer = array.validity_mask().to_null_buffer();
257+
let buffer: Buffer<i32> = match array.values_type() {
258+
DecimalValueType::I8 => {
259+
Buffer::from_trusted_len_iter(array.buffer::<i8>().into_iter().map(|x| x.as_()))
260+
}
261+
DecimalValueType::I16 => {
262+
Buffer::from_trusted_len_iter(array.buffer::<i16>().into_iter().map(|x| x.as_()))
263+
}
264+
DecimalValueType::I32 => array.buffer::<i32>(),
265+
DecimalValueType::I64 => array
266+
.buffer::<i64>()
267+
.into_iter()
268+
.map(|x| {
269+
x.to_i32()
270+
.ok_or_else(|| vortex_err!("i64 to i32 narrowing cannot be done safely"))
271+
})
272+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
273+
DecimalValueType::I128 => array
274+
.buffer::<i128>()
275+
.into_iter()
276+
.map(|x| {
277+
x.to_i32()
278+
.ok_or_else(|| vortex_err!("i128 to i32 narrowing cannot be done safely"))
279+
})
280+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
281+
DecimalValueType::I256 => array
282+
.buffer::<vortex_scalar::i256>()
283+
.into_iter()
284+
.map(|x| {
285+
x.to_i32()
286+
.ok_or_else(|| vortex_err!("i256 to i32 narrowing cannot be done safely"))
287+
})
288+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
289+
_ => vortex_bail!("unknown value type {:?}", array.values_type()),
290+
};
291+
Ok(Arc::new(
292+
ArrowDecimal32Array::new(buffer.into_arrow_scalar_buffer(), null_buffer)
293+
.with_precision_and_scale(
294+
array.decimal_dtype().precision(),
295+
array.decimal_dtype().scale(),
296+
)?,
297+
))
298+
}
299+
300+
fn to_arrow_decimal64(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
301+
let null_buffer = array.validity_mask().to_null_buffer();
302+
let buffer: Buffer<i64> = match array.values_type() {
303+
DecimalValueType::I8 => {
304+
Buffer::from_trusted_len_iter(array.buffer::<i8>().into_iter().map(|x| x.as_()))
305+
}
306+
DecimalValueType::I16 => {
307+
Buffer::from_trusted_len_iter(array.buffer::<i16>().into_iter().map(|x| x.as_()))
308+
}
309+
DecimalValueType::I32 => {
310+
Buffer::from_trusted_len_iter(array.buffer::<i32>().into_iter().map(|x| x.as_()))
311+
}
312+
DecimalValueType::I64 => array.buffer::<i64>(),
313+
DecimalValueType::I128 => array
314+
.buffer::<i128>()
315+
.into_iter()
316+
.map(|x| {
317+
x.to_i64()
318+
.ok_or_else(|| vortex_err!("i128 to i64 narrowing cannot be done safely"))
319+
})
320+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
321+
DecimalValueType::I256 => array
322+
.buffer::<vortex_scalar::i256>()
323+
.into_iter()
324+
.map(|x| {
325+
x.to_i64()
326+
.ok_or_else(|| vortex_err!("i256 to i64 narrowing cannot be done safely"))
327+
})
328+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
329+
_ => vortex_bail!("unknown value type {:?}", array.values_type()),
330+
};
331+
Ok(Arc::new(
332+
ArrowDecimal64Array::new(buffer.into_arrow_scalar_buffer(), null_buffer)
333+
.with_precision_and_scale(
334+
array.decimal_dtype().precision(),
335+
array.decimal_dtype().scale(),
336+
)?,
337+
))
338+
}
339+
226340
fn to_arrow_decimal128(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
227341
let null_buffer = array.validity_mask().to_null_buffer();
228342
let buffer: Buffer<i128> = match array.values_type() {
@@ -586,6 +700,60 @@ mod tests {
586700
assert_eq!(arrow_decimal.value(2), 12);
587701
}
588702

703+
#[rstest]
704+
#[case(0i8)]
705+
#[case(0i16)]
706+
#[case(0i32)]
707+
#[case(0i64)]
708+
#[case(0i128)]
709+
#[case(vortex_scalar::i256::ZERO)]
710+
fn to_arrow_decimal32<T: NativeDecimalType>(#[case] _decimal_type: T) {
711+
use arrow_array::Decimal32Array;
712+
713+
let mut decimal = DecimalBuilder::new::<T>(2, 1, false.into());
714+
decimal.append_value(10);
715+
decimal.append_value(11);
716+
decimal.append_value(12);
717+
718+
let decimal = decimal.finish();
719+
720+
let arrow_array = decimal.into_arrow(&DataType::Decimal32(2, 1)).unwrap();
721+
let arrow_decimal = arrow_array
722+
.as_any()
723+
.downcast_ref::<Decimal32Array>()
724+
.unwrap();
725+
assert_eq!(arrow_decimal.value(0), 10);
726+
assert_eq!(arrow_decimal.value(1), 11);
727+
assert_eq!(arrow_decimal.value(2), 12);
728+
}
729+
730+
#[rstest]
731+
#[case(0i8)]
732+
#[case(0i16)]
733+
#[case(0i32)]
734+
#[case(0i64)]
735+
#[case(0i128)]
736+
#[case(vortex_scalar::i256::ZERO)]
737+
fn to_arrow_decimal64<T: NativeDecimalType>(#[case] _decimal_type: T) {
738+
use arrow_array::Decimal64Array;
739+
740+
let mut decimal = DecimalBuilder::new::<T>(2, 1, false.into());
741+
decimal.append_value(10);
742+
decimal.append_value(11);
743+
decimal.append_value(12);
744+
745+
let decimal = decimal.finish();
746+
747+
let arrow_array = decimal.into_arrow(&DataType::Decimal64(2, 1)).unwrap();
748+
let arrow_decimal = arrow_array
749+
.as_any()
750+
.downcast_ref::<Decimal64Array>()
751+
.unwrap();
752+
assert_eq!(arrow_decimal.value(0), 10);
753+
assert_eq!(arrow_decimal.value(1), 11);
754+
assert_eq!(arrow_decimal.value(2), 12);
755+
}
756+
589757
#[rstest]
590758
#[case(0i8)]
591759
#[case(0i16)]

vortex-array/src/arrow/convert.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use std::sync::Arc;
55

66
use arrow_array::cast::{AsArray, as_null_array};
77
use arrow_array::types::{
8-
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
9-
Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
10-
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
11-
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
12-
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
8+
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal32Type, Decimal64Type,
9+
Decimal128Type, Decimal256Type, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type,
10+
Int32Type, Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
11+
Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType,
12+
TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
1313
};
1414
use arrow_array::{
1515
Array as ArrowArray, ArrowPrimitiveType, BooleanArray as ArrowBooleanArray,
@@ -104,6 +104,24 @@ impl_from_arrow_primitive!(Float16Type);
104104
impl_from_arrow_primitive!(Float32Type);
105105
impl_from_arrow_primitive!(Float64Type);
106106

107+
impl FromArrowArray<&ArrowPrimitiveArray<Decimal32Type>> for ArrayRef {
108+
fn from_arrow(array: &ArrowPrimitiveArray<Decimal32Type>, nullable: bool) -> Self {
109+
let decimal_type = DecimalDType::new(array.precision(), array.scale());
110+
let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
111+
let validity = nulls(array.nulls(), nullable);
112+
DecimalArray::new(buffer, decimal_type, validity).into_array()
113+
}
114+
}
115+
116+
impl FromArrowArray<&ArrowPrimitiveArray<Decimal64Type>> for ArrayRef {
117+
fn from_arrow(array: &ArrowPrimitiveArray<Decimal64Type>, nullable: bool) -> Self {
118+
let decimal_type = DecimalDType::new(array.precision(), array.scale());
119+
let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
120+
let validity = nulls(array.nulls(), nullable);
121+
DecimalArray::new(buffer, decimal_type, validity).into_array()
122+
}
123+
}
124+
107125
impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
108126
fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, nullable: bool) -> Self {
109127
let decimal_type = DecimalDType::new(array.precision(), array.scale());
@@ -433,6 +451,12 @@ impl FromArrowArray<&dyn ArrowArray> for ArrayRef {
433451
}
434452
ArrowTimeUnit::Second | ArrowTimeUnit::Millisecond => unreachable!(),
435453
},
454+
DataType::Decimal32(..) => {
455+
Self::from_arrow(array.as_primitive::<Decimal32Type>(), nullable)
456+
}
457+
DataType::Decimal64(..) => {
458+
Self::from_arrow(array.as_primitive::<Decimal64Type>(), nullable)
459+
}
436460
DataType::Decimal128(..) => {
437461
Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
438462
}

vortex-datafusion/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ datafusion-datasource = { workspace = true, default-features = false }
2424
datafusion-execution = { workspace = true }
2525
datafusion-expr = { workspace = true }
2626
datafusion-physical-expr = { workspace = true }
27+
datafusion-physical-expr-adapter = { workspace = true }
28+
datafusion-physical-expr-common = { workspace = true }
2729
datafusion-physical-plan = { workspace = true }
2830
futures = { workspace = true }
2931
itertools = { workspace = true }

vortex-datafusion/src/convert/exprs.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use arrow_schema::{DataType, Schema};
77
use datafusion_expr::Operator as DFOperator;
88
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
9+
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
910
use datafusion_physical_plan::expressions as df_expr;
1011
use vortex::error::{VortexResult, vortex_bail, vortex_err};
1112
use vortex::expr::{BinaryExpr, ExprRef, LikeExpr, Operator, and, get_item, lit, root};
@@ -122,6 +123,12 @@ impl TryFromDataFusion<DFOperator> for Operator {
122123
}
123124

124125
pub(crate) fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
126+
// We currently do not support pushdown of dynamic expressions in DF.
127+
// See issue: https://github.com/vortex-data/vortex/issues/4034
128+
if is_dynamic_physical_expr(expr) {
129+
return false;
130+
}
131+
125132
let expr = expr.as_any();
126133
if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
127134
can_binary_be_pushed_down(binary, schema)

0 commit comments

Comments
 (0)