diff --git a/Cargo.lock b/Cargo.lock index 3e5a63e6fef..2f69caedbc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4285,6 +4285,7 @@ dependencies = [ "rand", "rstest", "serde", + "static_assertions", "tokio", "vortex-buffer", "vortex-datetime-dtype", diff --git a/Cargo.toml b/Cargo.toml index 0132f249ef2..e70bea7afee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,6 +119,7 @@ serde = "1.0.197" serde_json = "1.0.116" serde_test = "1.0.176" simplelog = { version = "0.12.2", features = ["paris"] } +static_assertions = "1" tar = "0.4" tempfile = "3" thiserror = "1.0.58" diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs index ae9e598fa5e..a6fcd1eb77f 100644 --- a/bench-vortex/benches/compress_benchmark.rs +++ b/bench-vortex/benches/compress_benchmark.rs @@ -69,7 +69,7 @@ fn vortex_compress_tpch(c: &mut Criterion) { let comments_canonical = comments .into_canonical() .unwrap() - .into_varbin() + .into_varbinview() .unwrap() .into_array(); group.bench_function("compress-fsst-canonicalized", |b| { diff --git a/bench-vortex/src/tpch/schema.rs b/bench-vortex/src/tpch/schema.rs index eb4971b66d1..6d15ec37f8f 100644 --- a/bench-vortex/src/tpch/schema.rs +++ b/bench-vortex/src/tpch/schema.rs @@ -7,62 +7,62 @@ use lazy_static::lazy_static; lazy_static! 
{ pub static ref NATION: Schema = Schema::new(vec![ Field::new("n_nationkey", DataType::Int64, false), - Field::new("n_name", DataType::Utf8, false), + Field::new("n_name", DataType::Utf8View, false), Field::new("n_regionkey", DataType::Int64, false), - Field::new("n_comment", DataType::Utf8, true), + Field::new("n_comment", DataType::Utf8View, true), ]); pub static ref REGION: Schema = Schema::new(vec![ Field::new("r_regionkey", DataType::Int64, false), - Field::new("r_name", DataType::Utf8, false), - Field::new("r_comment", DataType::Utf8, true), + Field::new("r_name", DataType::Utf8View, false), + Field::new("r_comment", DataType::Utf8View, true), ]); pub static ref PART: Schema = Schema::new(vec![ Field::new("p_partkey", DataType::Int64, false), - Field::new("p_name", DataType::Utf8, false), - Field::new("p_mfgr", DataType::Utf8, false), - Field::new("p_brand", DataType::Utf8, false), - Field::new("p_type", DataType::Utf8, false), + Field::new("p_name", DataType::Utf8View, false), + Field::new("p_mfgr", DataType::Utf8View, false), + Field::new("p_brand", DataType::Utf8View, false), + Field::new("p_type", DataType::Utf8View, false), Field::new("p_size", DataType::Int32, false), - Field::new("p_container", DataType::Utf8, false), + Field::new("p_container", DataType::Utf8View, false), Field::new("p_retailprice", DataType::Float64, false), - Field::new("p_comment", DataType::Utf8, false), + Field::new("p_comment", DataType::Utf8View, false), ]); pub static ref SUPPLIER: Schema = Schema::new(vec![ Field::new("s_suppkey", DataType::Int64, false), - Field::new("s_name", DataType::Utf8, false), - Field::new("s_address", DataType::Utf8, false), + Field::new("s_name", DataType::Utf8View, false), + Field::new("s_address", DataType::Utf8View, false), Field::new("s_nationkey", DataType::Int32, false), - Field::new("s_phone", DataType::Utf8, false), + Field::new("s_phone", DataType::Utf8View, false), Field::new("s_acctbal", DataType::Float64, false), - 
Field::new("s_comment", DataType::Utf8, false), + Field::new("s_comment", DataType::Utf8View, false), ]); pub static ref PARTSUPP: Schema = Schema::new(vec![ Field::new("ps_partkey", DataType::Int64, false), Field::new("ps_suppkey", DataType::Int64, false), Field::new("ps_availqty", DataType::Int64, false), Field::new("ps_supplycost", DataType::Float64, false), - Field::new("ps_comment", DataType::Utf8, false), + Field::new("ps_comment", DataType::Utf8View, false), ]); pub static ref CUSTOMER: Schema = Schema::new(vec![ Field::new("c_custkey", DataType::Int64, false), - Field::new("c_name", DataType::Utf8, false), - Field::new("c_address", DataType::Utf8, false), + Field::new("c_name", DataType::Utf8View, false), + Field::new("c_address", DataType::Utf8View, false), Field::new("c_nationkey", DataType::Int64, false), - Field::new("c_phone", DataType::Utf8, false), + Field::new("c_phone", DataType::Utf8View, false), Field::new("c_acctbal", DataType::Float64, false), - Field::new("c_mktsegment", DataType::Utf8, false), - Field::new("c_comment", DataType::Utf8, false), + Field::new("c_mktsegment", DataType::Utf8View, false), + Field::new("c_comment", DataType::Utf8View, false), ]); pub static ref ORDERS: Schema = Schema::new(vec![ Field::new("o_orderkey", DataType::Int64, false), Field::new("o_custkey", DataType::Int64, false), - Field::new("o_orderstatus", DataType::Utf8, false), + Field::new("o_orderstatus", DataType::Utf8View, false), Field::new("o_totalprice", DataType::Float64, false), Field::new("o_orderdate", DataType::Date32, false), - Field::new("o_orderpriority", DataType::Utf8, false), - Field::new("o_clerk", DataType::Utf8, false), + Field::new("o_orderpriority", DataType::Utf8View, false), + Field::new("o_clerk", DataType::Utf8View, false), Field::new("o_shippriority", DataType::Int32, false), - Field::new("o_comment", DataType::Utf8, false), + Field::new("o_comment", DataType::Utf8View, false), ]); pub static ref LINEITEM: Schema = Schema::new(vec![ 
Field::new("l_orderkey", DataType::Int64, false), @@ -73,13 +73,13 @@ lazy_static! { Field::new("l_extendedprice", DataType::Float64, false), Field::new("l_discount", DataType::Float64, false), Field::new("l_tax", DataType::Float64, false), - Field::new("l_returnflag", DataType::Utf8, false), - Field::new("l_linestatus", DataType::Utf8, false), + Field::new("l_returnflag", DataType::Utf8View, false), + Field::new("l_linestatus", DataType::Utf8View, false), Field::new("l_shipdate", DataType::Date32, false), Field::new("l_commitdate", DataType::Date32, false), Field::new("l_receiptdate", DataType::Date32, false), - Field::new("l_shipinstruct", DataType::Utf8, false), - Field::new("l_shipmode", DataType::Utf8, false), - Field::new("l_comment", DataType::Utf8, false), + Field::new("l_shipinstruct", DataType::Utf8View, false), + Field::new("l_shipmode", DataType::Utf8View, false), + Field::new("l_comment", DataType::Utf8View, false), ]); } diff --git a/encodings/dict/src/compress.rs b/encodings/dict/src/compress.rs index 56f2e21173d..798ebbdc0f3 100644 --- a/encodings/dict/src/compress.rs +++ b/encodings/dict/src/compress.rs @@ -5,7 +5,7 @@ use hashbrown::hash_map::{Entry, RawEntryMut}; use hashbrown::HashMap; use num_traits::AsPrimitive; use vortex::accessor::ArrayAccessor; -use vortex::array::{PrimitiveArray, VarBinArray}; +use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray}; use vortex::validity::Validity; use vortex::{ArrayDType, IntoArray}; use vortex_dtype::{match_each_native_ptype, DType, NativePType, ToBytes}; @@ -89,6 +89,13 @@ pub fn dict_encode_varbin(array: &VarBinArray) -> (PrimitiveArray, VarBinArray) .unwrap() } +/// Dictionary encode a VarbinViewArray. 
+pub fn dict_encode_varbinview(array: &VarBinViewArray) -> (PrimitiveArray, VarBinArray) { + array + .with_iterator(|iter| dict_encode_typed_varbin(array.dtype().clone(), iter)) + .unwrap() +} + fn lookup_bytes<'a, T: NativePType + AsPrimitive>( offsets: &'a [T], bytes: &'a [u8], diff --git a/encodings/dict/src/compute.rs b/encodings/dict/src/compute.rs index e5a6f0b7ee9..3e954019deb 100644 --- a/encodings/dict/src/compute.rs +++ b/encodings/dict/src/compute.rs @@ -55,11 +55,11 @@ impl SliceFn for DictArray { #[cfg(test)] mod test { - use vortex::array::{PrimitiveArray, VarBinArray}; + use vortex::accessor::ArrayAccessor; + use vortex::array::{PrimitiveArray, VarBinViewArray}; use vortex::{IntoArray, IntoArrayVariant, ToArray}; - use vortex_dtype::{DType, Nullability}; - use crate::{dict_encode_typed_primitive, dict_encode_varbin, DictArray}; + use crate::{dict_encode_typed_primitive, dict_encode_varbinview, DictArray}; #[test] fn flatten_nullable_primitive() { @@ -79,20 +79,30 @@ mod test { #[test] fn flatten_nullable_varbin() { - let reference = VarBinArray::from_iter( - vec![Some("a"), Some("b"), None, Some("a"), None, Some("b")], - DType::Utf8(Nullability::Nullable), - ); - let (codes, values) = dict_encode_varbin(&reference); + let reference = VarBinViewArray::from_iter_nullable_str(vec![ + Some("a"), + Some("b"), + None, + Some("a"), + None, + Some("b"), + ]); + assert_eq!(reference.len(), 6); + let (codes, values) = dict_encode_varbinview(&reference); let dict = DictArray::try_new(codes.into_array(), values.into_array()).unwrap(); - let flattened_dict = dict.to_array().into_varbin().unwrap(); - assert_eq!( - flattened_dict.offsets().into_primitive().unwrap().buffer(), - reference.offsets().into_primitive().unwrap().buffer() - ); + let flattened_dict = dict.to_array().into_varbinview().unwrap(); + assert_eq!( - flattened_dict.bytes().into_primitive().unwrap().buffer(), - reference.bytes().into_primitive().unwrap().buffer() + flattened_dict + 
.with_iterator(|iter| iter + .map(|slice| slice.map(|s| s.to_vec())) + .collect::<Vec<_>>()) + .unwrap(), + reference + .with_iterator(|iter| iter + .map(|slice| slice.map(|s| s.to_vec())) + .collect::<Vec<_>>()) + .unwrap(), ); } } diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index 4e2bf21f739..f1dd1bee19d 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -1,7 +1,5 @@ -use arrow_array::builder::GenericByteBuilder; -use arrow_array::types::BinaryType; -use fsst::Symbol; -use vortex::array::VarBinArray; +use arrow_array::builder::BinaryViewBuilder; +use vortex::array::VarBinViewArray; use vortex::arrow::FromArrowArray; use vortex::validity::ArrayValidity; use vortex::{ArrayDType, Canonical, IntoCanonical}; @@ -12,17 +10,10 @@ use crate::FSSTArray; impl IntoCanonical for FSSTArray { fn into_canonical(self) -> VortexResult<Canonical> { self.with_decompressor(|decompressor| { - // Note: the maximum amount of decompressed space for an FSST array is 8 * n_elements, - // as each code can expand into a symbol of 1-8 bytes. - let max_items = self.len(); - let max_bytes = self.codes().nbytes() * size_of::<Symbol>(); - - // Create the target Arrow binary array - // TODO(aduffy): switch to BinaryView when PR https://github.com/spiraldb/vortex/pull/476 merges - let mut builder = GenericByteBuilder::<BinaryType>::with_capacity(max_items, max_bytes); + let mut builder = BinaryViewBuilder::with_capacity(self.len()); // TODO(aduffy): add decompression functions that support writing directly into and output buffer. - let codes_array = self.codes().into_canonical()?.into_varbin()?; + let codes_array = self.codes().into_canonical()?.into_varbinview()?; // TODO(aduffy): make this loop faster. 
for idx in 0..self.len() { @@ -37,20 +28,20 @@ impl IntoCanonical for FSSTArray { let arrow_array = builder.finish(); - // Force the DTYpe - let canonical_varbin = VarBinArray::try_from(&vortex::Array::from_arrow( + // Force the DType + let canonical_varbin = VarBinViewArray::try_from(&vortex::Array::from_arrow( &arrow_array, self.dtype().is_nullable(), ))?; - let forced_dtype = VarBinArray::try_new( - canonical_varbin.offsets(), - canonical_varbin.bytes(), + let forced_dtype = VarBinViewArray::try_new( + canonical_varbin.views(), + canonical_varbin.buffers().collect(), self.dtype().clone(), canonical_varbin.validity(), )?; - Ok(Canonical::VarBin(forced_dtype)) + Ok(Canonical::VarBinView(forced_dtype)) }) } } diff --git a/pyvortex/pyproject.toml b/pyvortex/pyproject.toml index f472a8dbb7e..baa123a10f6 100644 --- a/pyvortex/pyproject.toml +++ b/pyvortex/pyproject.toml @@ -16,7 +16,7 @@ build-backend = "maturin" [tool.rye] managed = true dev-dependencies = [ - "pyarrow>=15.0.0", + "pyarrow>=17.0.0", "pip" ] diff --git a/pyvortex/test/test_array.py b/pyvortex/test/test_array.py index 8acf7ab2fd1..a9c7d9c21a1 100644 --- a/pyvortex/test/test_array.py +++ b/pyvortex/test/test_array.py @@ -9,22 +9,22 @@ def test_primitive_array_round_trip(): def test_array_with_nulls(): - a = pa.array([b"123", None]) + a = pa.array([b"123", None], type=pa.string_view()) arr = vortex.encode(a) assert arr.to_arrow().combine_chunks() == a def test_varbin_array_round_trip(): - a = pa.array(["a", "b", "c"]) + a = pa.array(["a", "b", "c"], type=pa.string_view()) arr = vortex.encode(a) assert arr.to_arrow().combine_chunks() == a -def test_varbin_array_take(): +def test_varbinview_array_take(): a = vortex.encode(pa.array(["a", "b", "c", "d"])) assert a.take(vortex.encode(pa.array([0, 2]))).to_arrow().combine_chunks() == pa.array( ["a", "c"], - type=pa.utf8(), + type=pa.string_view(), ) diff --git a/pyvortex/test/test_compress.py b/pyvortex/test/test_compress.py index 368e7c488bb..afb5a8381af 
100644 --- a/pyvortex/test/test_compress.py +++ b/pyvortex/test/test_compress.py @@ -66,12 +66,12 @@ def test_table_encode(): table = pa.table( { "number": pa.chunked_array([pa.array([0, 1, 2]), pa.array([3, 4, 5])]), - "string": pa.chunked_array([pa.array(["a", "b", "c"]), pa.array(["d", "e", "f"])]), + "string": pa.chunked_array([pa.array(["a", "b", "c"], type=pa.string_view()), pa.array(["d", "e", "f"], type=pa.string_view())]), } ) encoded = vortex.encode(table) assert encoded.to_arrow().combine_chunks() == pa.StructArray.from_arrays( - [pa.array([0, 1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e", "f"])], names=["number", "string"] + [pa.array([0, 1, 2, 3, 4, 5]), pa.array(["a", "b", "c", "d", "e", "f"], type=pa.string_view())], names=["number", "string"] ) diff --git a/requirements-dev.lock b/requirements-dev.lock index ece71843200..4265a9f1256 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -72,7 +72,7 @@ pluggy==1.4.0 # via pytest py-cpuinfo==9.0.0 # via pytest-benchmark -pyarrow==15.0.2 +pyarrow==17.0.0 pygments==2.17.2 # via mkdocs-material pymdown-extensions==10.7.1 diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 037b975524a..f191150177c 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -43,6 +43,7 @@ paste = { workspace = true } pin-project = { workspace = true } rand = { workspace = true } serde = { workspace = true, features = ["derive"] } +static_assertions = { workspace = true } vortex-buffer = { workspace = true } vortex-datetime-dtype = { workspace = true } vortex-dtype = { workspace = true } diff --git a/vortex-array/src/array/chunked/canonical.rs b/vortex-array/src/array/chunked/canonical.rs index fff2df73d26..7ed48c6e729 100644 --- a/vortex-array/src/array/chunked/canonical.rs +++ b/vortex-array/src/array/chunked/canonical.rs @@ -1,6 +1,7 @@ -use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer}; +use arrow_array::UInt8Array; +use arrow_buffer::{BooleanBufferBuilder, 
Buffer, MutableBuffer, ScalarBuffer}; use itertools::Itertools; -use vortex_dtype::{DType, Nullability, PType, StructDType}; +use vortex_dtype::{DType, PType, StructDType}; use vortex_error::{vortex_bail, vortex_err, ErrString, VortexResult}; use crate::array::chunked::ChunkedArray; @@ -8,10 +9,8 @@ use crate::array::extension::ExtensionArray; use crate::array::null::NullArray; use crate::array::primitive::PrimitiveArray; use crate::array::struct_::StructArray; -use crate::array::varbin::VarBinArray; -use crate::array::BoolArray; -use crate::compute::slice; -use crate::compute::unary::{scalar_at_unchecked, try_cast}; +use crate::array::{BinaryView, BoolArray, VarBinViewArray}; +use crate::arrow::FromArrowArray; use crate::validity::Validity; use crate::variants::StructArrayTrait; use crate::{ @@ -115,12 +114,12 @@ pub(crate) fn try_canonicalize_chunks( Ok(Canonical::Primitive(prim_array)) } DType::Utf8(_) => { - let varbin_array = pack_varbin(chunks.as_slice(), validity, dtype)?; - Ok(Canonical::VarBin(varbin_array)) + let varbin_array = pack_views(chunks.as_slice(), dtype, validity)?; + Ok(Canonical::VarBinView(varbin_array)) } DType::Binary(_) => { - let varbin_array = pack_varbin(chunks.as_slice(), validity, dtype)?; - Ok(Canonical::VarBin(varbin_array)) + let varbin_array = pack_views(chunks.as_slice(), dtype, validity)?; + Ok(Canonical::VarBinView(varbin_array)) } DType::Null => { let len = chunks.iter().map(|chunk| chunk.len()).sum(); @@ -201,48 +200,58 @@ fn pack_primitives( )) } -/// Builds a new [VarBinArray] by repacking the values from the chunks into a single +/// Builds a new [VarBinViewArray] by repacking the values from the chunks into a single /// contiguous array. /// /// It is expected this function is only called from [try_canonicalize_chunks], and thus all chunks have /// been checked to have the same DType already. 
-fn pack_varbin(chunks: &[Array], validity: Validity, dtype: &DType) -> VortexResult { - let len: usize = chunks.iter().map(|c| c.len()).sum(); - let mut offsets = Vec::with_capacity(len + 1); - offsets.push(0); - let mut data_bytes = Vec::new(); - +fn pack_views( + chunks: &[Array], + dtype: &DType, + validity: Validity, +) -> VortexResult { + let mut views = Vec::new(); + let mut buffers = Vec::new(); for chunk in chunks { - let chunk = chunk.clone().into_varbin()?; - let offsets_arr = try_cast( - chunk.offsets().into_primitive()?.array(), - &DType::Primitive(PType::I32, Nullability::NonNullable), - )? - .into_primitive()?; + // Each chunk's views have buffer IDs that are zero-referenced. + // As part of the packing operation, we need to rewrite them to be referenced to the global + // merged buffers list. + let buffers_offset = buffers.len(); + let canonical_chunk = chunk.clone().into_varbinview()?; - let first_offset_value: usize = - usize::try_from(&scalar_at_unchecked(offsets_arr.array(), 0))?; - let last_offset_value: usize = usize::try_from(&scalar_at_unchecked( - offsets_arr.array(), - offsets_arr.len() - 1, - ))?; - let primitive_bytes = - slice(&chunk.bytes(), first_offset_value, last_offset_value)?.into_primitive()?; - data_bytes.extend_from_slice(primitive_bytes.buffer()); + for buffer in canonical_chunk.buffers() { + let canonical_buffer = buffer.into_canonical()?.into_primitive()?.into_array(); + buffers.push(canonical_buffer); + } - let adjustment_from_previous = *offsets.last().expect("offsets has at least one element"); - offsets.extend( - offsets_arr - .maybe_null_slice::() - .iter() - .skip(1) - .map(|off| off + adjustment_from_previous - first_offset_value as i32), - ); + for view in canonical_chunk.view_slice() { + if view.is_inlined() { + // Inlined views can be copied directly into the output + views.push(*view); + } else { + // Referencing views must have their buffer_index adjusted with new offsets + let view_ref = view.as_view(); + 
views.push(BinaryView::new_view( + view.len(), + *view_ref.prefix(), + (buffers_offset as u32) + view_ref.buffer_index(), + view_ref.offset(), + )); + } + } } - VarBinArray::try_new( - PrimitiveArray::from(offsets).into_array(), - PrimitiveArray::from(data_bytes).into_array(), + // Reinterpret views from Vec to Vec. + // BinaryView is 16 bytes, so we need to be careful to set the length + // and capacity of the new Vec accordingly. + let (ptr, length, capacity) = views.into_raw_parts(); + let views_u8: Vec = unsafe { Vec::from_raw_parts(ptr.cast(), 16 * length, 16 * capacity) }; + + let arrow_views_array = UInt8Array::new(ScalarBuffer::from(views_u8), None); + + VarBinViewArray::try_new( + Array::from_arrow(&arrow_views_array, false), + buffers, dtype.clone(), validity, ) @@ -250,32 +259,35 @@ fn pack_varbin(chunks: &[Array], validity: Validity, dtype: &DType) -> VortexRes #[cfg(test)] mod tests { + use arrow_array::builder::StringViewBuilder; use vortex_dtype::{DType, Nullability}; use crate::accessor::ArrayAccessor; - use crate::array::builder::VarBinBuilder; - use crate::array::chunked::canonical::pack_varbin; - use crate::array::VarBinArray; + use crate::array::chunked::canonical::pack_views; + use crate::arrow::FromArrowArray; use crate::compute::slice; use crate::validity::Validity; + use crate::Array; - fn varbin_array() -> VarBinArray { - let mut builder = VarBinBuilder::::with_capacity(4); - builder.push_value("foo"); - builder.push_value("bar"); - builder.push_value("baz"); - builder.push_value("quak"); - builder.finish(DType::Utf8(Nullability::NonNullable)) + fn varbin_array() -> Array { + let mut builder = StringViewBuilder::new(); + builder.append_value("foo"); + builder.append_value("bar"); + builder.append_value("baz"); + builder.append_value("quak"); + let arrow_view_array = builder.finish(); + + Array::from_arrow(&arrow_view_array, false) } #[test] pub fn pack_sliced_varbin() { - let array1 = slice(varbin_array().array(), 1, 3).unwrap(); - let 
array2 = slice(varbin_array().array(), 2, 4).unwrap(); - let packed = pack_varbin( + let array1 = slice(&varbin_array(), 1, 3).unwrap(); + let array2 = slice(&varbin_array(), 2, 4).unwrap(); + let packed = pack_views( &[array1, array2], - Validity::NonNullable, &DType::Utf8(Nullability::NonNullable), + Validity::NonNullable, ) .unwrap(); assert_eq!(packed.len(), 4); diff --git a/vortex-array/src/array/constant/canonical.rs b/vortex-array/src/array/constant/canonical.rs index 3ed24df1bc9..70cf4288401 100644 --- a/vortex-array/src/array/constant/canonical.rs +++ b/vortex-array/src/array/constant/canonical.rs @@ -1,13 +1,12 @@ use std::iter; -use vortex_dtype::{match_each_native_ptype, DType, Nullability, PType}; +use vortex_dtype::{match_each_native_ptype, Nullability, PType}; use vortex_error::{vortex_bail, vortex_err, VortexResult}; -use vortex_scalar::{BoolScalar, Utf8Scalar}; +use vortex_scalar::{BinaryScalar, BoolScalar, Utf8Scalar}; use crate::array::constant::ConstantArray; use crate::array::primitive::PrimitiveArray; -use crate::array::varbin::VarBinArray; -use crate::array::BoolArray; +use crate::array::{BoolArray, VarBinViewArray}; use crate::validity::Validity; use crate::{ArrayDType, Canonical, IntoCanonical}; @@ -32,11 +31,21 @@ impl IntoCanonical for ConstantArray { let const_value = s .value() .ok_or_else(|| vortex_err!("Constant UTF-8 array has null value"))?; - let bytes = const_value.as_bytes(); + let string = const_value.as_str(); - return Ok(Canonical::VarBin(VarBinArray::from_iter( + return Ok(Canonical::VarBinView(VarBinViewArray::from_iter( + iter::repeat(Some(string)).take(self.len()), + ))); + } + + if let Ok(b) = BinaryScalar::try_from(self.scalar()) { + let const_value = b + .value() + .ok_or_else(|| vortex_err!("Constant Binary array has null value"))?; + let bytes = const_value.as_slice(); + + return Ok(Canonical::VarBinView(VarBinViewArray::from_iter( iter::repeat(Some(bytes)).take(self.len()), - DType::Utf8(validity.nullability()), ))); 
} diff --git a/vortex-array/src/array/varbin/flatten.rs b/vortex-array/src/array/varbin/flatten.rs index d00d0329064..cd0118decf2 100644 --- a/vortex-array/src/array/varbin/flatten.rs +++ b/vortex-array/src/array/varbin/flatten.rs @@ -1,10 +1,105 @@ -use vortex_error::VortexResult; +use std::sync::Arc; -use crate::array::varbin::VarBinArray; -use crate::{Canonical, IntoCanonical}; +use arrow_array::builder::GenericByteViewBuilder; +use arrow_array::types::{BinaryViewType, ByteViewType, StringViewType}; +use arrow_array::ArrayRef; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, VortexResult}; + +use crate::array::varbinview::BinaryView; +use crate::array::{VarBinArray, VarBinViewArray}; +use crate::arrow::FromArrowArray; +use crate::validity::ArrayValidity; +use crate::{Array, ArrayDType, Canonical, IntoCanonical}; impl IntoCanonical for VarBinArray { fn into_canonical(self) -> VortexResult<Canonical> { - Ok(Canonical::VarBin(self)) + fn into_byteview<T, F>(array: &VarBinArray, from_bytes_fn: F) -> ArrayRef + where T: ByteViewType, + F: Fn(&[u8]) -> &T::Native + { + let mut builder = GenericByteViewBuilder::<T>::with_capacity(array.len()); + builder.append_block( + array + .bytes() + .into_buffer() + .expect("VarBinArray::bytes array must have buffer") + .into_arrow(), + ); + + for idx in 0..array.len() { + if !array.is_valid(idx) { + builder.append_null(); + continue; + } + let start = u32::try_from(array.offset_at(idx)).unwrap(); + let end = u32::try_from(array.offset_at(idx + 1)).unwrap(); + let len = end - start; + if (len as usize) <= BinaryView::MAX_INLINED_SIZE { + // Get access to the value using the internal T type here. 
 + let bytes = array.bytes_at(idx).unwrap(); + let value = from_bytes_fn(bytes.as_slice()); + builder.append_value(value); + } else { + unsafe { builder.append_view_unchecked(0, start, end - start) }; + } + } + + Arc::new(builder.finish()) + } + + let arrow_array = match self.dtype() { + DType::Utf8(_) => into_byteview::<StringViewType, _>(&self, |b| unsafe { + // SAFETY: VarBinArray values are checked at construction. If DType is Utf8, + // then all values must be valid UTF-8 bytes. + std::str::from_utf8_unchecked(b) + }), + DType::Binary(_) => into_byteview::<BinaryViewType, _>(&self, |b| b), + _ => vortex_bail!("invalid DType for VarBinArray") + }; + let array = Array::from_arrow(arrow_array.clone(), arrow_array.is_nullable()); + let varbinview = VarBinViewArray::try_from(array)?; + + Ok(Canonical::VarBinView(varbinview)) + } +} + +#[cfg(test)] +mod test { + use vortex_dtype::{DType, Nullability}; + + use crate::array::varbin::builder::VarBinBuilder; + use crate::validity::ArrayValidity; + use crate::IntoCanonical; + + #[test] + fn test_canonical_varbin() { + let mut varbin = VarBinBuilder::<i32>::with_capacity(10); + varbin.push_null(); + varbin.push_null(); + // inlined value + varbin.push_value("123456789012".as_bytes()); + // non-inlinable value + varbin.push_value("1234567890123".as_bytes()); + let varbin = varbin.finish(DType::Utf8(Nullability::Nullable)); + + let canonical = varbin.into_canonical().unwrap().into_varbinview().unwrap(); + + assert!(!canonical.is_valid(0)); + assert!(!canonical.is_valid(1)); + + // First value is inlined (12 bytes) + assert!(canonical.view_at(2).is_inlined()); + assert_eq!( + canonical.bytes_at(2).unwrap().as_slice(), + "123456789012".as_bytes() + ); + + // Second value is not inlined (13 bytes) + assert!(!canonical.view_at(3).is_inlined()); + assert_eq!( + canonical.bytes_at(3).unwrap().as_slice(), + "1234567890123".as_bytes() + ); } } diff --git a/vortex-array/src/array/varbinview/accessor.rs b/vortex-array/src/array/varbinview/accessor.rs index 
a7e07928826..00b8c863aa6 100644 --- a/vortex-array/src/array/varbinview/accessor.rs +++ b/vortex-array/src/array/varbinview/accessor.rs @@ -1,10 +1,11 @@ +use itertools::Itertools; use vortex_error::VortexResult; use crate::accessor::ArrayAccessor; use crate::array::primitive::PrimitiveArray; use crate::array::varbinview::VarBinViewArray; use crate::validity::ArrayValidity; -use crate::IntoArrayVariant; +use crate::IntoCanonical; impl ArrayAccessor<[u8]> for VarBinViewArray { fn with_iterator FnOnce(&mut dyn Iterator>) -> R, R>( @@ -12,22 +13,22 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { f: F, ) -> VortexResult { let views = self.view_slice(); - let bytes: Vec = (0..self.metadata().data_lens.len()) - .map(|i| self.bytes(i).into_primitive()) - .collect::>>()?; + let bytes: Vec = (0..self.metadata().buffer_lens.len()) + .map(|i| self.buffer(i).into_canonical()?.into_primitive()) + .try_collect()?; let validity = self.logical_validity().to_null_buffer()?; match validity { None => { let mut iter = views.iter().map(|view| { if view.is_inlined() { - Some(unsafe { &view.inlined.data[..view.size()] }) + Some(unsafe { &view.inlined.data[..view.len() as usize] }) } else { let offset = unsafe { view._ref.offset as usize }; let buffer_idx = unsafe { view._ref.buffer_index as usize }; Some( &bytes[buffer_idx].maybe_null_slice::() - [offset..offset + view.size()], + [offset..offset + view.len() as usize], ) } }); @@ -37,13 +38,13 @@ impl ArrayAccessor<[u8]> for VarBinViewArray { let mut iter = views.iter().zip(validity.iter()).map(|(view, valid)| { if valid { if view.is_inlined() { - Some(unsafe { &view.inlined.data[..view.size()] }) + Some(unsafe { &view.inlined.data[..view.len() as usize] }) } else { let offset = unsafe { view._ref.offset as usize }; let buffer_idx = unsafe { view._ref.buffer_index as usize }; Some( &bytes[buffer_idx].maybe_null_slice::() - [offset..offset + view.size()], + [offset..offset + view.len() as usize], ) } } else { diff --git 
a/vortex-array/src/array/varbinview/compute.rs b/vortex-array/src/array/varbinview/compute.rs index 2fe3b413257..cbbe93a78d2 100644 --- a/vortex-array/src/array/varbinview/compute.rs +++ b/vortex-array/src/array/varbinview/compute.rs @@ -3,10 +3,11 @@ use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::varbin::varbin_scalar; -use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE}; +use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE_BYTES}; +use crate::arrow::FromArrowArray; use crate::compute::unary::ScalarAtFn; -use crate::compute::{slice, ArrayCompute, SliceFn}; -use crate::{Array, ArrayDType, IntoArray}; +use crate::compute::{slice, ArrayCompute, SliceFn, TakeFn}; +use crate::{Array, ArrayDType, IntoArray, IntoCanonical}; impl ArrayCompute for VarBinViewArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { @@ -16,6 +17,10 @@ impl ArrayCompute for VarBinViewArray { fn slice(&self) -> Option<&dyn SliceFn> { Some(self) } + + fn take(&self) -> Option<&dyn TakeFn> { + Some(self) + } } impl ScalarAtFn for VarBinViewArray { @@ -32,9 +37,13 @@ impl ScalarAtFn for VarBinViewArray { impl SliceFn for VarBinViewArray { fn slice(&self, start: usize, stop: usize) -> VortexResult { Ok(Self::try_new( - slice(&self.views(), start * VIEW_SIZE, stop * VIEW_SIZE)?, - (0..self.metadata().data_lens.len()) - .map(|i| self.bytes(i)) + slice( + &self.views(), + start * VIEW_SIZE_BYTES, + stop * VIEW_SIZE_BYTES, + )?, + (0..self.metadata().buffer_lens.len()) + .map(|i| self.buffer(i)) .collect::>(), self.dtype().clone(), self.validity().slice(start, stop)?, @@ -42,3 +51,15 @@ impl SliceFn for VarBinViewArray { .into_array()) } } + +/// Take involves creating a new array that references the old array, just with the given set of views. 
+impl TakeFn for VarBinViewArray { + fn take(&self, indices: &Array) -> VortexResult { + let array_arrow = self.clone().into_array().into_canonical()?.into_arrow(); + let indices_arrow = indices.clone().into_canonical()?.into_arrow(); + + let take_arrow = arrow_select::take::take(&array_arrow, &indices_arrow, None)?; + let nullable = take_arrow.is_nullable(); + Ok(Array::from_arrow(take_arrow, nullable)) + } +} diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 48ed0a5e5e0..dc81c33b782 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -1,5 +1,4 @@ use std::fmt::{Debug, Formatter}; -use std::ops::Deref; use std::sync::Arc; use std::{mem, slice}; @@ -7,13 +6,12 @@ use ::serde::{Deserialize, Serialize}; use arrow_array::builder::{BinaryViewBuilder, StringViewBuilder}; use arrow_array::{ArrayRef, BinaryViewArray, StringViewArray}; use arrow_buffer::ScalarBuffer; -use arrow_schema::DataType; use itertools::Itertools; +use static_assertions::{assert_eq_align, assert_eq_size}; use vortex_dtype::{DType, PType}; -use vortex_error::{vortex_bail, VortexError, VortexResult}; +use vortex_error::{vortex_bail, VortexResult}; -use crate::array::primitive::PrimitiveArray; -use crate::array::varbin::VarBinArray; +use super::PrimitiveArray; use crate::arrow::FromArrowArray; use crate::compute::slice; use crate::stats::StatsSet; @@ -29,6 +27,8 @@ mod compute; mod stats; mod variants; +impl_encoding!("vortex.varbinview", 5u16, VarBinView); + #[derive(Clone, Copy, Debug)] #[repr(C, align(8))] pub struct Inlined { @@ -50,6 +50,11 @@ impl Inlined { inlined.data[..value.len()].copy_from_slice(value); inlined } + + #[inline] + pub fn value(&self) -> &[u8] { + &self.data[0..(self.size as usize)] + } } #[derive(Clone, Copy, Debug)] @@ -70,6 +75,21 @@ impl Ref { offset, } } + + #[inline] + pub fn buffer_index(&self) -> u32 { + self.buffer_index + } + + #[inline] + pub fn offset(&self) -> 
u32 { + self.offset + } + + #[inline] + pub fn prefix(&self) -> &[u8; 4] { + &self.prefix + } } #[derive(Clone, Copy)] @@ -79,16 +99,58 @@ pub union BinaryView { _ref: Ref, } +assert_eq_size!(BinaryView, [u8; 16]); +assert_eq_size!(Inlined, [u8; 16]); +assert_eq_size!(Ref, [u8; 16]); +assert_eq_align!(BinaryView, u64); + impl BinaryView { pub const MAX_INLINED_SIZE: usize = 12; + pub fn new_inlined(value: &[u8]) -> Self { + assert!( + value.len() <= Self::MAX_INLINED_SIZE, + "expected inlined value to be <= 12 bytes, was {}", + value.len() + ); + + Self { + inlined: Inlined::new(value), + } + } + + /// Create a new view over bytes stored in a block. + pub fn new_view(len: u32, prefix: [u8; 4], block: u32, offset: u32) -> Self { + Self { + _ref: Ref::new(len, prefix, block, offset), + } + } + #[inline] - pub fn size(&self) -> usize { - unsafe { self.inlined.size as usize } + pub fn len(&self) -> u32 { + unsafe { self.inlined.size } } + #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[inline] pub fn is_inlined(&self) -> bool { - unsafe { self.inlined.size <= Self::MAX_INLINED_SIZE as u32 } + self.len() <= (Self::MAX_INLINED_SIZE as u32) + } + + pub fn as_inlined(&self) -> &Inlined { + unsafe { &self.inlined } + } + + pub fn as_view(&self) -> &Ref { + unsafe { &self._ref } + } + + pub fn as_u128(&self) -> u128 { + unsafe { mem::transmute::<BinaryView, u128>(*self) } } } @@ -96,29 +158,51 @@ impl Debug for BinaryView { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut s = f.debug_struct("BinaryView"); if self.is_inlined() { - s.field("inline", unsafe { &self.inlined }); + s.field("inline", &"i".to_string()); } else { - s.field("ref", unsafe { &self._ref }); + s.field("ref", &"r".to_string()); } s.finish() } } // reminder: views are 16 bytes with 8-byte alignment -pub(crate) const VIEW_SIZE: usize = mem::size_of::<BinaryView>(); - -impl_encoding!("vortex.varbinview", 5u16, VarBinView); +pub(crate) const VIEW_SIZE_BYTES: usize = size_of::<BinaryView>(); #[derive(Debug, 
Clone, Serialize, Deserialize)] pub struct VarBinViewMetadata { + // Validity metadata validity: ValidityMetadata, - data_lens: Vec, + + // Length of each buffer. The buffers are primitive byte arrays containing the raw string/binary + // data referenced by views. + buffer_lens: Vec, +} + +pub struct Buffers<'a> { + index: u32, + n_buffers: u32, + array: &'a VarBinViewArray, +} + +impl<'a> Iterator for Buffers<'a> { + type Item = Array; + + fn next(&mut self) -> Option { + if self.index >= self.n_buffers { + return None; + } + + let bytes = self.array.buffer(self.index as usize); + self.index += 1; + Some(bytes) + } } impl VarBinViewArray { pub fn try_new( views: Array, - data: Vec, + buffers: Vec, dtype: DType, validity: Validity, ) -> VortexResult { @@ -126,7 +210,7 @@ impl VarBinViewArray { vortex_bail!(MismatchedTypes: "u8", views.dtype()); } - for d in data.iter() { + for d in buffers.iter() { if !matches!(d.dtype(), &DType::BYTES) { vortex_bail!(MismatchedTypes: "u8", d.dtype()); } @@ -140,15 +224,15 @@ impl VarBinViewArray { vortex_bail!("incorrect validity {:?}", validity); } - let num_views = views.len() / VIEW_SIZE; + let num_views = views.len() / VIEW_SIZE_BYTES; let metadata = VarBinViewMetadata { validity: validity.to_metadata(num_views)?, - data_lens: data.iter().map(|a| a.len()).collect_vec(), + buffer_lens: buffers.iter().map(|a| a.len()).collect_vec(), }; - let mut children = Vec::with_capacity(data.len() + 2); + let mut children = Vec::with_capacity(buffers.len() + 2); children.push(views); - children.extend(data); + children.extend(buffers); if let Some(a) = validity.into_array() { children.push(a) } @@ -156,39 +240,79 @@ impl VarBinViewArray { Self::try_from_parts(dtype, num_views, metadata, children.into(), StatsSet::new()) } - fn view_slice(&self) -> &[BinaryView] { + /// Number of raw string data buffers held by this array. 
+ pub fn buffer_count(&self) -> usize { + self.metadata().buffer_lens.len() + } + + /// Access to the underlying `views` child array as a slice of [BinaryView] structures. + /// + /// This is useful for iteration over the values, as well as for applying filters that may + /// only require hitting the prefixes or inline strings. + pub fn view_slice(&self) -> &[BinaryView] { unsafe { slice::from_raw_parts( PrimitiveArray::try_from(self.views()) .expect("Views must be a primitive array") .maybe_null_slice::() .as_ptr() as _, - self.views().len() / VIEW_SIZE, + self.views().len() / VIEW_SIZE_BYTES, ) } } - fn view_at(&self, index: usize) -> BinaryView { + pub fn view_at(&self, index: usize) -> BinaryView { self.view_slice()[index] } + /// Access to the primitive views array. + /// + /// Variable-sized binary view arrays contain a "view" child array, with 16-byte entries that + /// contain either a pointer into one of the array's owned `buffer`s OR an inlined copy of + /// the string (if the string has 12 bytes or fewer). #[inline] pub fn views(&self) -> Array { self.array() - .child(0, &DType::BYTES, self.len() * VIEW_SIZE) - .expect("missing views") + .child(0, &DType::BYTES, self.len() * VIEW_SIZE_BYTES) + .unwrap() } + /// Access one of the backing data buffers. + /// + /// # Panics + /// + /// This method panics if the provided index is out of bounds for the set of buffers provided + /// at construction time. #[inline] - pub fn bytes(&self, idx: usize) -> Array { + pub fn buffer(&self, idx: usize) -> Array { self.array() - .child(idx + 1, &DType::BYTES, self.metadata().data_lens[idx]) + .child(idx + 1, &DType::BYTES, self.metadata().buffer_lens[idx]) .expect("Missing data buffer") } + /// Retrieve an iterator over the raw data buffers. + /// These are the BYTE buffers that make up the array's contents. 
+ /// + /// Example + /// + /// ``` + /// use vortex::array::VarBinViewArray; + /// let array = VarBinViewArray::from_iter_str(["a", "b", "c"]); + /// array.buffers().for_each(|block| { + /// // Do something with the `block` + /// }); + /// ``` + pub fn buffers(&self) -> Buffers { + Buffers { + index: 0, + n_buffers: self.buffer_count().try_into().unwrap(), + array: self, + } + } + pub fn validity(&self) -> Validity { self.metadata().validity.to_validity(self.array().child( - self.metadata().data_lens.len() + 1, + self.metadata().buffer_lens.len() + 1, &Validity::DTYPE, self.len(), )) @@ -235,20 +359,21 @@ impl VarBinViewArray { VarBinViewArray::try_from(array).expect("should be var bin view array") } + // TODO(aduffy): do we really need to do this with copying? pub fn bytes_at(&self, index: usize) -> VortexResult> { let view = self.view_at(index); - unsafe { - if !view.is_inlined() { - let data_buf = slice( - &self.bytes(view._ref.buffer_index as usize), - view._ref.offset as usize, - (view._ref.size + view._ref.offset) as usize, - )? - .into_primitive()?; - Ok(data_buf.maybe_null_slice::().to_vec()) - } else { - Ok(view.inlined.data[..view.size()].to_vec()) - } + // Expect this to be the common case: strings > 12 bytes. + if !view.is_inlined() { + let view_ref = view.as_view(); + let data_buf = slice( + &self.buffer(view_ref.buffer_index() as usize), + view_ref.offset() as usize, + (view.len() + view_ref.offset()) as usize, + )? 
+ .into_primitive()?; + Ok(data_buf.maybe_null_slice::().to_vec()) + } else { + Ok(view.as_inlined().value().to_vec()) } } } @@ -257,35 +382,31 @@ impl ArrayTrait for VarBinViewArray {} impl IntoCanonical for VarBinViewArray { fn into_canonical(self) -> VortexResult { - let arrow_dtype = if matches!(self.dtype(), &DType::Utf8(_)) { - &DataType::Utf8 - } else { - &DataType::Binary - }; let nullable = self.dtype().is_nullable(); - let arrow_self = as_arrow(self); - let arrow_varbin = - arrow_cast::cast(arrow_self.deref(), arrow_dtype).map_err(VortexError::ArrowError)?; - let vortex_array = Array::from_arrow(arrow_varbin, nullable); + let arrow_self = varbinview_as_arrow(self); + let vortex_array = Array::from_arrow(arrow_self, nullable); - Ok(Canonical::VarBin(VarBinArray::try_from(&vortex_array)?)) + Ok(Canonical::VarBinView(VarBinViewArray::try_from( + &vortex_array, + )?)) } } -fn as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { +pub(crate) fn varbinview_as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { // Views should be buffer of u8 let views = var_bin_view .views() .into_primitive() .expect("views must be primitive"); assert_eq!(views.ptype(), PType::U8); + let nulls = var_bin_view .logical_validity() .to_null_buffer() .expect("null buffer"); - let data = (0..var_bin_view.metadata().data_lens.len()) - .map(|i| var_bin_view.bytes(i).into_primitive()) + let data = (0..var_bin_view.buffer_count()) + .map(|i| var_bin_view.buffer(i).into_primitive()) .collect::>>() .expect("bytes arrays must be primitive"); if !data.is_empty() { @@ -300,16 +421,20 @@ fn as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { // Switch on Arrow DType. 
match var_bin_view.dtype() { - DType::Binary(_) => Arc::new(BinaryViewArray::new( - ScalarBuffer::::from(views.buffer().clone().into_arrow()), - data, - nulls, - )), - DType::Utf8(_) => Arc::new(StringViewArray::new( - ScalarBuffer::::from(views.buffer().clone().into_arrow()), - data, - nulls, - )), + DType::Binary(_) => Arc::new(unsafe { + BinaryViewArray::new_unchecked( + ScalarBuffer::::from(views.buffer().clone().into_arrow()), + data, + nulls, + ) + }), + DType::Utf8(_) => Arc::new(unsafe { + StringViewArray::new_unchecked( + ScalarBuffer::::from(views.buffer().clone().into_arrow()), + data, + nulls, + ) + }), _ => panic!("expected utf8 or binary, got {}", var_bin_view.dtype()), } } @@ -327,8 +452,8 @@ impl ArrayValidity for VarBinViewArray { impl AcceptArrayVisitor for VarBinViewArray { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_child("views", &self.views())?; - for i in 0..self.metadata().data_lens.len() { - visitor.visit_child(format!("bytes_{i}").as_str(), &self.bytes(i))?; + for i in 0..self.metadata().buffer_lens.len() { + visitor.visit_child(format!("bytes_{i}").as_str(), &self.buffer(i))?; } visitor.visit_validity(&self.validity()) } @@ -362,10 +487,10 @@ impl<'a> FromIterator> for VarBinViewArray { mod test { use vortex_scalar::Scalar; - use crate::array::varbinview::{BinaryView, Inlined, Ref, VarBinViewArray, VIEW_SIZE}; + use crate::array::varbinview::{BinaryView, VarBinViewArray, VIEW_SIZE_BYTES}; use crate::compute::slice; use crate::compute::unary::scalar_at; - use crate::{Canonical, IntoCanonical}; + use crate::{Canonical, IntoArray, IntoCanonical}; #[test] pub fn varbin_view() { @@ -386,7 +511,7 @@ mod test { pub fn slice_array() { let binary_arr = slice( &VarBinViewArray::from_iter_str(["hello world", "hello world this is a long string"]) - .into(), + .into_array(), 1, 2, ) @@ -402,19 +527,17 @@ mod test { let binary_arr = VarBinViewArray::from_iter_str(["string1", "string2"]); let flattened = 
binary_arr.into_canonical().unwrap(); - assert!(matches!(flattened, Canonical::VarBin(_))); + assert!(matches!(flattened, Canonical::VarBinView(_))); - let var_bin = flattened.into(); + let var_bin = flattened.into_varbinview().unwrap().into_array(); assert_eq!(scalar_at(&var_bin, 0).unwrap(), Scalar::from("string1")); assert_eq!(scalar_at(&var_bin, 1).unwrap(), Scalar::from("string2")); } #[test] pub fn binary_view_size_and_alignment() { - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::size_of::(), VIEW_SIZE); - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::align_of::(), 8); + assert_eq!(size_of::(), VIEW_SIZE_BYTES); + assert_eq!(size_of::(), 16); + assert_eq!(align_of::(), 8); } } diff --git a/vortex-array/src/arrow/dtype.rs b/vortex-array/src/arrow/dtype.rs index 6398e6e37ec..5718fd39bd2 100644 --- a/vortex-array/src/arrow/dtype.rs +++ b/vortex-array/src/arrow/dtype.rs @@ -64,8 +64,8 @@ impl FromArrowType<&Field> for DType { match field.data_type() { DataType::Null => Null, DataType::Boolean => Bool(nullability), - DataType::Utf8 | DataType::LargeUtf8 => Utf8(nullability), - DataType::Binary | DataType::LargeBinary => Binary(nullability), + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Utf8(nullability), + DataType::Binary | DataType::LargeBinary | DataType::BinaryView => Binary(nullability), DataType::Date32 | DataType::Date64 | DataType::Time32(_) diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 21ba7986552..ae8ff273a05 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -5,28 +5,27 @@ use arrow_array::types::{ UInt32Type, UInt64Type, UInt8Type, }; use arrow_array::{ - ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray as ArrowBoolArray, Date32Array, - Date64Array, LargeBinaryArray, LargeStringArray, NullArray as ArrowNullArray, - PrimitiveArray as ArrowPrimitiveArray, StringArray, StructArray as 
ArrowStructArray, - Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, + ArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray, Date32Array, Date64Array, + NullArray as ArrowNullArray, PrimitiveArray as ArrowPrimitiveArray, + StructArray as ArrowStructArray, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }; use arrow_buffer::ScalarBuffer; use arrow_schema::{Field, Fields}; use vortex_datetime_dtype::{is_temporal_ext_type, TemporalMetadata, TimeUnit}; -use vortex_dtype::{DType, NativePType, PType}; +use vortex_dtype::{NativePType, PType}; use vortex_error::{vortex_bail, VortexResult}; use crate::array::{ - BoolArray, ExtensionArray, NullArray, PrimitiveArray, StructArray, TemporalArray, VarBinArray, + varbinview_as_arrow, BoolArray, ExtensionArray, NullArray, PrimitiveArray, StructArray, + TemporalArray, VarBinViewArray, }; -use crate::arrow::wrappers::as_offset_buffer; use crate::compute::unary::try_cast; use crate::encoding::ArrayEncoding; use crate::validity::ArrayValidity; use crate::variants::StructArrayTrait; -use crate::{Array, ArrayDType, IntoArray, ToArray}; +use crate::{Array, IntoArray}; /// The set of canonical array encodings, also the set of encodings that can be transferred to /// Arrow with zero-copy. @@ -41,26 +40,15 @@ use crate::{Array, ArrayDType, IntoArray, ToArray}; /// decompress it later to pass to a compute kernel, there are multiple suitable Arrow array /// variants to hold the data. /// -/// To disambiguate, we choose a canonical physical encoding for every Vortex [`DType`], which +/// To disambiguate, we choose a canonical physical encoding for every Vortex `DType`, which /// will correspond to an arrow-rs [`arrow_schema::DataType`]. 
-/// -/// # Views support -/// -/// Binary and String views are a new, better encoding format for nearly all use-cases. For now, -/// because DataFusion does not include pervasive support for compute over StringView, we opt to use -/// the [`VarBinArray`] as the canonical encoding (which corresponds to the Arrow `BinaryViewArray`). -/// -/// We expect to change this soon once DataFusion is able to finish up some initial support, which -/// is tracked in . #[derive(Debug, Clone)] pub enum Canonical { Null(NullArray), Bool(BoolArray), Primitive(PrimitiveArray), Struct(StructArray), - VarBin(VarBinArray), - // TODO(aduffy): switch to useing VarBinView instead of VarBin - // VarBinView(VarBinViewArray), + VarBinView(VarBinViewArray), Extension(ExtensionArray), } @@ -76,7 +64,7 @@ impl Canonical { Canonical::Bool(a) => bool_to_arrow(a), Canonical::Primitive(a) => primitive_to_arrow(a), Canonical::Struct(a) => struct_to_arrow(a), - Canonical::VarBin(a) => varbin_to_arrow(a), + Canonical::VarBinView(a) => varbinview_as_arrow(a), Canonical::Extension(a) => { if !is_temporal_ext_type(a.id()) { panic!("unsupported extension dtype with ID {}", a.id().as_ref()) @@ -121,10 +109,10 @@ impl Canonical { } } - pub fn into_varbin(self) -> VortexResult { + pub fn into_varbinview(self) -> VortexResult { match self { - Canonical::VarBin(a) => Ok(a), - _ => vortex_bail!(InvalidArgument: "cannot unwrap VarBinArray from {:?}", &self), + Canonical::VarBinView(a) => Ok(a), + _ => vortex_bail!(InvalidArgument: "cannot unwrap VarBinViewArray from {:?}", &self), } } @@ -214,76 +202,6 @@ fn struct_to_arrow(struct_array: StructArray) -> ArrayRef { Arc::new(ArrowStructArray::new(arrow_fields, field_arrays, nulls)) } -fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef { - let offsets = varbin_array - .offsets() - .into_primitive() - .expect("flatten_primitive"); - let offsets = match offsets.ptype() { - PType::I32 | PType::I64 => offsets, - PType::U64 => 
offsets.reinterpret_cast(PType::I64), - PType::U32 => offsets.reinterpret_cast(PType::I32), - // Unless it's u64, everything else can be converted into an i32. - _ => try_cast(&offsets.to_array(), PType::I32.into()) - .expect("cast to i32") - .into_primitive() - .expect("flatten_primitive"), - }; - let nulls = varbin_array - .logical_validity() - .to_null_buffer() - .expect("null buffer"); - - let data = varbin_array - .bytes() - .into_primitive() - .expect("flatten_primitive"); - assert_eq!(data.ptype(), PType::U8); - let data = data.buffer(); - - // Switch on Arrow DType. - match varbin_array.dtype() { - DType::Binary(_) => match offsets.ptype() { - PType::I32 => Arc::new(unsafe { - BinaryArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - PType::I64 => Arc::new(unsafe { - LargeBinaryArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - _ => panic!("Invalid offsets type"), - }, - DType::Utf8(_) => match offsets.ptype() { - PType::I32 => Arc::new(unsafe { - StringArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - PType::I64 => Arc::new(unsafe { - LargeStringArray::new_unchecked( - as_offset_buffer::(offsets), - data.clone().into_arrow(), - nulls, - ) - }), - _ => panic!("Invalid offsets type"), - }, - _ => panic!( - "expected utf8 or binary instead of {}", - varbin_array.dtype() - ), - } -} - fn temporal_to_arrow(temporal_array: TemporalArray) -> ArrayRef { macro_rules! 
extract_temporal_values { ($values:expr, $prim:ty) => {{ @@ -388,7 +306,7 @@ pub trait IntoArrayVariant { fn into_struct(self) -> VortexResult; - fn into_varbin(self) -> VortexResult; + fn into_varbinview(self) -> VortexResult; fn into_extension(self) -> VortexResult; } @@ -413,8 +331,8 @@ where self.into_canonical()?.into_struct() } - fn into_varbin(self) -> VortexResult { - self.into_canonical()?.into_varbin() + fn into_varbinview(self) -> VortexResult { + self.into_canonical()?.into_varbinview() } fn into_extension(self) -> VortexResult { @@ -444,7 +362,7 @@ impl From for Array { Canonical::Bool(a) => a.into(), Canonical::Primitive(a) => a.into(), Canonical::Struct(a) => a.into(), - Canonical::VarBin(a) => a.into(), + Canonical::VarBinView(a) => a.into(), Canonical::Extension(a) => a.into(), } } @@ -457,7 +375,8 @@ mod test { use arrow_array::cast::AsArray; use arrow_array::types::{Int32Type, Int64Type, UInt64Type}; use arrow_array::{ - Array, PrimitiveArray as ArrowPrimitiveArray, StringArray, StructArray as ArrowStructArray, + Array, PrimitiveArray as ArrowPrimitiveArray, StringViewArray, + StructArray as ArrowStructArray, }; use arrow_buffer::NullBufferBuilder; use arrow_schema::{DataType, Field}; @@ -540,7 +459,7 @@ mod test { nulls.append_n_non_nulls(4); nulls.append_null(); nulls.append_non_null(); - let names = Arc::new(StringArray::from_iter(vec![ + let names = Arc::new(StringViewArray::from_iter(vec![ Some("Joseph"), None, Some("Angela"), @@ -559,7 +478,7 @@ mod test { let arrow_struct = ArrowStructArray::new( vec![ - Arc::new(Field::new("name", DataType::Utf8, true)), + Arc::new(Field::new("name", DataType::Utf8View, true)), Arc::new(Field::new("age", DataType::Int32, true)), ] .into(), diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 049747aba08..6a0860c9fde 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(vec_into_raw_parts)] + //! 
Vortex crate containing core logic for encoding and memory representation of [arrays](Array). //! //! At the heart of Vortex are [arrays](Array) and [encodings](crate::encoding::ArrayEncoding). diff --git a/vortex-datafusion/src/datatype.rs b/vortex-datafusion/src/datatype.rs index 3baf21062d8..5275d1132bc 100644 --- a/vortex-datafusion/src/datatype.rs +++ b/vortex-datafusion/src/datatype.rs @@ -63,8 +63,8 @@ pub(crate) fn infer_data_type(dtype: &DType) -> DataType { PType::F32 => DataType::Float32, PType::F64 => DataType::Float64, }, - DType::Utf8(_) => DataType::Utf8, - DType::Binary(_) => DataType::Binary, + DType::Utf8(_) => DataType::Utf8View, + DType::Binary(_) => DataType::BinaryView, DType::Struct(struct_dtype, _) => { let mut fields = Vec::with_capacity(struct_dtype.names().len()); for (field_name, field_dt) in struct_dtype @@ -127,12 +127,12 @@ mod test { assert_eq!( infer_data_type(&DType::Utf8(Nullability::NonNullable)), - DataType::Utf8 + DataType::Utf8View ); assert_eq!( infer_data_type(&DType::Binary(Nullability::NonNullable)), - DataType::Binary + DataType::BinaryView ); assert_eq!( @@ -157,7 +157,7 @@ mod test { )), DataType::Struct(Fields::from(vec![ FieldRef::from(Field::new("field_a", DataType::Boolean, false)), - FieldRef::from(Field::new("field_b", DataType::Utf8, true)), + FieldRef::from(Field::new("field_b", DataType::Utf8View, true)), ])) ); } @@ -180,7 +180,7 @@ mod test { infer_schema(&schema_nonnull), Schema::new(Fields::from(vec![ Field::new("field_a", DataType::Boolean, false), - Field::new("field_b", DataType::Utf8, false), + Field::new("field_b", DataType::Utf8View, false), Field::new("field_c", DataType::Int32, true), ])) ); diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs index 7520568b423..ab6dec7511a 100644 --- a/vortex-datafusion/src/memory.rs +++ b/vortex-datafusion/src/memory.rs @@ -212,25 +212,24 @@ mod test { use datafusion::prelude::SessionContext; use datafusion_common::{Column, 
TableReference}; use datafusion_expr::{and, col, lit, BinaryExpr, Expr, Operator}; - use vortex::array::{PrimitiveArray, StructArray, VarBinArray}; + use vortex::array::{PrimitiveArray, StructArray, VarBinViewArray}; use vortex::validity::Validity; use vortex::{Array, IntoArray}; - use vortex_dtype::{DType, Nullability}; use crate::memory::VortexMemTableOptions; use crate::{can_be_pushed_down, SessionContextExt as _}; fn presidents_array() -> Array { - let names = VarBinArray::from_vec( - vec![ + let names = VarBinViewArray::from_iter_str( + [ "Washington", "Adams", "Jefferson", "Madison", "Monroe", "Adams", - ], - DType::Utf8(Nullability::NonNullable), + ] + .iter(), ); let term_start = PrimitiveArray::from_vec( vec![1789u16, 1797, 1801, 1809, 1817, 1825], diff --git a/vortex-sampling-compressor/src/compressors/dict.rs b/vortex-sampling-compressor/src/compressors/dict.rs index ec6d320e8ea..59db5804f58 100644 --- a/vortex-sampling-compressor/src/compressors/dict.rs +++ b/vortex-sampling-compressor/src/compressors/dict.rs @@ -1,10 +1,13 @@ use std::collections::HashSet; -use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray}; +use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray, VarBinView, VarBinViewArray}; use vortex::encoding::EncodingRef; use vortex::stats::ArrayStatistics; use vortex::{Array, ArrayDef, IntoArray}; -use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray, DictEncoding}; +use vortex_dict::{ + dict_encode_primitive, dict_encode_varbin, dict_encode_varbinview, Dict, DictArray, + DictEncoding, +}; use vortex_error::VortexResult; use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor}; @@ -54,6 +57,11 @@ impl EncodingCompressor for DictCompressor { let (codes, values) = dict_encode_varbin(&vb); (codes.into_array(), values.into_array()) } + VarBinView::ID => { + let vb = VarBinViewArray::try_from(array)?; + let (codes, values) = dict_encode_varbinview(&vb); + (codes.into_array(), 
values.into_array()) + } _ => unreachable!("This array kind should have been filtered out"), }; diff --git a/vortex-scalar/src/arrow.rs b/vortex-scalar/src/arrow.rs index e72dad7b80e..cf94a25562c 100644 --- a/vortex-scalar/src/arrow.rs +++ b/vortex-scalar/src/arrow.rs @@ -60,13 +60,13 @@ impl From<&Scalar> for Arc { .value .as_buffer_string() .expect("should be buffer string"), - StringArray + StringViewArray ) } DType::Binary(_) => { value_to_arrow_scalar!( value.value.as_buffer().expect("should be a buffer"), - BinaryArray + BinaryViewArray ) } DType::Struct(..) => {