Skip to content

Commit 7739a83

Browse files
Dandandan and alamb authored
Improve coalesce and concat performance for views (apache#7614)
# Which issue does this PR close?

- Closes apache#7615
- Follow on to apache#7597

# Rationale for this change

Improve performance of `gc_string_view_batch`

```
filter: mixed_utf8view, 8192, nulls: 0, selectivity: 0.001       1.00     30.4±1.05ms        ? ?/sec    1.29     39.3±0.88ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0, selectivity: 0.01        1.00      4.3±0.17ms        ? ?/sec    1.20      5.2±0.15ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0, selectivity: 0.1         1.00  1805.1±25.77µs        ? ?/sec    1.32      2.4±0.20ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0, selectivity: 0.8         1.00      2.6±0.12ms        ? ?/sec    1.48      3.8±0.11ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0.1, selectivity: 0.001     1.00     42.5±0.48ms        ? ?/sec    1.23     52.2±1.33ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0.1, selectivity: 0.01      1.00      5.8±0.12ms        ? ?/sec    1.28      7.4±0.20ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0.1, selectivity: 0.1       1.00      2.2±0.02ms        ? ?/sec    1.37      3.1±0.18ms        ? ?/sec
filter: mixed_utf8view, 8192, nulls: 0.1, selectivity: 0.8       1.00      3.6±0.15ms        ? ?/sec    1.43      5.1±0.12ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0, selectivity: 0.001      1.00     51.0±0.59ms        ? ?/sec    1.38     70.3±1.11ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0, selectivity: 0.01       1.00      6.7±0.03ms        ? ?/sec    1.32      8.8±0.16ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0, selectivity: 0.1        1.00      3.0±0.01ms        ? ?/sec    1.41      4.3±0.09ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0, selectivity: 0.8        1.00      4.5±0.34ms        ? ?/sec    1.71      7.7±0.28ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0.1, selectivity: 0.001    1.00     64.2±0.74ms        ? ?/sec    1.33     85.1±1.52ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0.1, selectivity: 0.01     1.00      9.4±0.09ms        ? ?/sec    1.35     12.6±0.26ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0.1, selectivity: 0.1      1.00      3.8±0.03ms        ? ?/sec    1.46      5.6±0.11ms        ? ?/sec
filter: single_utf8view, 8192, nulls: 0.1, selectivity: 0.8      1.00      5.7±0.28ms        ? ?/sec    1.73      9.9±0.27ms        ? ?/sec
```

# What changes are included in this PR?

* Avoid recreating the views from scratch.
* Specialize concat for view types.
* Take an owned `RecordBatch` (effect on performance is small, might be measurable with smaller batch size / more columns).

# Are there any user-facing changes?

no

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 44d7194 commit 7739a83

File tree

3 files changed

+138
-55
lines changed

3 files changed

+138
-55
lines changed

arrow-array/src/builder/generic_bytes_view_builder.rs

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::any::Any;
1919
use std::marker::PhantomData;
2020
use std::sync::Arc;
2121

22-
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
22+
use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
2323
use arrow_data::ByteView;
2424
use arrow_schema::ArrowError;
2525
use hashbrown::hash_table::Entry;
@@ -28,7 +28,7 @@ use hashbrown::HashTable;
2828
use crate::builder::ArrayBuilder;
2929
use crate::types::bytes::ByteArrayNativeType;
3030
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31-
use crate::{ArrayRef, GenericByteViewArray};
31+
use crate::{Array, ArrayRef, GenericByteViewArray};
3232

3333
const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB
3434
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB
@@ -79,7 +79,7 @@ impl BlockSizeGrowthStrategy {
7979
/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended
8080
/// using [`GenericByteViewBuilder::try_append_view`]
8181
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82-
views_builder: BufferBuilder<u128>,
82+
views_buffer: Vec<u128>,
8383
null_buffer_builder: NullBufferBuilder,
8484
completed: Vec<Buffer>,
8585
in_progress: Vec<u8>,
@@ -99,7 +99,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
9999
/// Creates a new [`GenericByteViewBuilder`] with space for `capacity` string values.
100100
pub fn with_capacity(capacity: usize) -> Self {
101101
Self {
102-
views_builder: BufferBuilder::new(capacity),
102+
views_buffer: Vec::with_capacity(capacity),
103103
null_buffer_builder: NullBufferBuilder::new(capacity),
104104
completed: vec![],
105105
in_progress: vec![],
@@ -148,7 +148,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
148148
pub fn with_deduplicate_strings(self) -> Self {
149149
Self {
150150
string_tracker: Some((
151-
HashTable::with_capacity(self.views_builder.capacity()),
151+
HashTable::with_capacity(self.views_buffer.capacity()),
152152
Default::default(),
153153
)),
154154
..self
@@ -201,10 +201,43 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
201201
let b = b.get_unchecked(start..end);
202202

203203
let view = make_view(b, block, offset);
204-
self.views_builder.append(view);
204+
self.views_buffer.push(view);
205205
self.null_buffer_builder.append_non_null();
206206
}
207207

208+
/// Appends an array to the builder.
209+
/// This will flush any in-progress block and append the data buffers
210+
/// and add the (adapted) views.
211+
pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
212+
self.flush_in_progress();
213+
// keep original views if this array is the first to be added or if there are no data buffers (all inline views)
214+
let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
215+
216+
self.completed.extend(array.data_buffers().iter().cloned());
217+
218+
if keep_views {
219+
self.views_buffer.extend_from_slice(array.views());
220+
} else {
221+
let starting_buffer = self.completed.len() as u32;
222+
223+
self.views_buffer.extend(array.views().iter().map(|v| {
224+
let mut byte_view = ByteView::from(*v);
225+
if byte_view.length > 12 {
226+
// Small views (<=12 bytes) are inlined, so only need to update large views
227+
byte_view.buffer_index += starting_buffer;
228+
};
229+
230+
byte_view.as_u128()
231+
}));
232+
}
233+
234+
if let Some(null_buffer) = array.nulls() {
235+
self.null_buffer_builder.append_buffer(null_buffer);
236+
} else {
237+
self.null_buffer_builder.append_n_non_nulls(array.len());
238+
}
239+
}
240+
208241
/// Try to append a view of the given `block`, `offset` and `length`
209242
///
210243
/// See [`Self::append_block`]
@@ -255,7 +288,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
255288
/// Useful if we want to know what value has been inserted to the builder
256289
/// The index has to be smaller than `self.len()`, otherwise it will panic
257290
pub fn get_value(&self, index: usize) -> &[u8] {
258-
let view = self.views_builder.as_slice().get(index).unwrap();
291+
let view = self.views_buffer.as_slice().get(index).unwrap();
259292
let len = *view as u32;
260293
if len <= 12 {
261294
// # Safety
@@ -287,7 +320,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
287320
let mut view_buffer = [0; 16];
288321
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
289322
view_buffer[4..4 + v.len()].copy_from_slice(v);
290-
self.views_builder.append(u128::from_le_bytes(view_buffer));
323+
self.views_buffer.push(u128::from_le_bytes(view_buffer));
291324
self.null_buffer_builder.append_non_null();
292325
return;
293326
}
@@ -311,16 +344,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
311344
Entry::Occupied(occupied) => {
312345
// If the string already exists, we will directly use the view
313346
let idx = occupied.get();
314-
self.views_builder
315-
.append(self.views_builder.as_slice()[*idx]);
347+
self.views_buffer.push(self.views_buffer[*idx]);
316348
self.null_buffer_builder.append_non_null();
317349
self.string_tracker = Some((ht, hasher));
318350
return;
319351
}
320352
Entry::Vacant(vacant) => {
321353
// o.w. we insert the (string hash -> view index)
322354
// the idx is current length of views_buffer, as we are inserting a new view
323-
vacant.insert(self.views_builder.len());
355+
vacant.insert(self.views_buffer.len());
324356
}
325357
}
326358
self.string_tracker = Some((ht, hasher));
@@ -341,7 +373,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
341373
buffer_index: self.completed.len() as u32,
342374
offset,
343375
};
344-
self.views_builder.append(view.into());
376+
self.views_buffer.push(view.into());
345377
self.null_buffer_builder.append_non_null();
346378
}
347379

@@ -358,21 +390,20 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
358390
#[inline]
359391
pub fn append_null(&mut self) {
360392
self.null_buffer_builder.append_null();
361-
self.views_builder.append(0);
393+
self.views_buffer.push(0);
362394
}
363395

364396
/// Builds the [`GenericByteViewArray`] and reset this builder
365397
pub fn finish(&mut self) -> GenericByteViewArray<T> {
366398
self.flush_in_progress();
367399
let completed = std::mem::take(&mut self.completed);
368-
let len = self.views_builder.len();
369-
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
370400
let nulls = self.null_buffer_builder.finish();
371401
if let Some((ref mut ht, _)) = self.string_tracker.as_mut() {
372402
ht.clear();
373403
}
404+
let views = std::mem::take(&mut self.views_buffer);
374405
// SAFETY: valid by construction
375-
unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
406+
unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
376407
}
377408

378409
/// Builds the [`GenericByteViewArray`] without resetting the builder
@@ -381,8 +412,8 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
381412
if !self.in_progress.is_empty() {
382413
completed.push(Buffer::from_slice_ref(&self.in_progress));
383414
}
384-
let len = self.views_builder.len();
385-
let views = Buffer::from_slice_ref(self.views_builder.as_slice());
415+
let len = self.views_buffer.len();
416+
let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
386417
let views = ScalarBuffer::new(views, 0, len);
387418
let nulls = self.null_buffer_builder.finish_cloned();
388419
// SAFETY: valid by construction
@@ -396,7 +427,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
396427

397428
/// Return the allocated size of this builder in bytes, useful for memory accounting.
398429
pub fn allocated_size(&self) -> usize {
399-
let views = self.views_builder.capacity() * std::mem::size_of::<u128>();
430+
let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
400431
let null = self.null_buffer_builder.allocated_size();
401432
let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
402433
let in_progress = self.in_progress.capacity();
@@ -418,7 +449,7 @@ impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
418449
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419450
write!(f, "{}ViewBuilder", T::PREFIX)?;
420451
f.debug_struct("")
421-
.field("views_builder", &self.views_builder)
452+
.field("views_buffer", &self.views_buffer)
422453
.field("in_progress", &self.in_progress)
423454
.field("completed", &self.completed)
424455
.field("null_buffer_builder", &self.null_buffer_builder)

arrow-select/src/coalesce.rs

Lines changed: 73 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
//! [`filter`]: crate::filter::filter
2222
//! [`take`]: crate::take::take
2323
use crate::concat::concat_batches;
24-
use arrow_array::{
25-
builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions,
26-
};
24+
use arrow_array::StringViewArray;
25+
use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch};
26+
use arrow_data::ByteView;
2727
use arrow_schema::{ArrowError, SchemaRef};
2828
use std::collections::VecDeque;
2929
use std::sync::Arc;
@@ -164,7 +164,7 @@ impl BatchCoalescer {
164164
return Ok(());
165165
}
166166

167-
let mut batch = gc_string_view_batch(&batch);
167+
let mut batch = gc_string_view_batch(batch);
168168

169169
// If pushing this batch would exceed the target batch size,
170170
// finish the current batch and start a new one
@@ -242,15 +242,19 @@ impl BatchCoalescer {
242242
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
243243
/// `StringViewArray` may only refer to a small portion of the buffer,
244244
/// significantly increasing memory usage.
245-
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
246-
let new_columns: Vec<ArrayRef> = batch
247-
.columns()
248-
.iter()
245+
fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
246+
let (schema, columns, num_rows) = batch.into_parts();
247+
let new_columns: Vec<ArrayRef> = columns
248+
.into_iter()
249249
.map(|c| {
250250
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
251251
let Some(s) = c.as_string_view_opt() else {
252-
return Arc::clone(c);
252+
return c;
253253
};
254+
if s.data_buffers().is_empty() {
255+
// If there are no data buffers, we can just return the array as is
256+
return c;
257+
}
254258
let ideal_buffer_size: usize = s
255259
.views()
256260
.iter()
@@ -264,42 +268,73 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
264268
})
265269
.sum();
266270
let actual_buffer_size = s.get_buffer_memory_size();
271+
let buffers = s.data_buffers();
267272

268273
// Re-creating the array copies data and can be time consuming.
269274
// We only do it if the array is sparse
270275
if actual_buffer_size > (ideal_buffer_size * 2) {
276+
if ideal_buffer_size == 0 {
277+
// If the ideal buffer size is 0, all views are inlined
278+
// so just reuse the views
279+
return Arc::new(unsafe {
280+
StringViewArray::new_unchecked(
281+
s.views().clone(),
282+
vec![],
283+
s.nulls().cloned(),
284+
)
285+
});
286+
}
271287
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
272288
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
273-
let mut builder = StringViewBuilder::with_capacity(s.len());
274-
if ideal_buffer_size > 0 {
275-
builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
276-
}
277-
278-
for v in s.iter() {
279-
builder.append_option(v);
280-
}
281-
282-
let gc_string = builder.finish();
283-
284-
debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
289+
let mut buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);
290+
291+
let views: Vec<u128> = s
292+
.views()
293+
.as_ref()
294+
.iter()
295+
.cloned()
296+
.map(|v| {
297+
let mut b: ByteView = ByteView::from(v);
298+
299+
if b.length > 12 {
300+
let offset = buffer.len() as u32;
301+
buffer.extend_from_slice(
302+
buffers[b.buffer_index as usize]
303+
.get(b.offset as usize..b.offset as usize + b.length as usize)
304+
.expect("Invalid buffer slice"),
305+
);
306+
b.offset = offset;
307+
b.buffer_index = 0; // Set buffer index to 0, as we only have one buffer
308+
}
309+
310+
b.into()
311+
})
312+
.collect();
313+
314+
let buffers = if buffer.is_empty() {
315+
vec![]
316+
} else {
317+
vec![buffer.into()]
318+
};
319+
320+
let gc_string = unsafe {
321+
StringViewArray::new_unchecked(views.into(), buffers, s.nulls().cloned())
322+
};
285323

286324
Arc::new(gc_string)
287325
} else {
288-
Arc::clone(c)
326+
c
289327
}
290328
})
291329
.collect();
292-
let mut options = RecordBatchOptions::new();
293-
options = options.with_row_count(Some(batch.num_rows()));
294-
RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
295-
.expect("Failed to re-create the gc'ed record batch")
330+
unsafe { RecordBatch::new_unchecked(schema, new_columns, num_rows) }
296331
}
297332

