33 changes: 30 additions & 3 deletions arrow/benches/coalesce_kernels.rs
@@ -321,18 +321,45 @@ impl DataStream {
         self.batch_size
     }
 
-    fn next_batch(&mut self) -> &RecordBatch {
+    /// Return the next batch.
+    ///
+    /// Note we don't inline this so it is easier to profile where the time is being
+    /// spent in the benchmark
+    #[inline(never)]
+    fn next_batch(&mut self) -> RecordBatch {
         let current_index = self.index;
         self.index += 1;
         if self.index >= self.batches.len() {
             self.index = 0; // loop back to the start
         }
-        self.batches
+        let batch = self
+            .batches
             .get(current_index)
-            .expect("No more batches available")
+            .expect("No more batches available");
+        deep_copy_batch(batch)
     }
 }
 
+/// Copies the underlying buffers of a RecordBatch so that the returned
+/// batch does not share any buffers with the input batch.
+///
+/// This models a realistic scenario where the input batch is created from
+/// a data source and then is filtered / not reused.
+fn deep_copy_batch(batch: &RecordBatch) -> RecordBatch {
+    let columns = batch
+        .columns()
+        .iter()
+        .map(|array| {
+            let src_data = array.into_data();
+            let mut copy = MutableArrayData::new(vec![&src_data], true, src_data.len());
+            copy.extend(0, 0, src_data.len());
+            let new_data = copy.freeze();
+            make_array(new_data)
+        })
+        .collect::<Vec<_>>();
+    RecordBatch::try_new(batch.schema(), columns).unwrap()
+}
+
 #[derive(Debug, Clone)]
 struct DataStreamBuilder {
     schema: SchemaRef,
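
As a quick illustration of the property the new helper is meant to provide, here is a minimal sketch (for discussion only, not part of this PR): it builds a one-column batch, runs it through the deep_copy_batch helper added above (assumed to be in scope), and checks that the copy has equal contents but does not point at the same underlying values buffer. Only standard arrow crate APIs are used; the setup values are arbitrary.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // Build a small single-column Int32 batch.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    // deep_copy_batch is the helper introduced in this diff (assumed in scope here).
    let copied = deep_copy_batch(&batch);

    // The copy has the same logical contents, but its values buffer lives at a
    // different address, i.e. the two batches share no underlying memory.
    let original_ptr = batch.column(0).to_data().buffers()[0].as_ptr();
    let copied_ptr = copied.column(0).to_data().buffers()[0].as_ptr();
    assert_ne!(original_ptr, copied_ptr);
    assert_eq!(batch, copied);
}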