Merged
37 changes: 37 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
@@ -29,9 +29,11 @@ use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};

use arrow::array::{AsArray, StringViewBuilder};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use datafusion_common::Result;
use datafusion_execution::TaskContext;

@@ -216,6 +218,41 @@ impl CoalesceBatchesStream {
match input_batch {
Poll::Ready(x) => match x {
Some(Ok(batch)) => {
let new_columns: Vec<Arc<dyn Array>> = batch
Contributor: Nit: how about using ArrayRef instead of Arc<dyn Array>? It makes the code easier to read.

Contributor Author: done, thank you @dharanad! (A short note on the ArrayRef alias is included after the diff excerpt below.)

.columns()
.iter()
.map(|c| {
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
if let Some(s) = c.as_string_view_opt() {
let view_cnt = s.views().len();
let buffer_size = s.get_buffer_memory_size();

// Re-creating the array copies data and can be time consuming.
// We only do it if the array is sparse, below is a heuristic to determine if the array is sparse.
if buffer_size > (view_cnt * 32) {
Contributor: If there are lots of long strings in the buffer, will this be triggered every time, even if nothing has been filtered?

Contributor Author: Yes, this is a rough heuristic to test whether the buffer is dense. In fact, there is no way for us to know inside CoalesceBatchesExec whether anything has been filtered -- we only know that in the operators that actually filter, such as FilterExec. The fact that we have a CoalesceBatchesExec here means the batch is likely sparse and thus requires gc. (A sketch of this heuristic as a standalone helper follows the diff excerpt below.)

// We use a block size of 2MB (instead of 8KB) to reduce the number of buffers to track.
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
let mut builder =
Contributor: The builder here doesn't use the deduplication mechanism -- can we assume the StringViewArray is already deduplicated before coalescing?

Contributor Author: Deduplication hashes the string values, which has quite a high overhead. Here we are processing small batches (default size 8192) and then concatenating them into a larger batch, so deduplicating the small batches would give only a small benefit. (For contrast, a sketch of a deduplicating rebuild follows the diff excerpt below.)

StringViewBuilder::with_capacity(s.len())
.with_block_size(1024 * 1024 * 2);

for v in s.iter() {
builder.append_option(v);
}

let gc_string = builder.finish();

Arc::new(gc_string)
} else {
Arc::clone(c)
}
} else {
Arc::clone(c)
}
})
.collect();
let batch = RecordBatch::try_new(batch.schema(), new_columns)?;

if batch.num_rows() >= self.target_batch_size
&& self.buffer.is_empty()
{
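A note on the ArrayRef nit above: the alias is defined by arrow itself, so the suggested change is purely cosmetic. A minimal reference snippet, not part of the patch:

```rust
use std::sync::Arc;
use arrow::array::Array;

// arrow-rs exports this exact alias as arrow::array::ArrayRef, so writing
// ArrayRef instead of Arc<dyn Array> changes readability only, not behavior.
type ArrayRef = Arc<dyn Array>;
```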
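On the sparseness question above: the patch treats an array as sparse when its retained buffer memory exceeds 32 bytes per view on average. A minimal sketch of that heuristic as a standalone helper, for illustration only (the function name is hypothetical and not part of the patch):

```rust
use arrow::array::{Array, StringViewArray};

/// Hypothetical helper mirroring the heuristic in the patch: treat the array
/// as "sparse" when its buffers retain more than 32 bytes per view, i.e. the
/// views likely reference only a small fraction of the buffered bytes.
fn is_sparse(array: &StringViewArray) -> bool {
    let view_cnt = array.views().len(); // one u128 view per row
    let buffer_size = array.get_buffer_memory_size(); // bytes retained by the array's buffers
    buffer_size > view_cnt * 32
}
```

Both quantities come from the same calls the patch uses, so this helper fires exactly when the inline check does.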
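On the deduplication question above: for contrast, a sketch of what an opt-in deduplicating rebuild could look like. It assumes the `with_deduplicate_strings` option on arrow's builder and a hypothetical helper name; none of this is added by the patch:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, StringViewArray, StringViewBuilder};

/// Sketch only: rebuild a StringViewArray while deduplicating its values.
/// The patch deliberately skips this step, since hashing every string adds
/// overhead that rarely pays off on small (default 8192-row) input batches.
fn gc_with_dedup(array: &StringViewArray) -> ArrayRef {
    let mut builder = StringViewBuilder::with_capacity(array.len())
        // Assumes recent arrow-rs exposes this builder option.
        .with_deduplicate_strings()
        .with_block_size(1024 * 1024 * 2);
    for v in array.iter() {
        builder.append_option(v);
    }
    Arc::new(builder.finish())
}
```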