-
Notifications
You must be signed in to change notification settings - Fork 1.1k
add garbage_collect_dictionary to arrow-select
#7716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
6ee4216
3d05a32
ad8c45e
b728ec8
3f324c7
10ba2f5
f60344a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,11 @@ | |
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Dictionary utilities for Arrow arrays | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use crate::filter::filter; | ||
| use crate::interleave::interleave; | ||
| use ahash::RandomState; | ||
| use arrow_array::builder::BooleanBufferBuilder; | ||
|
|
@@ -23,10 +28,63 @@ use arrow_array::types::{ | |
| LargeUtf8Type, Utf8Type, | ||
| }; | ||
| use arrow_array::{cast::AsArray, downcast_primitive}; | ||
| use arrow_array::{Array, ArrayRef, DictionaryArray, GenericByteArray, PrimitiveArray}; | ||
| use arrow_array::{ | ||
| downcast_dictionary_array, AnyDictionaryArray, Array, ArrayRef, ArrowNativeTypeOp, | ||
| BooleanArray, DictionaryArray, GenericByteArray, PrimitiveArray, | ||
| }; | ||
| use arrow_buffer::{ArrowNativeType, BooleanBuffer, ScalarBuffer, ToByteSlice}; | ||
| use arrow_schema::{ArrowError, DataType}; | ||
|
|
||
| /// Garbage collects a [DictionaryArray] by removing unreferenced values. | ||
| pub fn garbage_collect_dictionary<K: ArrowDictionaryKeyType>( | ||
| dictionary: &DictionaryArray<K>, | ||
| ) -> Result<DictionaryArray<K>, ArrowError> { | ||
| let keys = dictionary.keys(); | ||
| let values = dictionary.values(); | ||
|
|
||
davidhewitt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let mask = dictionary.occupancy(); | ||
|
|
||
| // If no work to do, return the original dictionary | ||
| if mask.count_set_bits() == values.len() { | ||
| return Ok(dictionary.clone()); | ||
| } | ||
|
|
||
| // Create a mapping from the old keys to the new keys, use a Vec for easy indexing | ||
| let mut key_remap = vec![K::Native::ZERO; values.len()]; | ||
| for (new_idx, old_idx) in mask.set_indices().enumerate() { | ||
| key_remap[old_idx] = K::Native::from_usize(new_idx) | ||
| .expect("new index should fit in K::Native, as old index was in range"); | ||
| } | ||
|
|
||
| // ... and then build the new keys array | ||
| let new_keys = keys.unary(|key| { | ||
| key_remap | ||
| .get(key.as_usize()) | ||
| .copied() | ||
| // nulls may be present in the keys, and they will have arbitrary value; we don't care | ||
| // and can safely return zero | ||
| .unwrap_or(K::Native::ZERO) | ||
| }); | ||
|
|
||
| // Create a new values array by filtering using the mask | ||
| let values = filter(dictionary.values(), &BooleanArray::new(mask, None))?; | ||
|
|
||
| Ok(DictionaryArray::new(new_keys, values)) | ||
| } | ||
|
|
||
| /// Equivalent to [`garbage_collect_dictionary`] but without requiring casting to a specific key type. | ||
| pub fn garbage_collect_any_dictionary( | ||
| dictionary: &dyn AnyDictionaryArray, | ||
| ) -> Result<ArrayRef, ArrowError> { | ||
|
Comment on lines
+82
to
+84
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any particular reason we didn't ad this as a method on
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The implementation uses |
||
| // FIXME: this is a workaround for MSRV Rust versions below 1.86 where trait upcasting is not stable. | ||
| // From 1.86 onward, `&dyn AnyDictionaryArray` can be directly passed to `downcast_dictionary_array!`. | ||
| let dictionary = &*dictionary.slice(0, dictionary.len()); | ||
|
Comment on lines
+85
to
+87
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This fixed MSRV without materially changing the PR, I'm open to alternatives like just removing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems fine to me |
||
| downcast_dictionary_array!( | ||
| dictionary => garbage_collect_dictionary(dictionary).map(|dict| Arc::new(dict) as ArrayRef), | ||
| _ => unreachable!("have a dictionary array") | ||
| ) | ||
| } | ||
|
|
||
| /// A best effort interner that maintains a fixed number of buckets | ||
| /// and interns keys based on their hash value | ||
| /// | ||
|
|
@@ -78,7 +136,7 @@ impl<'a, V> Interner<'a, V> { | |
| } | ||
| } | ||
|
|
||
| pub struct MergedDictionaries<K: ArrowDictionaryKeyType> { | ||
| pub(crate) struct MergedDictionaries<K: ArrowDictionaryKeyType> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this change needed? I didn't see
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as #7716 (comment) |
||
| /// Provides `key_mappings[`array_idx`][`old_key`] -> new_key` | ||
| pub key_mappings: Vec<Vec<K::Native>>, | ||
| /// The new values | ||
|
|
@@ -110,7 +168,7 @@ type PtrEq = fn(&dyn Array, &dyn Array) -> bool; | |
| /// some return over the naive approach used by MutableArrayData | ||
| /// | ||
| /// `len` is the total length of the merged output | ||
| pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>( | ||
| pub(crate) fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Iis this change still needed? I didn't see
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes - I made |
||
| dictionaries: &[&DictionaryArray<K>], | ||
| len: usize, | ||
| ) -> bool { | ||
|
|
@@ -153,7 +211,7 @@ pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>( | |
| /// This method is meant to be very fast and the output dictionary values | ||
| /// may not be unique, unlike `GenericByteDictionaryBuilder` which is slower | ||
| /// but produces unique values | ||
| pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>( | ||
| pub(crate) fn merge_dictionary_values<K: ArrowDictionaryKeyType>( | ||
| dictionaries: &[&DictionaryArray<K>], | ||
| masks: Option<&[BooleanBuffer]>, | ||
| ) -> Result<MergedDictionaries<K>, ArrowError> { | ||
|
|
@@ -298,13 +356,88 @@ fn masked_bytes<'a, T: ByteArrayType>( | |
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use crate::dictionary::merge_dictionary_values; | ||
| use super::*; | ||
|
|
||
| use arrow_array::cast::as_string_array; | ||
| use arrow_array::types::Int32Type; | ||
| use arrow_array::{DictionaryArray, Int32Array, StringArray}; | ||
| use arrow_array::types::Int8Type; | ||
| use arrow_array::{DictionaryArray, Int32Array, Int8Array, StringArray}; | ||
| use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer}; | ||
| use std::sync::Arc; | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_i32_dictionary() { | ||
| let values = StringArray::from_iter_values(["a", "b", "c", "d"]); | ||
| let keys = Int32Array::from_iter_values([0, 1, 1, 3, 0, 0, 1]); | ||
| let dict = DictionaryArray::<Int32Type>::new(keys, Arc::new(values)); | ||
|
|
||
| // Only "a", "b", "d" are referenced, "c" is not | ||
| let gc = garbage_collect_dictionary(&dict).unwrap(); | ||
|
|
||
| let expected_values = StringArray::from_iter_values(["a", "b", "d"]); | ||
| let expected_keys = Int32Array::from_iter_values([0, 1, 1, 2, 0, 0, 1]); | ||
| let expected = DictionaryArray::<Int32Type>::new(expected_keys, Arc::new(expected_values)); | ||
|
|
||
| assert_eq!(gc, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_any_dictionary() { | ||
| let values = StringArray::from_iter_values(["a", "b", "c", "d"]); | ||
| let keys = Int32Array::from_iter_values([0, 1, 1, 3, 0, 0, 1]); | ||
| let dict = DictionaryArray::<Int32Type>::new(keys, Arc::new(values)); | ||
|
|
||
| let gc = garbage_collect_any_dictionary(&dict).unwrap(); | ||
|
|
||
| let expected_values = StringArray::from_iter_values(["a", "b", "d"]); | ||
| let expected_keys = Int32Array::from_iter_values([0, 1, 1, 2, 0, 0, 1]); | ||
| let expected = DictionaryArray::<Int32Type>::new(expected_keys, Arc::new(expected_values)); | ||
|
|
||
| assert_eq!(gc.as_ref(), &expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_with_nulls() { | ||
| let values = StringArray::from_iter_values(["a", "b", "c"]); | ||
| let keys = Int8Array::from(vec![Some(2), None, Some(0)]); | ||
| let dict = DictionaryArray::<Int8Type>::new(keys, Arc::new(values)); | ||
|
|
||
| let gc = garbage_collect_dictionary(&dict).unwrap(); | ||
|
|
||
| let expected_values = StringArray::from_iter_values(["a", "c"]); | ||
| let expected_keys = Int8Array::from(vec![Some(1), None, Some(0)]); | ||
| let expected = DictionaryArray::<Int8Type>::new(expected_keys, Arc::new(expected_values)); | ||
|
|
||
| assert_eq!(gc, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_empty_dictionary() { | ||
| let values = StringArray::from_iter_values::<&str, _>([]); | ||
| let keys = Int32Array::from_iter_values([]); | ||
| let dict = DictionaryArray::<Int32Type>::new(keys, Arc::new(values)); | ||
|
|
||
| let gc = garbage_collect_dictionary(&dict).unwrap(); | ||
|
|
||
| assert_eq!(gc, dict); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_garbage_collect_dictionary_all_unreferenced() { | ||
| let values = StringArray::from_iter_values(["a", "b", "c"]); | ||
| let keys = Int32Array::from(vec![None, None, None]); | ||
| let dict = DictionaryArray::<Int32Type>::new(keys, Arc::new(values)); | ||
|
|
||
| let gc = garbage_collect_dictionary(&dict).unwrap(); | ||
|
|
||
| // All keys are null, so dictionary values can be empty | ||
| let expected_values = StringArray::from_iter_values::<&str, _>([]); | ||
| let expected_keys = Int32Array::from(vec![None, None, None]); | ||
| let expected = DictionaryArray::<Int32Type>::new(expected_keys, Arc::new(expected_values)); | ||
|
|
||
| assert_eq!(gc, expected); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_merge_strings() { | ||
| let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "a", "b", "d", "c", "e"]); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.