Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ tempfile = "3"
thiserror = "1.0.44"
chrono = { version = "0.4.31", default-features = false }
url = "2.2"
roaring = {version = "0.10"}

[profile.release]
codegen-units = 1
Expand Down
36 changes: 30 additions & 6 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,18 @@ path = "src/lib.rs"

[features]
crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "encoding_expressions",
default = [
"crypto_expressions",
"regex_expressions",
"unicode_expressions",
"encoding_expressions",
"roaring_bitmap",
]
encoding_expressions = ["base64", "hex"]
regex_expressions = ["regex"]
unicode_expressions = ["unicode-segmentation"]
roaring_bitmap = ["roaring"]


[dependencies]
ahash = { version = "0.8", default-features = false, features = [
Expand Down Expand Up @@ -69,6 +76,7 @@ regex = { version = "1.8", optional = true }
sha2 = { version = "^0.10.1", optional = true }
unicode-segmentation = { version = "^1.7.1", optional = true }
uuid = { version = "^1.2", features = ["v4"] }
roaring = { version = "0.10", optional = true }

[dev-dependencies]
criterion = "0.5"
Expand Down
119 changes: 108 additions & 11 deletions datafusion/physical-expr/src/aggregate/approx_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ use arrow::array::{
PrimitiveArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type,
UInt16Type, UInt32Type, UInt64Type, UInt8Type,
ArrowPrimitiveType, DataType, Field, Int32Type, Int64Type, UInt32Type, UInt64Type,
};
use arrow_array::{Int16Array, Int8Array, UInt16Array, UInt8Array};
use datafusion_common::{
downcast_value, internal_err, not_impl_err, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::Accumulator;
use roaring::{self, RoaringBitmap};
use std::any::Any;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::BitOrAssign;
use std::sync::Arc;

/// APPROX_DISTINCT aggregate expression
Expand Down Expand Up @@ -87,15 +89,13 @@ impl AggregateExpr for ApproxDistinct {

fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
let accumulator: Box<dyn Accumulator> = match &self.input_data_type {
// TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL
// TODO support for boolean (trivial case)
// https://github.com/apache/arrow-datafusion/issues/1109
DataType::UInt8 => Box::new(NumericHLLAccumulator::<UInt8Type>::new()),
DataType::UInt16 => Box::new(NumericHLLAccumulator::<UInt16Type>::new()),
DataType::UInt8 | DataType::UInt16 | DataType::Int8 | DataType::Int16 => {
Box::new(BitmaptAccumulator::new())
}
DataType::UInt32 => Box::new(NumericHLLAccumulator::<UInt32Type>::new()),
DataType::UInt64 => Box::new(NumericHLLAccumulator::<UInt64Type>::new()),
DataType::Int8 => Box::new(NumericHLLAccumulator::<Int8Type>::new()),
DataType::Int16 => Box::new(NumericHLLAccumulator::<Int16Type>::new()),
DataType::Int32 => Box::new(NumericHLLAccumulator::<Int32Type>::new()),
DataType::Int64 => Box::new(NumericHLLAccumulator::<Int64Type>::new()),
DataType::Utf8 => Box::new(StringHLLAccumulator::<i32>::new()),
Expand Down Expand Up @@ -129,6 +129,19 @@ impl PartialEq<dyn Any> for ApproxDistinct {
}
}

#[derive(Debug)]
struct BitmaptAccumulator {
bitmap: roaring::bitmap::RoaringBitmap,
}

impl BitmaptAccumulator {
pub fn new() -> Self {
Self {
bitmap: roaring::bitmap::RoaringBitmap::new(),
}
}
}

#[derive(Debug)]
struct BinaryHLLAccumulator<T>
where
Expand Down Expand Up @@ -227,7 +240,7 @@ impl<T: Hash> TryFrom<&ScalarValue> for HyperLogLog<T> {
}
}

macro_rules! default_accumulator_impl {
macro_rules! default_hllaccumulator_impl {
() => {
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
assert_eq!(1, states.len(), "expect only 1 element in the states");
Expand Down Expand Up @@ -273,7 +286,7 @@ where
Ok(())
}

default_accumulator_impl!();
default_hllaccumulator_impl!();
}

impl<T> Accumulator for StringHLLAccumulator<T>
Expand All @@ -289,7 +302,7 @@ where
Ok(())
}

default_accumulator_impl!();
default_hllaccumulator_impl!();
}

impl<T> Accumulator for NumericHLLAccumulator<T>
Expand All @@ -304,5 +317,89 @@ where
Ok(())
}

default_accumulator_impl!();
default_hllaccumulator_impl!();
}

impl Accumulator for BitmaptAccumulator {
//state() can be used by physical nodes to aggregate states together and send them over the network/threads, to combine values.
fn state(&self) -> Result<Vec<ScalarValue>> {
let mut bytes = vec![];
self.bitmap.serialize_into(&mut bytes).unwrap();
Ok(vec![ScalarValue::Binary(Some(bytes))])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let value = &values[0];
if value.is_empty() {
return Ok(());
}
match value.data_type() {
DataType::Int8 => {
let array = value.as_any().downcast_ref::<Int8Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
DataType::Int16 => {
let array = value.as_any().downcast_ref::<Int16Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}

DataType::UInt8 => {
let array = value.as_any().downcast_ref::<UInt8Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
DataType::UInt16 => {
let array = value.as_any().downcast_ref::<UInt16Array>().unwrap();
for value in array.iter() {
match value {
Some(v) => self.bitmap.insert(v as u32),
None => false,
};
}
}
e => {
return Err(DataFusionError::Internal(format!(
"Unsupported data type {:?} for bitmap distinct count",
e
)));
}
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();

for b in binary_array.iter() {
let v = b.ok_or_else(|| {
DataFusionError::Internal(
"Impossibly got empty binary array from states".into(),
)
})?;
let bitmap = RoaringBitmap::deserialize_from(&v.to_vec()[..]).unwrap();
self.bitmap.bitor_assign(bitmap);
}
Ok(())
}

fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::from(self.bitmap.len()))
}

fn size(&self) -> usize {
self.bitmap.serialized_size()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure is a proper way to measure roaring bitmap

}
}