298333
#[cfg(test)]
299334
mod tests {
300335
use super::*;
301-
use arrow_array::builder::ArrayBuilder;
302-
use arrow_array::{StringViewArray, UInt32Array};
336+
use arrow_array::builder::{ArrayBuilder, StringViewBuilder};
337+
use arrow_array::{RecordBatchOptions, StringViewArray, UInt32Array};
303338
use arrow_schema::{DataType, Field, Schema};
304339
use std::ops::Range;
305340

@@ -518,9 +553,11 @@ mod tests {
518553
fn test_gc_string_view_test_batch_empty() {
519554
let schema = Schema::empty();
520555
let batch = RecordBatch::new_empty(schema.into());
521-
let output_batch = gc_string_view_batch(&batch);
522-
assert_eq!(batch.num_columns(), output_batch.num_columns());
523-
assert_eq!(batch.num_rows(), output_batch.num_rows());
556+
let cols = batch.num_columns();
557+
let num_rows = batch.num_rows();
558+
let output_batch = gc_string_view_batch(batch);
559+
assert_eq!(cols, output_batch.num_columns());
560+
assert_eq!(num_rows, output_batch.num_rows());
524561
}
525562

526563
#[test]
@@ -568,9 +605,11 @@ mod tests {
568605
/// and ensures the number of rows are the same
569606
fn do_gc(array: StringViewArray) -> StringViewArray {
570607
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
571-
let gc_batch = gc_string_view_batch(&batch);
572-
assert_eq!(batch.num_rows(), gc_batch.num_rows());
573-
assert_eq!(batch.schema(), gc_batch.schema());
608+
let rows = batch.num_rows();
609+
let schema = batch.schema();
610+
let gc_batch = gc_string_view_batch(batch);
611+
assert_eq!(rows, gc_batch.num_rows());
612+
assert_eq!(schema, gc_batch.schema());
574613
gc_batch
575614
.column(0)
576615
.as_any()

arrow-select/src/concat.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
//! ```
3232
3333
use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
34-
use arrow_array::builder::{BooleanBuilder, GenericByteBuilder, PrimitiveBuilder};
34+
use arrow_array::builder::{
35+
BooleanBuilder, GenericByteBuilder, GenericByteViewBuilder, PrimitiveBuilder,
36+
};
3537
use arrow_array::cast::AsArray;
3638
use arrow_array::types::*;
3739
use arrow_array::*;
@@ -84,6 +86,15 @@ fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capa
8486
}
8587
}
8688

89+
fn concat_byte_view<B: ByteViewType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
90+
let mut builder =
91+
GenericByteViewBuilder::<B>::with_capacity(arrays.iter().map(|a| a.len()).sum());
92+
for &array in arrays.iter() {
93+
builder.append_array(array.as_byte_view());
94+
}
95+
Ok(Arc::new(builder.finish()))
96+
}
97+
8798
fn concat_dictionaries<K: ArrowDictionaryKeyType>(
8899
arrays: &[&dyn Array],
89100
) -> Result<ArrayRef, ArrowError> {
@@ -425,6 +436,8 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
425436
_ => unreachable!("Unsupported run end index type: {r:?}"),
426437
}
427438
}
439+
DataType::Utf8View => concat_byte_view::<StringViewType>(arrays),
440+
DataType::BinaryView => concat_byte_view::<BinaryViewType>(arrays),
428441
_ => {
429442
let capacity = get_capacity(arrays, d);
430443
concat_fallback(arrays, capacity)

0 commit comments

Comments
 (0)