Merged
130 changes: 119 additions & 11 deletions arrow-array/src/array/byte_view_array.rs
@@ -512,18 +512,72 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
            };
        }

        // 3) Allocate exactly capacity for all non-inline data
        let mut data_buf = Vec::with_capacity(total_large);
        struct GcCopyGroup {
            total_buffer_bytes: usize,
            total_len: usize,
        }

        let gc_copy_groups = if total_large > i32::MAX as usize {
            // Slow-path: need to split into multiple copy groups
            let mut groups = vec![];
            let mut current_length = 0;
            let mut current_elements = 0;

        // 4) Iterate over views and process each inline/non-inline view
        let views_buf: Vec<u128> = (0..len)
            .map(|i| unsafe { self.copy_view_to_buffer(i, &mut data_buf) })
            .collect();
            for view in self.views() {
                let len = *view as u32;

Member Author: This part is slow, but it's correct. I can make it faster (by handling the numbers via grouping or batching) if required.

Contributor: I am not sure how you would make this much faster -- I think the code needs to find the locations to split in any event.

Member Author: Even if the total buffer size is greater than i32::MAX, a single buffer may be much smaller than i32::MAX, so the split points could be found batch-by-batch rather than by adding small buffers one-by-one?

Contributor: I see -- you are saying you could potentially optimize by looking at each input buffer and gc'ing it individually. That would be interesting, though it would probably take a lot of care to make it fast.
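A purely hypothetical sketch of the first step of that idea (not part of this PR): tally how many bytes of each input data buffer are actually in use, so any buffer whose referenced bytes already fit under i32::MAX could become a single copy group without a per-view walk. `bytes_used_per_buffer` is an illustrative name, not an arrow-rs API.

```rust
// Hypothetical: per-input-buffer usage scan, a possible starting point for
// batch-by-batch splitting. Not part of the merged change.
let bytes_used_per_buffer: Vec<usize> = {
    let mut used = vec![0usize; self.data_buffers().len()];
    for view in self.views() {
        let len = *view as u32;
        if len > MAX_INLINE_VIEW_LEN {
            // Non-inline views record which data buffer they point into.
            let bv = ByteView::from(*view);
            used[bv.buffer_index as usize] += len as usize;
        }
    }
    used
};
// Any buffer `b` with bytes_used_per_buffer[b] <= i32::MAX as usize could, in
// principle, be turned into one copy group wholesale.
```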

                if len > MAX_INLINE_VIEW_LEN {
                    if current_length + len > i32::MAX as u32 {
                        // Start a new group
                        groups.push(GcCopyGroup {
                            total_buffer_bytes: current_length as usize,
                            total_len: current_elements,
                        });

Contributor: I think you can preallocate `groups` (with_capacity) or use `.collect`.
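A minimal sketch of that suggestion, under one assumption: any two consecutive groups together exceed i32::MAX bytes, so the group count is bounded by roughly 2 * total_large / i32::MAX + 2, which gives a safe `with_capacity` value.

```rust
// Sketch only: preallocate the slow-path group list with a coarse upper
// bound instead of growing it from empty.
let max_groups = 2 * (total_large / i32::MAX as usize) + 2;
let mut groups: Vec<GcCopyGroup> = Vec::with_capacity(max_groups);
```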
                        current_length = 0;
                        current_elements = 0;
                    }
                    current_length += len;
                }
                // Every view (inline or not) must be counted into a group so the
                // copy loop below can walk views consecutively.
                current_elements += 1;
            }
            if current_elements != 0 {
                groups.push(GcCopyGroup {
                    total_buffer_bytes: current_length as usize,
                    total_len: current_elements,
                });
            }
            groups
        } else {
            let gc_copy_group = GcCopyGroup {
                total_buffer_bytes: total_large,
                total_len: len,
            };
            vec![gc_copy_group]
        };
        assert!(gc_copy_groups.len() <= i32::MAX as usize);

Member Author: This assertion can be removed; I just added it to make sure it would pass.

Contributor: Maybe change it to a debug assert here.

        // 3) Copy the buffers group by group
        let mut views_buf = Vec::with_capacity(len);
        let mut data_blocks = Vec::with_capacity(gc_copy_groups.len());

        let mut current_view_idx = 0;

        for (group_idx, gc_copy_group) in gc_copy_groups.iter().enumerate() {
            let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes);

            let group_views: Vec<u128> = (current_view_idx
                ..current_view_idx + gc_copy_group.total_len)
                .map(|view_idx| unsafe {
                    self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf)
                })
                .collect();

            views_buf.extend(group_views);
            data_blocks.push(Buffer::from_vec(data_buf));

Contributor: This could use `collect` as well.
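A rough sketch of that suggestion (not the merged code): build one `Buffer` per group inside an iterator and collect them into `data_blocks` directly, threading `views_buf` and `current_view_idx` through as mutable state.

```rust
// Sketch only: collect one Buffer per group instead of pushing in a loop.
let mut views_buf: Vec<u128> = Vec::with_capacity(len);
let mut current_view_idx = 0;
let data_blocks: Vec<Buffer> = gc_copy_groups
    .iter()
    .enumerate()
    .map(|(group_idx, group)| {
        let mut data_buf = Vec::with_capacity(group.total_buffer_bytes);
        views_buf.extend((current_view_idx..current_view_idx + group.total_len).map(
            |view_idx| unsafe {
                self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf)
            },
        ));
        current_view_idx += group.total_len;
        Buffer::from_vec(data_buf)
    })
    .collect();
```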

            current_view_idx += gc_copy_group.total_len;
        }

        // 5) Wrap up buffers
        let data_block = Buffer::from_vec(data_buf);
        // 4) Wrap up buffers
        let views_scalar = ScalarBuffer::from(views_buf);
        let data_blocks = vec![data_block];

        // SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized
        unsafe { GenericByteViewArray::new_unchecked(views_scalar, data_blocks, nulls) }
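As a worked example of the grouping arithmetic (the same scenario as the test below): with 36 non-inline views of 128 MiB each, 15 views total 1920 MiB, and a 16th would reach 2048 MiB, which exceeds i32::MAX bytes; so the slow path emits groups of 15, 15, and 6 views, producing three data blocks.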
@@ -541,7 +595,12 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
    /// `buffer_index` set to the provided `buffer_idx` and its `offset` updated so that it points
    /// into the bytes just appended at the end of `data_buf`.
    #[inline(always)]
    unsafe fn copy_view_to_buffer(&self, i: usize, data_buf: &mut Vec<u8>) -> u128 {
    unsafe fn copy_view_to_buffer(
        &self,
        i: usize,
        buffer_idx: i32,
        data_buf: &mut Vec<u8>,
    ) -> u128 {
        // SAFETY: `i < self.len()` ensures this is in-bounds.
        let raw_view = unsafe { *self.views().get_unchecked(i) };
        let mut bv = ByteView::from(raw_view);
@@ -561,7 +620,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
        let new_offset = data_buf.len() as u32;
        data_buf.extend_from_slice(slice);

        bv.buffer_index = 0;
        bv.buffer_index = buffer_idx as u32;
        bv.offset = new_offset;
        bv.into()
    }
@@ -1404,6 +1463,55 @@ mod tests {
        }
    }

    #[test]
    fn test_gc_huge_array() {
        // Construct multiple 128 MiB BinaryView entries so total > 4 GiB
        let block_len: usize = 128 * 1024 * 1024; // 128 MiB per view
        let num_views: usize = 36;

        // Create two 128 MiB data blocks with simple byte patterns
        let buffer = Buffer::from_vec(vec![0xAB; block_len]);
        let buffer2 = Buffer::from_vec(vec![0xFF; block_len]);

        // Append each block and then add many views pointing into it
        let mut builder = BinaryViewBuilder::new();
        let block_id = builder.append_block(buffer);
        for _ in 0..num_views / 2 {
            builder
                .try_append_view(block_id, 0, block_len as u32)
                .expect("append view into 128MiB block");
        }
        let block_id2 = builder.append_block(buffer2);
        for _ in 0..num_views / 2 {
            builder
                .try_append_view(block_id2, 0, block_len as u32)
                .expect("append view into 128MiB block");
        }

        let array = builder.finish();
        let total = array.total_buffer_bytes_used();
        assert!(
            total > u32::MAX as usize,
            "Expected total non-inline bytes to exceed 4 GiB, got {}",
            total
        );

        // Run gc and verify correctness
        let gced = array.gc();
        assert_eq!(gced.len(), num_views, "Length mismatch after gc");
        assert_eq!(gced.null_count(), 0, "Null count mismatch after gc");
        assert_ne!(
            gced.data_buffers().len(),
            1,
            "gc with huge buffer should not consolidate data into a single buffer"
        );

        // Element-wise equality check across the entire array
        array.iter().zip(gced.iter()).for_each(|(orig, got)| {
            assert_eq!(orig, got, "Value mismatch after gc on huge array");
        });
    }

    #[test]
    fn test_eq() {
        let test_data = [