diff --git a/Cargo.toml b/Cargo.toml index 2bcbe059ab25..042946aae661 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,24 +17,7 @@ [workspace] exclude = ["datafusion-cli"] -members = [ - "datafusion/common", - "datafusion/core", - "datafusion/expr", - "datafusion/execution", - "datafusion/optimizer", - "datafusion/physical-expr", - "datafusion/physical-plan", - "datafusion/proto", - "datafusion/proto/gen", - "datafusion/sql", - "datafusion/sqllogictest", - "datafusion/substrait", - "datafusion/wasmtest", - "datafusion-examples", - "docs", - "test-utils", - "benchmarks", +members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion/wasmtest", "datafusion-examples", "docs", "test-utils", "benchmarks", ] resolver = "2" @@ -58,19 +41,20 @@ arrow-schema = { version = "49.0.0", default-features = false } async-trait = "0.1.73" bigdecimal = "0.4.1" bytes = "1.4" +chrono = { version = "0.4.31", default-features = false } ctor = "0.2.0" +dashmap = "5.4.0" datafusion = { path = "datafusion/core", version = "33.0.0" } datafusion-common = { path = "datafusion/common", version = "33.0.0" } +datafusion-execution = { path = "datafusion/execution", version = "33.0.0" } datafusion-expr = { path = "datafusion/expr", version = "33.0.0" } -datafusion-sql = { path = "datafusion/sql", version = "33.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "33.0.0" } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "33.0.0" } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "33.0.0" } -datafusion-execution = { path = "datafusion/execution", version = "33.0.0" } datafusion-proto = { path = "datafusion/proto", version = "33.0.0" } +datafusion-sql = { path = "datafusion/sql", version = "33.0.0" 
} datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "33.0.0" } datafusion-substrait = { path = "datafusion/substrait", version = "33.0.0" } -dashmap = "5.4.0" doc-comment = "0.3" env_logger = "0.10" futures = "0.3" @@ -83,12 +67,12 @@ object_store = { version = "0.8.0", default-features = false } parking_lot = "0.12" parquet = { version = "49.0.0", default-features = false, features = ["arrow", "async", "object_store"] } rand = "0.8" +roaring = { version = "0.10" } rstest = "0.18.0" serde_json = "1" sqlparser = { version = "0.40.0", features = ["visitor"] } tempfile = "3" thiserror = "1.0.44" -chrono = { version = "0.4.31", default-features = false } url = "2.2" [profile.release] diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index f88c907b052f..4579108e35e9 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -806,6 +806,12 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +[[package]] +name = "bytemuck" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" + [[package]] name = "byteorder" version = "1.5.0" @@ -1264,6 +1270,7 @@ dependencies = [ "petgraph", "rand", "regex", + "roaring", "sha2", "unicode-segmentation", "uuid", @@ -2321,9 +2328,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl-probe" @@ -2754,6 +2761,12 @@ dependencies = [ "winreg", ] +[[package]] +name = "retain_mut" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086" + [[package]] name = "ring" version = "0.16.20" @@ -2789,6 +2802,17 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" +[[package]] +name = "roaring" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873" +dependencies = [ + "bytemuck", + "byteorder", + "retain_mut", +] + [[package]] name = "rstest" version = "0.17.0" @@ -3501,9 +3525,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "twox-hash" @@ -3543,9 +3567,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" [[package]] name = "unicode-ident" diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index d237c68657a1..70f027064cf1 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -34,12 +34,14 @@ path = "src/lib.rs" [features] crypto_expressions = ["md-5", "sha2", "blake2", "blake3"] -default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "encoding_expressions", +default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "encoding_expressions", "roaring_bitmap", ] encoding_expressions = ["base64", "hex"] 
regex_expressions = ["regex"] +roaring_bitmap = ["roaring"] unicode_expressions = ["unicode-segmentation"] + [dependencies] ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", @@ -66,6 +68,7 @@ paste = "^1.0" petgraph = "0.6.2" rand = { workspace = true } regex = { version = "1.8", optional = true } +roaring = { version = "0.10", optional = true } sha2 = { version = "^0.10.1", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } uuid = { version = "^1.2", features = ["v4"] } diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs index b79a5611c334..71d775453796 100644 --- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs @@ -26,18 +26,20 @@ use arrow::array::{ PrimitiveArray, }; use arrow::datatypes::{ - ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type, - UInt16Type, UInt32Type, UInt64Type, UInt8Type, + ArrowPrimitiveType, DataType, Field, Int32Type, Int64Type, UInt32Type, UInt64Type, }; +use arrow_array::{Int16Array, Int8Array, UInt16Array, UInt8Array}; use datafusion_common::{ downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue, }; use datafusion_expr::Accumulator; +use roaring::{self, RoaringBitmap}; use std::any::Any; use std::convert::TryFrom; use std::convert::TryInto; use std::hash::Hash; use std::marker::PhantomData; +use std::ops::BitOrAssign; use std::sync::Arc; /// APPROX_DISTINCT aggregate expression @@ -87,15 +89,13 @@ impl AggregateExpr for ApproxDistinct { fn create_accumulator(&self) -> Result> { let accumulator: Box = match &self.input_data_type { - // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL // TODO support for boolean (trivial case) // https://github.com/apache/arrow-datafusion/issues/1109 - DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), - 
DataType::UInt16 => Box::new(NumericHLLAccumulator::::new()), + DataType::UInt8 | DataType::UInt16 | DataType::Int8 | DataType::Int16 => { + Box::new(BitmaptAccumulator::new()) + } DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int16 => Box::new(NumericHLLAccumulator::::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), DataType::Utf8 => Box::new(StringHLLAccumulator::::new()), @@ -129,6 +129,19 @@ impl PartialEq for ApproxDistinct { } } +#[derive(Debug)] +struct BitmaptAccumulator { + bitmap: roaring::bitmap::RoaringBitmap, +} + +impl BitmaptAccumulator { + pub fn new() -> Self { + Self { + bitmap: roaring::bitmap::RoaringBitmap::new(), + } + } +} + #[derive(Debug)] struct BinaryHLLAccumulator where @@ -227,7 +240,7 @@ impl TryFrom<&ScalarValue> for HyperLogLog { } } -macro_rules! default_accumulator_impl { +macro_rules! default_hllaccumulator_impl { () => { fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { assert_eq!(1, states.len(), "expect only 1 element in the states"); @@ -273,7 +286,7 @@ where Ok(()) } - default_accumulator_impl!(); + default_hllaccumulator_impl!(); } impl Accumulator for StringHLLAccumulator @@ -289,7 +302,7 @@ where Ok(()) } - default_accumulator_impl!(); + default_hllaccumulator_impl!(); } impl Accumulator for NumericHLLAccumulator @@ -304,5 +317,89 @@ where Ok(()) } - default_accumulator_impl!(); + default_hllaccumulator_impl!(); +} + +impl Accumulator for BitmaptAccumulator { + //state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values. 
// NOTE(review): `roaring` is declared `optional = true` in this crate's manifest, but nothing
// visible here gates this impl behind the `roaring_bitmap` feature -- confirm that a build
// with `--no-default-features` still compiles.

