-
Notifications
You must be signed in to change notification settings - Fork 1.1k
add garbage_collect_dictionary to arrow-select
#7716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
6ee4216
3d05a32
ad8c45e
b728ec8
3f324c7
10ba2f5
f60344a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,12 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Dictionary utilities for Arrow arrays | ||
|
|
||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
|
|
||
| use crate::filter::filter; | ||
| use crate::interleave::interleave; | ||
| use ahash::RandomState; | ||
| use arrow_array::builder::BooleanBufferBuilder; | ||
|
|
@@ -23,10 +29,76 @@ use arrow_array::types::{ | |
| LargeUtf8Type, Utf8Type, | ||
| }; | ||
| use arrow_array::{cast::AsArray, downcast_primitive}; | ||
| use arrow_array::{Array, ArrayRef, DictionaryArray, GenericByteArray, PrimitiveArray}; | ||
| use arrow_buffer::{ArrowNativeType, BooleanBuffer, ScalarBuffer, ToByteSlice}; | ||
| use arrow_array::{ | ||
| downcast_dictionary_array, AnyDictionaryArray, Array, ArrayRef, ArrowNativeTypeOp, | ||
| BooleanArray, DictionaryArray, GenericByteArray, PrimitiveArray, | ||
| }; | ||
| use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, ScalarBuffer, ToByteSlice}; | ||
| use arrow_schema::{ArrowError, DataType}; | ||
|
|
||
| /// Garbage collects a [DictionaryArray] by removing unreferenced values. | ||
| pub fn garbage_collect_dictionary<K: ArrowDictionaryKeyType>( | ||
| dictionary: &DictionaryArray<K>, | ||
| ) -> Result<DictionaryArray<K>, ArrowError> { | ||
| let keys = dictionary.keys(); | ||
| let values = dictionary.values(); | ||
|
|
||
davidhewitt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let mut mask_builder = | ||
| BooleanBufferBuilder::new_from_buffer(MutableBuffer::new_null(values.len()), values.len()); | ||
|
|
||
| for key in keys { | ||
| if let Some(key) = key { | ||
| mask_builder.set_bit(key.as_usize(), true); | ||
| } | ||
| } | ||
|
|
||
| let mask = mask_builder.finish(); | ||
|
|
||
| // If no work to do, return the original dictionary | ||
| if mask.count_set_bits() == values.len() { | ||
| return Ok(dictionary.clone()); | ||
| } | ||
|
|
||
| // Remap the keys to new indices based on the set bits in the mask | ||
| let key_remap: HashMap<usize, usize> = mask | ||
davidhewitt marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .set_indices() | ||
| .enumerate() | ||
| .map(|(new, old)| (old, new)) | ||
| .collect(); | ||
|
|
||
| let new_keys = keys | ||
| .iter() | ||
| .map(|key| { | ||
| key.map_or(<K::Native as ArrowNativeTypeOp>::ZERO, |k| { | ||
| K::Native::from_usize( | ||
| key_remap | ||
| .get(&k.as_usize()) | ||
| .copied() | ||
| .expect("key should be in remap"), | ||
| ) | ||
| .expect("key remap should always be in range of K") | ||
| }) | ||
| }) | ||
| .collect::<ScalarBuffer<K::Native>>(); | ||
|
|
||
| let new_keys = PrimitiveArray::new(new_keys, keys.nulls().cloned()); | ||
|
|
||
| // Create a new values array with the masked values | ||
| let values = filter(dictionary.values(), &BooleanArray::new(mask, None))?; | ||
|
||
|
|
||
| Ok(DictionaryArray::new(new_keys, values)) | ||
| } | ||
|
|
||
| /// Equivalent to [`garbage_collect_dictionary`] but without requiring casting to a specific key type. | ||
| pub fn garbage_collect_any_dictionary( | ||
| dictionary: &dyn AnyDictionaryArray, | ||
| ) -> Result<ArrayRef, ArrowError> { | ||
|
Comment on lines
+82
to
+84
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Any particular reason we didn't add this as a method on
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The implementation uses |
||
| downcast_dictionary_array!( | ||
| dictionary => garbage_collect_dictionary(dictionary).map(|dict| Arc::new(dict) as ArrayRef), | ||
| _ => unreachable!("have a dictionary array") | ||
| ) | ||
| } | ||
|
|
||
| /// A best effort interner that maintains a fixed number of buckets | ||
| /// and interns keys based on their hash value | ||
| /// | ||
|
|
@@ -78,7 +150,7 @@ impl<'a, V> Interner<'a, V> { | |
| } | ||
| } | ||
|
|
||
| pub struct MergedDictionaries<K: ArrowDictionaryKeyType> { | ||
| pub(crate) struct MergedDictionaries<K: ArrowDictionaryKeyType> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this change needed? I didn't see
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same as #7716 (comment) |
||
| /// Provides `key_mappings[`array_idx`][`old_key`] -> new_key` | ||
| pub key_mappings: Vec<Vec<K::Native>>, | ||
| /// The new values | ||
|
|
@@ -110,7 +182,7 @@ type PtrEq = fn(&dyn Array, &dyn Array) -> bool; | |
| /// some return over the naive approach used by MutableArrayData | ||
| /// | ||
| /// `len` is the total length of the merged output | ||
| pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>( | ||
| pub(crate) fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this change still needed? I didn't see
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes - I made |
||
| dictionaries: &[&DictionaryArray<K>], | ||
| len: usize, | ||
| ) -> bool { | ||
|
|
@@ -153,7 +225,7 @@ pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>( | |
| /// This method is meant to be very fast and the output dictionary values | ||
| /// may not be unique, unlike `GenericByteDictionaryBuilder` which is slower | ||
| /// but produces unique values | ||
| pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>( | ||
| pub(crate) fn merge_dictionary_values<K: ArrowDictionaryKeyType>( | ||
| dictionaries: &[&DictionaryArray<K>], | ||
| masks: Option<&[BooleanBuffer]>, | ||
| ) -> Result<MergedDictionaries<K>, ArrowError> { | ||
|
|
@@ -298,13 +370,88 @@ fn masked_bytes<'a, T: ByteArrayType>( | |
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use crate::dictionary::merge_dictionary_values; | ||
| use super::*; | ||
|
|
||
| use arrow_array::cast::as_string_array; | ||
| use arrow_array::types::Int32Type; | ||
| use arrow_array::{DictionaryArray, Int32Array, StringArray}; | ||
| use arrow_array::types::Int8Type; | ||
| use arrow_array::{DictionaryArray, Int32Array, Int8Array, StringArray}; | ||
| use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer}; | ||
| use std::sync::Arc; | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_i32_dictionary() { | ||
| let values = StringArray::from_iter_values(["a", "b", "c", "d"]); | ||
| let keys = Int32Array::from_iter_values([0, 1, 1, 3, 0, 0, 1]); | ||
| let dict = DictionaryArray::<Int32Type>::new(keys, Arc::new(values)); | ||
|
|
||
| // Only "a", "b", "d" are referenced, "c" is not | ||
| let gc = garbage_collect_dictionary(&dict).unwrap(); | ||
|
|
||
| let expected_values = StringArray::from_iter_values(["a", "b", "d"]); | ||
| let expected_keys = Int32Array::from_iter_values([0, 1, 1, 2, 0, 0, 1]); | ||
| let expected = DictionaryArray::<Int32Type>::new(expected_keys, Arc::new(expected_values)); | ||
|
|
||
| assert_eq!(gc, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_any_dictionary() { | ||
| let values = StringArray::from_iter_values(["a", "b", "c", "d"]); | ||
| let keys = Int32Array::from_iter_values([0, 1, 1, 3, 0, 0, 1]); | ||
| let dict = DictionaryArray::<Int32Type>::new(keys, Arc::new(values)); | ||
|
|
||
| let gc = garbage_collect_any_dictionary(&dict).unwrap(); | ||
|
|
||
| let expected_values = StringArray::from_iter_values(["a", "b", "d"]); | ||
| let expected_keys = Int32Array::from_iter_values([0, 1, 1, 2, 0, 0, 1]); | ||
| let expected = DictionaryArray::<Int32Type>::new(expected_keys, Arc::new(expected_values)); | ||
|
|
||
| assert_eq!(gc.as_ref(), &expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_with_nulls() { | ||
| let values = StringArray::from_iter_values(["a", "b", "c"]); | ||
| let keys = Int8Array::from(vec![Some(2), None, Some(0)]); | ||
| let dict = DictionaryArray::<Int8Type>::new(keys, Arc::new(values)); | ||
|
|
||
| let gc = garbage_collect_dictionary(&dict).unwrap(); | ||
|
|
||
| let expected_values = StringArray::from_iter_values(["a", "c"]); | ||
| let expected_keys = Int8Array::from(vec![Some(1), None, Some(0)]); | ||
| let expected = DictionaryArray::<Int8Type>::new(expected_keys, Arc::new(expected_values)); | ||
|
|
||
| assert_eq!(gc, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_empty_dictionary() { | ||
| let values = StringArray::from_iter_values::<&str, _>([]); | ||
| let keys = Int32Array::from_iter_values([]); | ||
| let dict = DictionaryArray::<Int32Type>::new(keys, Arc::new(values)); | ||
|
|
||
| let gc = garbage_collect_dictionary(&dict).unwrap(); | ||
|
|
||
| assert_eq!(gc, dict); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_dictionary_all_unreferenced() { | ||
| let values = StringArray::from_iter_values(["a", "b", "c"]); | ||
| let keys = Int32Array::from(vec![None, None, None]); | ||
| let dict = DictionaryArray::<Int32Type>::new(keys, Arc::new(values)); | ||
|
|
||
| let gc = garbage_collect_dictionary(&dict).unwrap(); | ||
|
|
||
| // All keys are null, so dictionary values can be empty | ||
| let expected_values = StringArray::from_iter_values::<&str, _>([]); | ||
| let expected_keys = Int32Array::from(vec![None, None, None]); | ||
| let expected = DictionaryArray::<Int32Type>::new(expected_keys, Arc::new(expected_values)); | ||
|
|
||
| assert_eq!(gc, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_merge_strings() { | ||
| let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "a", "b", "d", "c", "e"]); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.