Skip to content

Commit 52d8d56

Browse files
authored
Revert "Revert "Improve coalesce and concat performance for views… (#7625)
… (#7614)" (#7623)" This reverts commit da461c8. It adds a test and a fix for the wrong-index issue. I also verified the change with DataFusion (benchmarks show notable improvements). # Which issue does this PR close? Closes #NNN. # Rationale for this change # What changes are included in this PR? # Are there any user-facing changes?
1 parent da461c8 commit 52d8d56

File tree

3 files changed

+161
-55
lines changed

3 files changed

+161
-55
lines changed

arrow-array/src/builder/generic_bytes_view_builder.rs

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::any::Any;
1919
use std::marker::PhantomData;
2020
use std::sync::Arc;
2121

22-
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
22+
use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
2323
use arrow_data::ByteView;
2424
use arrow_schema::ArrowError;
2525
use hashbrown::hash_table::Entry;
@@ -28,7 +28,7 @@ use hashbrown::HashTable;
2828
use crate::builder::ArrayBuilder;
2929
use crate::types::bytes::ByteArrayNativeType;
3030
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31-
use crate::{ArrayRef, GenericByteViewArray};
31+
use crate::{Array, ArrayRef, GenericByteViewArray};
3232

3333
const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB
3434
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB
@@ -79,7 +79,7 @@ impl BlockSizeGrowthStrategy {
7979
/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended
8080
/// using [`GenericByteViewBuilder::try_append_view`]
8181
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82-
views_builder: BufferBuilder<u128>,
82+
views_buffer: Vec<u128>,
8383
null_buffer_builder: NullBufferBuilder,
8484
completed: Vec<Buffer>,
8585
in_progress: Vec<u8>,
@@ -99,7 +99,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
9999
/// Creates a new [`GenericByteViewBuilder`] with space for `capacity` string values.
100100
pub fn with_capacity(capacity: usize) -> Self {
101101
Self {
102-
views_builder: BufferBuilder::new(capacity),
102+
views_buffer: Vec::with_capacity(capacity),
103103
null_buffer_builder: NullBufferBuilder::new(capacity),
104104
completed: vec![],
105105
in_progress: vec![],
@@ -148,7 +148,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
148148
pub fn with_deduplicate_strings(self) -> Self {
149149
Self {
150150
string_tracker: Some((
151-
HashTable::with_capacity(self.views_builder.capacity()),
151+
HashTable::with_capacity(self.views_buffer.capacity()),
152152
Default::default(),
153153
)),
154154
..self
@@ -201,10 +201,42 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
201201
let b = b.get_unchecked(start..end);
202202

203203
let view = make_view(b, block, offset);
204-
self.views_builder.append(view);
204+
self.views_buffer.push(view);
205205
self.null_buffer_builder.append_non_null();
206206
}
207207

208+
/// Appends an array to the builder.
209+
/// This will flush any in-progress block and append the data buffers
210+
/// and add the (adapted) views.
211+
pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
212+
self.flush_in_progress();
213+
// keep original views if this array is the first to be added or if there are no data buffers (all inline views)
214+
let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
215+
let starting_buffer = self.completed.len() as u32;
216+
217+
self.completed.extend(array.data_buffers().iter().cloned());
218+
219+
if keep_views {
220+
self.views_buffer.extend_from_slice(array.views());
221+
} else {
222+
self.views_buffer.extend(array.views().iter().map(|v| {
223+
let mut byte_view = ByteView::from(*v);
224+
if byte_view.length > 12 {
225+
// Small views (<=12 bytes) are inlined, so only need to update large views
226+
byte_view.buffer_index += starting_buffer;
227+
};
228+
229+
byte_view.as_u128()
230+
}));
231+
}
232+
233+
if let Some(null_buffer) = array.nulls() {
234+
self.null_buffer_builder.append_buffer(null_buffer);
235+
} else {
236+
self.null_buffer_builder.append_n_non_nulls(array.len());
237+
}
238+
}
239+
208240
/// Try to append a view of the given `block`, `offset` and `length`
209241
///
210242
/// See [`Self::append_block`]
@@ -255,7 +287,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
255287
/// Useful if we want to know what value has been inserted to the builder
256288
/// The index has to be smaller than `self.len()`, otherwise it will panic
257289
pub fn get_value(&self, index: usize) -> &[u8] {
258-
let view = self.views_builder.as_slice().get(index).unwrap();
290+
let view = self.views_buffer.as_slice().get(index).unwrap();
259291
let len = *view as u32;
260292
if len <= 12 {
261293
// # Safety
@@ -287,7 +319,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
287319
let mut view_buffer = [0; 16];
288320
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
289321
view_buffer[4..4 + v.len()].copy_from_slice(v);
290-
self.views_builder.append(u128::from_le_bytes(view_buffer));
322+
self.views_buffer.push(u128::from_le_bytes(view_buffer));
291323
self.null_buffer_builder.append_non_null();
292324
return;
293325
}
@@ -311,16 +343,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
311343
Entry::Occupied(occupied) => {
312344
// If the string already exists, we will directly use the view
313345
let idx = occupied.get();
314-
self.views_builder
315-
.append(self.views_builder.as_slice()[*idx]);
346+
self.views_buffer.push(self.views_buffer[*idx]);
316347
self.null_buffer_builder.append_non_null();
317348
self.string_tracker = Some((ht, hasher));
318349
return;
319350
}
320351
Entry::Vacant(vacant) => {
321352
// otherwise, we insert the (string hash -> view index) mapping
322353
// the idx is the current length of views_buffer, as we are inserting a new view
323-
vacant.insert(self.views_builder.len());
354+
vacant.insert(self.views_buffer.len());
324355
}
325356
}
326357
self.string_tracker = Some((ht, hasher));
@@ -341,7 +372,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
341372
buffer_index: self.completed.len() as u32,
342373
offset,
343374
};
344-
self.views_builder.append(view.into());
375+
self.views_buffer.push(view.into());
345376
self.null_buffer_builder.append_non_null();
346377
}
347378

@@ -358,21 +389,20 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
358389
#[inline]
359390
pub fn append_null(&mut self) {
360391
self.null_buffer_builder.append_null();
361-
self.views_builder.append(0);
392+
self.views_buffer.push(0);
362393
}
363394

364395
/// Builds the [`GenericByteViewArray`] and reset this builder
365396
pub fn finish(&mut self) -> GenericByteViewArray<T> {
366397
self.flush_in_progress();
367398
let completed = std::mem::take(&mut self.completed);
368-
let len = self.views_builder.len();
369-
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
370399
let nulls = self.null_buffer_builder.finish();
371400
if let Some((ref mut ht, _)) = self.string_tracker.as_mut() {
372401
ht.clear();
373402
}
403+
let views = std::mem::take(&mut self.views_buffer);
374404
// SAFETY: valid by construction
375-
unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
405+
unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
376406
}
377407

378408
/// Builds the [`GenericByteViewArray`] without resetting the builder
@@ -381,8 +411,8 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
381411
if !self.in_progress.is_empty() {
382412
completed.push(Buffer::from_slice_ref(&self.in_progress));
383413
}
384-
let len = self.views_builder.len();
385-
let views = Buffer::from_slice_ref(self.views_builder.as_slice());
414+
let len = self.views_buffer.len();
415+
let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
386416
let views = ScalarBuffer::new(views, 0, len);
387417
let nulls = self.null_buffer_builder.finish_cloned();
388418
// SAFETY: valid by construction
@@ -396,7 +426,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
396426

397427
/// Return the allocated size of this builder in bytes, useful for memory accounting.
398428
pub fn allocated_size(&self) -> usize {
399-
let views = self.views_builder.capacity() * std::mem::size_of::<u128>();
429+
let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
400430
let null = self.null_buffer_builder.allocated_size();
401431
let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
402432
let in_progress = self.in_progress.capacity();
@@ -418,7 +448,7 @@ impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
418448
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419449
write!(f, "{}ViewBuilder", T::PREFIX)?;
420450
f.debug_struct("")
421-
.field("views_builder", &self.views_builder)
451+
.field("views_buffer", &self.views_buffer)
422452
.field("in_progress", &self.in_progress)
423453
.field("completed", &self.completed)
424454
.field("null_buffer_builder", &self.null_buffer_builder)

arrow-select/src/coalesce.rs

Lines changed: 73 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
//! [`filter`]: crate::filter::filter
2222
//! [`take`]: crate::take::take
2323
use crate::concat::concat_batches;
24-
use arrow_array::{
25-
builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions,
26-
};
24+
use arrow_array::StringViewArray;
25+
use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch};
26+
use arrow_data::ByteView;
2727
use arrow_schema::{ArrowError, SchemaRef};
2828
use std::collections::VecDeque;
2929
use std::sync::Arc;
@@ -164,7 +164,7 @@ impl BatchCoalescer {
164164
return Ok(());
165165
}
166166

167-
let mut batch = gc_string_view_batch(&batch);
167+
let mut batch = gc_string_view_batch(batch);
168168

169169
// If pushing this batch would exceed the target batch size,
170170
// finish the current batch and start a new one
@@ -242,15 +242,19 @@ impl BatchCoalescer {
242242
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
243243
/// `StringViewArray` may only refer to a small portion of the buffer,
244244
/// significantly increasing memory usage.
245-
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
246-
let new_columns: Vec<ArrayRef> = batch
247-
.columns()
248-
.iter()
245+
fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
246+
let (schema, columns, num_rows) = batch.into_parts();
247+
let new_columns: Vec<ArrayRef> = columns
248+
.into_iter()
249249
.map(|c| {
250250
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
251251
let Some(s) = c.as_string_view_opt() else {
252-
return Arc::clone(c);
252+
return c;
253253
};
254+
if s.data_buffers().is_empty() {
255+
// If there are no data buffers, we can just return the array as is
256+
return c;
257+
}
254258
let ideal_buffer_size: usize = s
255259
.views()
256260
.iter()
@@ -264,42 +268,73 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
264268
})
265269
.sum();
266270
let actual_buffer_size = s.get_buffer_memory_size();
271+
let buffers = s.data_buffers();
267272

268273
// Re-creating the array copies data and can be time consuming.
269274
// We only do it if the array is sparse
270275
if actual_buffer_size > (ideal_buffer_size * 2) {
276+
if ideal_buffer_size == 0 {
277+
// If the ideal buffer size is 0, all views are inlined
278+
// so just reuse the views
279+
return Arc::new(unsafe {
280+
StringViewArray::new_unchecked(
281+
s.views().clone(),
282+
vec![],
283+
s.nulls().cloned(),
284+
)
285+
});
286+
}
271287
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerates later calls to concat_batches.
272288
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
273-
let mut builder = StringViewBuilder::with_capacity(s.len());
274-
if ideal_buffer_size > 0 {
275-
builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
276-
}
277-
278-
for v in s.iter() {
279-
builder.append_option(v);
280-
}
281-
282-
let gc_string = builder.finish();
283-
284-
debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
289+
let mut buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);
290+
291+
let views: Vec<u128> = s
292+
.views()
293+
.as_ref()
294+
.iter()
295+
.cloned()
296+
.map(|v| {
297+
let mut b: ByteView = ByteView::from(v);
298+
299+
if b.length > 12 {
300+
let offset = buffer.len() as u32;
301+
buffer.extend_from_slice(
302+
buffers[b.buffer_index as usize]
303+
.get(b.offset as usize..b.offset as usize + b.length as usize)
304+
.expect("Invalid buffer slice"),
305+
);
306+
b.offset = offset;
307+
b.buffer_index = 0; // Set buffer index to 0, as we only have one buffer
308+
}
309+
310+
b.into()
311+
})
312+
.collect();
313+
314+
let buffers = if buffer.is_empty() {
315+
vec![]
316+
} else {
317+
vec![buffer.into()]
318+
};
319+
320+
let gc_string = unsafe {
321+
StringViewArray::new_unchecked(views.into(), buffers, s.nulls().cloned())
322+
};
285323

286324
Arc::new(gc_string)
287325
} else {
288-
Arc::clone(c)
326+
c
289327
}
290328
})
291329
.collect();
292-
let mut options = RecordBatchOptions::new();
293-
options = options.with_row_count(Some(batch.num_rows()));
294-
RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
295-
.expect("Failed to re-create the gc'ed record batch")
330+
unsafe { RecordBatch::new_unchecked(schema, new_columns, num_rows) }
296331
}
297332

