128 changes: 83 additions & 45 deletions arrow/benches/coalesce_kernels.rs
@@ -77,17 +77,33 @@ fn add_all_filter_benchmarks(c: &mut Criterion) {
num_output_batches: 50,
null_density,
selectivity,
+ max_string_len: 30,
schema: &single_schema,
}
.build();

+ // Model mostly short strings, but some longer ones
Contributor Author: Previously all the benchmarks used a max size of 30. Now I have 20 (12/20 = 60% of values will be inlined views) and 128, where only 12/128 ≈ 9% will be inlined views.
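As a side note, here is a minimal sketch (not part of this PR) of how that inline fraction could be measured with arrow-rs, assuming the StringViewArray::from_iter_values constructor and views() accessor; per the Arrow format, the low 32 bits of each u128 view hold the string's length, and strings of 12 or fewer bytes are stored inline:

use arrow::array::StringViewArray;

/// Fraction of values short enough for the 12-byte inline representation.
fn inline_fraction(values: &[&str]) -> f64 {
    let array = StringViewArray::from_iter_values(values.iter().copied());
    let inlined = array
        .views()
        .iter()
        .filter(|v| (**v as u32) <= 12) // low 32 bits of a view = length
        .count();
    inlined as f64 / values.len() as f64
}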

FilterBenchmarkBuilder {
c,
name: "mixed_utf8view",
name: "mixed_utf8view (max_string_len=20)",
batch_size,
num_output_batches: 20,
null_density,
selectivity,
+ max_string_len: 20,
schema: &mixed_utf8view_schema,
}
.build();

+ // Model mostly longer strings
+ FilterBenchmarkBuilder {
+ c,
+ name: "mixed_utf8view (max_string_len=128)",
+ batch_size,
+ num_output_batches: 20,
+ null_density,
+ selectivity,
+ max_string_len: 128,
+ schema: &mixed_utf8view_schema,
+ }
+ .build();
@@ -99,6 +115,7 @@ fn add_all_filter_benchmarks(c: &mut Criterion) {
num_output_batches: 20,
null_density,
selectivity,
+ max_string_len: 30,
schema: &mixed_utf8_schema,
}
.build();
@@ -110,6 +127,7 @@ fn add_all_filter_benchmarks(c: &mut Criterion) {
num_output_batches: 10,
null_density,
selectivity,
+ max_string_len: 30,
schema: &mixed_dict_schema,
}
.build();
@@ -134,6 +152,12 @@ struct FilterBenchmarkBuilder<'a> {
null_density: f32,
/// between 0.0 .. 1.0, percent of rows that should be kept by the filter
selectivity: f32,
+ /// The maximum length of strings in the data stream
+ ///
+ /// For StringViewArray, strings <= 12 bytes are stored inline; longer
+ /// strings are stored in a separate buffer, so it is important to vary
+ /// the maximum length to exercise both code paths
+ max_string_len: usize,
/// Schema of the data stream
schema: &'a SchemaRef,
}
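Assuming the generated lengths are roughly uniform over 0..max_string_len, the expected inlined share is min(12, max_string_len) / max_string_len — 12/20 = 60% and 12/128 ≈ 9%, matching the benchmark names above. A hypothetical helper (not part of the benchmark) making that explicit:

/// Expected fraction of values on the 12-byte inline path, assuming string
/// lengths are uniformly distributed over 0..max_string_len.
fn expected_inline_fraction(max_string_len: usize) -> f64 {
    max_string_len.min(12) as f64 / max_string_len as f64
}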
@@ -147,6 +171,7 @@ impl FilterBenchmarkBuilder<'_> {
num_output_batches,
null_density,
selectivity,
+ max_string_len,
schema,
} = self;

@@ -159,6 +184,7 @@ impl FilterBenchmarkBuilder<'_> {
let data = DataStreamBuilder::new(Arc::clone(schema))
.with_batch_size(batch_size)
.with_null_density(null_density)
+ .with_max_string_len(max_string_len)
.build();

// Keep feeding the filter stream into the coalescer until we hit a total number of output batches
@@ -308,12 +334,13 @@ impl DataStream {
}
}

- #[derive(Debug)]
+ #[derive(Debug, Clone)]
struct DataStreamBuilder {
schema: SchemaRef,
batch_size: usize,
null_density: f32,
- num_batches: usize, // number of unique batches to create
+ num_batches: usize, // number of unique batches to create
+ max_string_len: usize, // maximum length of strings in the data stream
}

impl DataStreamBuilder {
@@ -323,6 +350,7 @@ impl DataStreamBuilder {
batch_size: 8192,
null_density: 0.0,
num_batches: 10,
+ max_string_len: 30,
}
}

@@ -339,65 +367,75 @@ impl DataStreamBuilder {
self
}

+ fn with_max_string_len(mut self, max_string_len: usize) -> Self {
+ self.max_string_len = max_string_len;
+ self
+ }

/// Build the data stream
fn build(self) -> DataStream {
- let Self {
- schema,
- batch_size,
- null_density,
- num_batches,
- } = self;

- let batches = (0..num_batches)
+ let batches = (0..self.num_batches)
.map(|seed| {
- let columns = schema
+ let columns = self
+ .schema
.fields()
.iter()
- .map(|field| create_input_array(field, batch_size, null_density, seed as u64))
+ .map(|field| self.create_input_array(field, seed as u64))
.collect::<Vec<_>>();
- RecordBatch::try_new(schema.clone(), columns).unwrap()
+ RecordBatch::try_new(self.schema.clone(), columns).unwrap()
})
.collect::<Vec<_>>();

+ let Self {
+ schema,
+ batch_size,
+ null_density: _,
+ num_batches: _,
+ max_string_len: _,
+ } = self;

DataStream {
schema,
index: 0,
batch_size,
batches: Arc::from(batches),
}
}
}

- fn create_input_array(field: &Field, batch_size: usize, null_density: f32, seed: u64) -> ArrayRef {
- let max_string_len = 30;
- match field.data_type() {
- DataType::Int32 => Arc::new(create_primitive_array_with_seed::<Int32Type>(
- batch_size,
- null_density,
- seed,
- )),
- DataType::Float64 => Arc::new(create_primitive_array_with_seed::<Float64Type>(
- batch_size,
- null_density,
- seed,
- )),
- DataType::Utf8 => Arc::new(create_string_array::<i32>(batch_size, null_density)), // TODO seed
- DataType::Utf8View => {
- Arc::new(create_string_view_array_with_max_len(
- batch_size,
- null_density,
- max_string_len,
- )) // TODO seed
- }
- DataType::Dictionary(key_type, value_type)
- if key_type.as_ref() == &DataType::Int32 && value_type.as_ref() == &DataType::Utf8 =>
- {
- Arc::new(create_string_dict_array::<Int32Type>(
- batch_size,
- null_density,
- max_string_len,
- )) // TODO seed
+ fn create_input_array(&self, field: &Field, seed: u64) -> ArrayRef {
+ match field.data_type() {
+ DataType::Int32 => Arc::new(create_primitive_array_with_seed::<Int32Type>(
+ self.batch_size,
+ self.null_density,
+ seed,
+ )),
+ DataType::Float64 => Arc::new(create_primitive_array_with_seed::<Float64Type>(
+ self.batch_size,
+ self.null_density,
+ seed,
+ )),
+ DataType::Utf8 => Arc::new(create_string_array::<i32>(
+ self.batch_size,
+ self.null_density,
+ )), // TODO seed
+ DataType::Utf8View => {
+ Arc::new(create_string_view_array_with_max_len(
+ self.batch_size,
+ self.null_density,
+ self.max_string_len,
+ )) // TODO seed
+ }
+ DataType::Dictionary(key_type, value_type)
+ if key_type.as_ref() == &DataType::Int32
+ && value_type.as_ref() == &DataType::Utf8 =>
+ {
+ Arc::new(create_string_dict_array::<Int32Type>(
+ self.batch_size,
+ self.null_density,
+ self.max_string_len,
+ )) // TODO seed
+ }
+ _ => panic!("Unsupported data type: {field:?}"),
+ }
_ => panic!("Unsupported data type: {field:?}"),
}
}
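For reference, a sketch of a schema that would exercise every arm of create_input_array above (field names are assumptions; the benchmark defines its own single/mixed schemas):

use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, SchemaRef};

/// Hypothetical schema covering all column types the generator supports.
fn example_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("i", DataType::Int32, true),
        Field::new("f", DataType::Float64, true),
        Field::new("s", DataType::Utf8, true),
        Field::new("v", DataType::Utf8View, true),
        Field::new(
            "d",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
        ),
    ]))
}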