From 21278a85e3c7b3cf75de78b29caaccf69c050da4 Mon Sep 17 00:00:00 2001 From: Tai Le Manh Date: Thu, 15 Aug 2024 18:17:48 +0700 Subject: [PATCH 1/3] Improve performance of REPEAT functions Signed-off-by: Tai Le Manh --- datafusion/functions/Cargo.toml | 5 + datafusion/functions/benches/repeat.rs | 129 ++++++++++++++++++++++ datafusion/functions/src/string/repeat.rs | 91 ++++++++------- 3 files changed, 188 insertions(+), 37 deletions(-) create mode 100644 datafusion/functions/benches/repeat.rs diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 9675d03a0161..64a219259fad 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -146,3 +146,8 @@ required-features = ["string_expressions"] harness = false name = "upper" required-features = ["string_expressions"] + +[[bench]] +harness = false +name = "repeat" +required-features = ["string_expressions"] diff --git a/datafusion/functions/benches/repeat.rs b/datafusion/functions/benches/repeat.rs new file mode 100644 index 000000000000..36e5b593b5c4 --- /dev/null +++ b/datafusion/functions/benches/repeat.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, OffsetSizeTrait}; +use arrow::util::bench_util::{ + create_string_array_with_len, create_string_view_array_with_len, +}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion_expr::ColumnarValue; +use datafusion_functions::string; +use std::sync::Arc; + +fn create_args( + size: usize, + str_len: usize, + repeat_times: i64, + use_string_view: bool, +) -> Vec { + let number_array = Arc::new(Int64Array::from( + (0..size).map(|_| repeat_times).collect::>(), + )); + + if use_string_view { + let string_array = + Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false)); + vec![ + ColumnarValue::Array(string_array), + ColumnarValue::Array(number_array), + ] + } else { + let string_array = + Arc::new(create_string_array_with_len::(size, 0.1, str_len)); + + vec![ + ColumnarValue::Array(string_array), + ColumnarValue::Array(Arc::clone(&number_array) as ArrayRef), + ] + } +} + +fn criterion_benchmark(c: &mut Criterion) { + let lower = string::lower(); + for size in [1024, 8192] { + // REPEAT 3 TIMES + let repeat_times = 5; + let mut group = c.benchmark_group(format!("repeat {} times", repeat_times)); + + let args = create_args::(size, 32, repeat_times, true); + group.bench_function( + &format!( + "repeat_string_view [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(lower.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(lower.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_large_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(lower.invoke(&args))), + ); + + group.finish(); + + // REPEAT 30 TIMES + let repeat_times = 50; + let mut group = c.benchmark_group(format!("repeat {} times", repeat_times)); + + let args = create_args::(size, 32, repeat_times, true); + group.bench_function( + &format!( + "repeat_string_view [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(lower.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(lower.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_large_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(lower.invoke(&args))), + ); + + group.finish(); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/string/repeat.rs b/datafusion/functions/src/string/repeat.rs index a377dee06f41..eaeaa79bee32 100644 --- a/datafusion/functions/src/string/repeat.rs +++ b/datafusion/functions/src/string/repeat.rs @@ -18,12 +18,14 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait, StringArray}; +use arrow::array::{ + ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, + GenericStringBuilder, Int64Array, OffsetSizeTrait, StringViewArray, +}; use arrow::datatypes::DataType; +use arrow::datatypes::DataType::{Int64, LargeUtf8, Utf8, Utf8View}; -use datafusion_common::cast::{ - as_generic_string_array, as_int64_array, as_string_view_array, -}; +use datafusion_common::cast::as_int64_array; use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::*; use datafusion_expr::{ColumnarValue, Volatility}; @@ -44,7 +46,6 @@ impl Default for RepeatFunc { impl RepeatFunc { pub fn new() -> Self { - use DataType::*; Self { signature: Signature::one_of( vec![ @@ -79,50 +80,66 @@ impl ScalarUDFImpl for RepeatFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - match args[0].data_type() { - DataType::Utf8View => make_scalar_function(repeat_utf8view, vec![])(args), - DataType::Utf8 => make_scalar_function(repeat::, vec![])(args), - DataType::LargeUtf8 => make_scalar_function(repeat::, vec![])(args), - other => exec_err!("Unsupported data type {other:?} for function repeat. Expected Utf8, Utf8View or LargeUtf8"), - } + make_scalar_function(repeat, vec![])(args) } } /// Repeats string the specified number of times. /// repeat('Pg', 4) = 'PgPgPgPg' -fn repeat(args: &[ArrayRef]) -> Result { - let string_array = as_generic_string_array::(&args[0])?; +fn repeat(args: &[ArrayRef]) -> Result { let number_array = as_int64_array(&args[1])?; - - let result = string_array - .iter() - .zip(number_array.iter()) - .map(|(string, number)| repeat_common(string, number)) - .collect::>(); - - Ok(Arc::new(result) as ArrayRef) + match args[0].data_type() { + Utf8View => { + let string_view_array = args[0].as_string_view(); + repeat_impl::(string_view_array, number_array) + } + Utf8 => { + let string_array = args[0].as_string::(); + repeat_impl::>(string_array, number_array) + } + LargeUtf8 => { + let string_array = args[0].as_string::(); + repeat_impl::>(string_array, number_array) + } + other => exec_err!( + "Unsupported data type {other:?} for function repeat. \ + Expected Utf8, Utf8View or LargeUtf8." + ), + } } -fn repeat_utf8view(args: &[ArrayRef]) -> Result { - let string_view_array = as_string_view_array(&args[0])?; - let number_array = as_int64_array(&args[1])?; - - let result = string_view_array +fn repeat_impl<'a, T, S>(string_array: S, number_array: &Int64Array) -> Result +where + T: OffsetSizeTrait, + S: StringArrayType<'a>, +{ + let mut builder: GenericStringBuilder = GenericStringBuilder::new(); + string_array .iter() .zip(number_array.iter()) - .map(|(string, number)| repeat_common(string, number)) - .collect::(); - - Ok(Arc::new(result) as ArrayRef) + .for_each(|(string, number)| match (string, number) { + (Some(string), Some(number)) if number >= 0 => { + builder.append_value(string.repeat(number as usize)) + } + (Some(_), Some(_)) => builder.append_value(""), + _ => builder.append_null(), + }); + let array = builder.finish(); + + Ok(Arc::new(array) as ArrayRef) } -fn repeat_common(string: Option<&str>, number: Option) -> Option { - match (string, number) { - (Some(string), Some(number)) if number >= 0 => { - Some(string.repeat(number as usize)) - } - (Some(_), Some(_)) => Some("".to_string()), - _ => None, +trait StringArrayType<'a>: ArrayAccessor + Sized { + fn iter(&self) -> ArrayIter; +} +impl<'a, O: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray { + fn iter(&self) -> ArrayIter { + GenericStringArray::::iter(self) + } +} +impl<'a> StringArrayType<'a> for &'a StringViewArray { + fn iter(&self) -> ArrayIter { + StringViewArray::iter(self) } } From 1ea189ebc84f78f3569e3d30aaebcb5c4f5acf31 Mon Sep 17 00:00:00 2001 From: Tai Le Manh Date: Fri, 16 Aug 2024 11:33:30 +0700 Subject: [PATCH 2/3] Improve performance of REPEAT functions Signed-off-by: Tai Le Manh --- datafusion/functions/benches/repeat.rs | 29 ++++++++++++++--------- datafusion/functions/src/string/repeat.rs | 20 +++------------- datafusion/functions/src/unicode/lpad.rs | 20 +++------------- datafusion/functions/src/utils.rs | 21 +++++++++++++++- 4 files changed, 44 insertions(+), 46 deletions(-) diff --git a/datafusion/functions/benches/repeat.rs b/datafusion/functions/benches/repeat.rs index 36e5b593b5c4..916c8374e5fb 100644 --- a/datafusion/functions/benches/repeat.rs +++ b/datafusion/functions/benches/repeat.rs @@ -21,10 +21,11 @@ use arrow::array::{ArrayRef, Int64Array, OffsetSizeTrait}; use arrow::util::bench_util::{ create_string_array_with_len, create_string_view_array_with_len, }; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use criterion::{black_box, criterion_group, criterion_main, Criterion, SamplingMode}; use datafusion_expr::ColumnarValue; use datafusion_functions::string; use std::sync::Arc; +use std::time::Duration; fn create_args( size: usize, @@ -55,11 +56,14 @@ fn create_args( } fn criterion_benchmark(c: &mut Criterion) { - let lower = string::lower(); - for size in [1024, 8192] { + let repeat = string::repeat(); + for size in [1024, 4096] { // REPEAT 3 TIMES - let repeat_times = 5; + let repeat_times = 3; let mut group = c.benchmark_group(format!("repeat {} times", repeat_times)); + group.sampling_mode(SamplingMode::Flat); + group.sample_size(10); + group.measurement_time(Duration::from_secs(10)); let args = create_args::(size, 32, repeat_times, true); group.bench_function( @@ -67,7 +71,7 @@ fn criterion_benchmark(c: &mut Criterion) { "repeat_string_view [size={}, repeat_times={}]", size, repeat_times ), - |b| b.iter(|| black_box(lower.invoke(&args))), + |b| b.iter(|| black_box(repeat.invoke(&args))), ); let args = create_args::(size, 32, repeat_times, false); @@ -76,7 +80,7 @@ fn criterion_benchmark(c: &mut Criterion) { "repeat_string [size={}, repeat_times={}]", size, repeat_times ), - |b| b.iter(|| black_box(lower.invoke(&args))), + |b| b.iter(|| black_box(repeat.invoke(&args))), ); let args = create_args::(size, 32, repeat_times, false); @@ -85,14 +89,17 @@ fn criterion_benchmark(c: &mut Criterion) { "repeat_large_string [size={}, repeat_times={}]", size, repeat_times ), - |b| b.iter(|| black_box(lower.invoke(&args))), + |b| b.iter(|| black_box(repeat.invoke(&args))), ); group.finish(); // REPEAT 30 TIMES - let repeat_times = 50; + let repeat_times = 30; let mut group = c.benchmark_group(format!("repeat {} times", repeat_times)); + group.sampling_mode(SamplingMode::Flat); + group.sample_size(10); + group.measurement_time(Duration::from_secs(10)); let args = create_args::(size, 32, repeat_times, true); group.bench_function( @@ -100,7 +107,7 @@ fn criterion_benchmark(c: &mut Criterion) { "repeat_string_view [size={}, repeat_times={}]", size, repeat_times ), - |b| b.iter(|| black_box(lower.invoke(&args))), + |b| b.iter(|| black_box(repeat.invoke(&args))), ); let args = create_args::(size, 32, repeat_times, false); @@ -109,7 +116,7 @@ fn criterion_benchmark(c: &mut Criterion) { "repeat_string [size={}, repeat_times={}]", size, repeat_times ), - |b| b.iter(|| black_box(lower.invoke(&args))), + |b| b.iter(|| black_box(repeat.invoke(&args))), ); let args = create_args::(size, 32, repeat_times, false); @@ -118,7 +125,7 @@ fn criterion_benchmark(c: &mut Criterion) { "repeat_large_string [size={}, repeat_times={}]", size, repeat_times ), - |b| b.iter(|| black_box(lower.invoke(&args))), + |b| b.iter(|| black_box(repeat.invoke(&args))), ); group.finish(); diff --git a/datafusion/functions/src/string/repeat.rs b/datafusion/functions/src/string/repeat.rs index eaeaa79bee32..30eba1f5ef73 100644 --- a/datafusion/functions/src/string/repeat.rs +++ b/datafusion/functions/src/string/repeat.rs @@ -19,8 +19,8 @@ use std::any::Any; use std::sync::Arc; use arrow::array::{ - ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, - GenericStringBuilder, Int64Array, OffsetSizeTrait, StringViewArray, + ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, + Int64Array, OffsetSizeTrait, StringViewArray, }; use arrow::datatypes::DataType; use arrow::datatypes::DataType::{Int64, LargeUtf8, Utf8, Utf8View}; @@ -31,7 +31,7 @@ use datafusion_expr::TypeSignature::*; use datafusion_expr::{ColumnarValue, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; -use crate::utils::{make_scalar_function, utf8_to_str_type}; +use crate::utils::{make_scalar_function, utf8_to_str_type, StringArrayType}; #[derive(Debug)] pub struct RepeatFunc { @@ -129,20 +129,6 @@ where Ok(Arc::new(array) as ArrayRef) } -trait StringArrayType<'a>: ArrayAccessor + Sized { - fn iter(&self) -> ArrayIter; -} -impl<'a, O: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray { - fn iter(&self) -> ArrayIter { - GenericStringArray::::iter(self) - } -} -impl<'a> StringArrayType<'a> for &'a StringViewArray { - fn iter(&self) -> ArrayIter { - StringViewArray::iter(self) - } -} - #[cfg(test)] mod tests { use arrow::array::{Array, StringArray}; diff --git a/datafusion/functions/src/unicode/lpad.rs b/datafusion/functions/src/unicode/lpad.rs index 521cdc5d0ff0..50dc06cdbfb5 100644 --- a/datafusion/functions/src/unicode/lpad.rs +++ b/datafusion/functions/src/unicode/lpad.rs @@ -20,8 +20,8 @@ use std::fmt::Write; use std::sync::Arc; use arrow::array::{ - Array, ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, - GenericStringBuilder, Int64Array, OffsetSizeTrait, StringViewArray, + Array, ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array, + OffsetSizeTrait, StringViewArray, }; use arrow::datatypes::DataType; use unicode_segmentation::UnicodeSegmentation; @@ -32,7 +32,7 @@ use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; -use crate::utils::{make_scalar_function, utf8_to_str_type}; +use crate::utils::{make_scalar_function, utf8_to_str_type, StringArrayType}; #[derive(Debug)] pub struct LPadFunc { @@ -248,20 +248,6 @@ where Ok(Arc::new(array) as ArrayRef) } -trait StringArrayType<'a>: ArrayAccessor + Sized { - fn iter(&self) -> ArrayIter; -} -impl<'a, T: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray { - fn iter(&self) -> ArrayIter { - GenericStringArray::::iter(self) - } -} -impl<'a> StringArrayType<'a> for &'a StringViewArray { - fn iter(&self) -> ArrayIter { - StringViewArray::iter(self) - } -} - #[cfg(test)] mod tests { use crate::unicode::lpad::LPadFunc; diff --git a/datafusion/functions/src/utils.rs b/datafusion/functions/src/utils.rs index 7b367174006d..ca993dbe0b79 100644 --- a/datafusion/functions/src/utils.rs +++ b/datafusion/functions/src/utils.rs @@ -17,7 +17,10 @@ use std::sync::Arc; -use arrow::array::ArrayRef; +use arrow::array::{ + ArrayAccessor, ArrayIter, ArrayRef, GenericStringArray, OffsetSizeTrait, + StringViewArray, +}; use arrow::datatypes::DataType; use datafusion_common::{Result, ScalarValue}; @@ -122,6 +125,22 @@ where }) } +pub trait StringArrayType<'a>: ArrayAccessor + Sized { + fn iter(&self) -> ArrayIter; +} + +impl<'a, T: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray { + fn iter(&self) -> ArrayIter { + GenericStringArray::::iter(self) + } +} + +impl<'a> StringArrayType<'a> for &'a StringViewArray { + fn iter(&self) -> ArrayIter { + StringViewArray::iter(self) + } +} + #[cfg(test)] pub mod test { /// $FUNC ScalarUDFImpl to test From 230281c5135c8135bbc44ac3ec7d5d4938b20ceb Mon Sep 17 00:00:00 2001 From: Tai Le Manh Date: Fri, 16 Aug 2024 11:46:22 +0700 Subject: [PATCH 3/3] Fix cargo fmt Signed-off-by: Tai Le Manh --- datafusion/functions/src/string/common.rs | 21 +++++++++++++++++++-- datafusion/functions/src/string/repeat.rs | 7 ++++--- datafusion/functions/src/unicode/lpad.rs | 3 ++- datafusion/functions/src/utils.rs | 21 +-------------------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/datafusion/functions/src/string/common.rs b/datafusion/functions/src/string/common.rs index 7037c1d1c3c3..54aebb039046 100644 --- a/datafusion/functions/src/string/common.rs +++ b/datafusion/functions/src/string/common.rs @@ -19,8 +19,9 @@ use std::fmt::{Display, Formatter}; use std::sync::Arc; use arrow::array::{ - new_null_array, Array, ArrayDataBuilder, ArrayRef, GenericStringArray, - GenericStringBuilder, OffsetSizeTrait, StringArray, + new_null_array, Array, ArrayAccessor, ArrayDataBuilder, ArrayIter, ArrayRef, + GenericStringArray, GenericStringBuilder, OffsetSizeTrait, StringArray, + StringViewArray, }; use arrow::buffer::{Buffer, MutableBuffer, NullBuffer}; use arrow::datatypes::DataType; @@ -251,6 +252,22 @@ impl<'a> ColumnarValueRef<'a> { } } +pub trait StringArrayType<'a>: ArrayAccessor + Sized { + fn iter(&self) -> ArrayIter; +} + +impl<'a, T: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray { + fn iter(&self) -> ArrayIter { + GenericStringArray::::iter(self) + } +} + +impl<'a> StringArrayType<'a> for &'a StringViewArray { + fn iter(&self) -> ArrayIter { + StringViewArray::iter(self) + } +} + /// Optimized version of the StringBuilder in Arrow that: /// 1. Precalculating the expected length of the result, avoiding reallocations. /// 2. Avoids creating / incrementally creating a `NullBufferBuilder` diff --git a/datafusion/functions/src/string/repeat.rs b/datafusion/functions/src/string/repeat.rs index 30eba1f5ef73..20e4462784b8 100644 --- a/datafusion/functions/src/string/repeat.rs +++ b/datafusion/functions/src/string/repeat.rs @@ -19,8 +19,8 @@ use std::any::Any; use std::sync::Arc; use arrow::array::{ - ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, - Int64Array, OffsetSizeTrait, StringViewArray, + ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array, + OffsetSizeTrait, StringViewArray, }; use arrow::datatypes::DataType; use arrow::datatypes::DataType::{Int64, LargeUtf8, Utf8, Utf8View}; @@ -31,7 +31,8 @@ use datafusion_expr::TypeSignature::*; use datafusion_expr::{ColumnarValue, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; -use crate::utils::{make_scalar_function, utf8_to_str_type, StringArrayType}; +use crate::string::common::StringArrayType; +use crate::utils::{make_scalar_function, utf8_to_str_type}; #[derive(Debug)] pub struct RepeatFunc { diff --git a/datafusion/functions/src/unicode/lpad.rs b/datafusion/functions/src/unicode/lpad.rs index 50dc06cdbfb5..e102673c4253 100644 --- a/datafusion/functions/src/unicode/lpad.rs +++ b/datafusion/functions/src/unicode/lpad.rs @@ -32,7 +32,8 @@ use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; -use crate::utils::{make_scalar_function, utf8_to_str_type, StringArrayType}; +use crate::string::common::StringArrayType; +use crate::utils::{make_scalar_function, utf8_to_str_type}; #[derive(Debug)] pub struct LPadFunc { diff --git a/datafusion/functions/src/utils.rs b/datafusion/functions/src/utils.rs index ca993dbe0b79..7b367174006d 100644 --- a/datafusion/functions/src/utils.rs +++ b/datafusion/functions/src/utils.rs @@ -17,10 +17,7 @@ use std::sync::Arc; -use arrow::array::{ - ArrayAccessor, ArrayIter, ArrayRef, GenericStringArray, OffsetSizeTrait, - StringViewArray, -}; +use arrow::array::ArrayRef; use arrow::datatypes::DataType; use datafusion_common::{Result, ScalarValue}; @@ -125,22 +122,6 @@ where }) } -pub trait StringArrayType<'a>: ArrayAccessor + Sized { - fn iter(&self) -> ArrayIter; -} - -impl<'a, T: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray { - fn iter(&self) -> ArrayIter { - GenericStringArray::::iter(self) - } -} - -impl<'a> StringArrayType<'a> for &'a StringViewArray { - fn iter(&self) -> ArrayIter { - StringViewArray::iter(self) - } -} - #[cfg(test)] pub mod test { /// $FUNC ScalarUDFImpl to test