298333
#[cfg(test)]
299334
mod tests {
300335
use super::*;
301-
use arrow_array::builder::ArrayBuilder;
302-
use arrow_array::{StringViewArray, UInt32Array};
336+
use arrow_array::builder::{ArrayBuilder, StringViewBuilder};
337+
use arrow_array::{RecordBatchOptions, StringViewArray, UInt32Array};
303338
use arrow_schema::{DataType, Field, Schema};
304339
use std::ops::Range;
305340

@@ -518,9 +553,11 @@ mod tests {
518553
fn test_gc_string_view_test_batch_empty() {
519554
let schema = Schema::empty();
520555
let batch = RecordBatch::new_empty(schema.into());
521-
let output_batch = gc_string_view_batch(&batch);
522-
assert_eq!(batch.num_columns(), output_batch.num_columns());
523-
assert_eq!(batch.num_rows(), output_batch.num_rows());
556+
let cols = batch.num_columns();
557+
let num_rows = batch.num_rows();
558+
let output_batch = gc_string_view_batch(batch);
559+
assert_eq!(cols, output_batch.num_columns());
560+
assert_eq!(num_rows, output_batch.num_rows());
524561
}
525562

526563
#[test]
@@ -568,9 +605,11 @@ mod tests {
568605
/// and ensures the number of rows are the same
569606
fn do_gc(array: StringViewArray) -> StringViewArray {
570607
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
571-
let gc_batch = gc_string_view_batch(&batch);
572-
assert_eq!(batch.num_rows(), gc_batch.num_rows());
573-
assert_eq!(batch.schema(), gc_batch.schema());
608+
let rows = batch.num_rows();
609+
let schema = batch.schema();
610+
let gc_batch = gc_string_view_batch(batch);
611+
assert_eq!(rows, gc_batch.num_rows());
612+
assert_eq!(schema, gc_batch.schema());
574613
gc_batch
575614
.column(0)
576615
.as_any()

0 commit comments

Comments (0)