/// Serializes the bitmap into a single `Binary` scalar so partial aggregates can be shipped
/// to, and merged by, another accumulator via `merge_batch`.
///
/// # Errors
/// Returns `DataFusionError::Internal` if the bitmap cannot be serialized.
fn state(&self) -> Result<Vec<ScalarValue>> {
    // Reserve the exact serialized length up front to avoid regrowing the buffer.
    let mut bytes = Vec::with_capacity(self.bitmap.serialized_size());
    self.bitmap.serialize_into(&mut bytes).map_err(|e| {
        DataFusionError::Internal(format!(
            "Failed to serialize bitmap for distinct count: {}",
            e
        ))
    })?;
    Ok(vec![ScalarValue::Binary(Some(bytes))])
}

/// Inserts every non-null value of the first input array into the bitmap.
///
/// Only `Int8`, `Int16`, `UInt8` and `UInt16` arrays are accepted.
///
/// # Errors
/// Returns `DataFusionError::Internal` when no input array is supplied, when a non-empty
/// array has an unsupported data type, or when the array does not downcast to the type its
/// `data_type()` reports.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
    let value = values.first().ok_or_else(|| {
        DataFusionError::Internal(
            "Expected one input array for bitmap distinct count".into(),
        )
    })?;
    if value.is_empty() {
        return Ok(());
    }
    // `as u32` sign-extends negative i8/i16 values. The mapping stays one-to-one, so the
    // cardinality of the bitmap still equals the number of distinct inputs.
    match value.data_type() {
        DataType::Int8 => {
            let array = downcast_value!(value, Int8Array);
            for v in array.iter().flatten() {
                self.bitmap.insert(v as u32);
            }
        }
        DataType::Int16 => {
            let array = downcast_value!(value, Int16Array);
            for v in array.iter().flatten() {
                self.bitmap.insert(v as u32);
            }
        }
        DataType::UInt8 => {
            let array = downcast_value!(value, UInt8Array);
            for v in array.iter().flatten() {
                self.bitmap.insert(v as u32);
            }
        }
        DataType::UInt16 => {
            let array = downcast_value!(value, UInt16Array);
            for v in array.iter().flatten() {
                self.bitmap.insert(v as u32);
            }
        }
        e => {
            return Err(DataFusionError::Internal(format!(
                "Unsupported data type {:?} for bitmap distinct count",
                e
            )));
        }
    }
    Ok(())
}

/// Unions every serialized bitmap found in `states[0]` into this accumulator.
///
/// # Panics
/// Panics if `states` does not hold exactly one array, mirroring the HLL accumulators in
/// this file.
///
/// # Errors
/// Returns `DataFusionError::Internal` if the state array has an unexpected type, contains
/// a null entry, or holds bytes that are not a valid serialized bitmap.
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
    assert_eq!(1, states.len(), "expect only 1 element in the states");
    // NOTE(review): the downcast target type was lost in the reviewed text; `BinaryArray`
    // is assumed because `state()` emits `ScalarValue::Binary` -- confirm.
    let binary_array = downcast_value!(states[0], BinaryArray);

    for b in binary_array.iter() {
        let v = b.ok_or_else(|| {
            DataFusionError::Internal(
                "Impossibly got empty binary array from states".into(),
            )
        })?;
        // `&[u8]` implements `io::Read`, so the state bytes are read in place, not copied.
        let bitmap = RoaringBitmap::deserialize_from(v).map_err(|e| {
            DataFusionError::Internal(format!(
                "Failed to deserialize bitmap state for distinct count: {}",
                e
            ))
        })?;
        self.bitmap.bitor_assign(bitmap);
    }
    Ok(())
}

/// Returns the number of distinct values recorded so far as a `UInt64` scalar.
fn evaluate(&self) -> Result<ScalarValue> {
    Ok(ScalarValue::UInt64(Some(self.bitmap.len())))
}

/// Estimated memory footprint in bytes: the accumulator struct itself plus the bitmap
/// contents, with the serialized length used as a stand-in for the bitmap's heap usage.
fn size(&self) -> usize {
    std::mem::size_of_val(self) + self.bitmap.serialized_size()
}
}