Commit e33d560

feat: Implement basic version of RLIKE (apache#734)
1 parent 25957dd commit e33d560

File tree

16 files changed: +461 −28 lines changed


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 7 additions & 0 deletions

```diff
@@ -410,6 +410,13 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

+  val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
+    conf("spark.comet.regexp.allowIncompatible")
+      .doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
+        "Set this config to true to allow them anyway using Rust's regular expression engine. " +
+        "See compatibility guide for more information.")
+      .booleanConf
+      .createWithDefault(false)
 }

 object ConfigHelpers {
```

docs/source/user-guide/compatibility.md

Lines changed: 6 additions & 0 deletions

```diff
@@ -32,6 +32,12 @@ be used in production.

 There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.

+## Regular Expressions
+
+Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
+regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
+this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
+
 ## Cast

 Cast operations in Comet fall into three levels of support:
```
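
To make the incompatibility concrete, here is a minimal standalone sketch (not from the Comet codebase, assuming only the `regex` crate) showing two constructs that Java's `java.util.regex.Pattern` accepts but Rust's regex engine rejects at compile time:

```rust
// Minimal sketch: constructs accepted by java.util.regex.Pattern
// but rejected by the Rust regex crate.
use regex::Regex;

fn main() {
    // Basic patterns behave the same in both engines.
    assert!(Regex::new(r"^[0-9]+$").unwrap().is_match("123"));

    // Look-ahead is valid in Java's Pattern but fails to compile here.
    assert!(Regex::new(r"foo(?=bar)").is_err());

    // Backreferences are likewise unsupported by the regex crate.
    assert!(Regex::new(r"(a)\1").is_err());
}
```

Differences like these are why the two engines cannot be treated as interchangeable, and why the fallback is the default.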

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions

```diff
@@ -44,6 +44,7 @@ Comet provides the following configuration settings.
 | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
 | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
 | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
+| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
 | spark.comet.rowToColumnar.supportedOperatorList | A comma-separated list of row-based operators that will be converted to columnar format when 'spark.comet.rowToColumnar.enabled' is true | Range,InMemoryTableScan |
 | spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
```

docs/templates/compatibility-template.md

Lines changed: 6 additions & 0 deletions

```diff
@@ -32,6 +32,12 @@ be used in production.

 There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.

+## Regular Expressions
+
+Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
+regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
+this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
+
 ## Cast

 Cast operations in Comet fall into three levels of support:
```

native/core/src/execution/datafusion/expressions/strings.rs

Lines changed: 0 additions & 2 deletions

```diff
@@ -143,8 +143,6 @@ make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn);

 make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn);

-// make_predicate_function!(RLike, rlike_dyn, rlike_utf8_scalar_dyn);
-
 #[derive(Debug, Hash)]
 pub struct SubstringExpr {
     pub child: Arc<dyn PhysicalExpr>,
```

native/core/src/execution/datafusion/planner.rs

Lines changed: 14 additions & 1 deletion

```diff
@@ -57,6 +57,7 @@ use datafusion_expr::expr::find_df_window_func;
 use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition};
 use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
+use datafusion_physical_expr_common::expressions::Literal;
 use itertools::Itertools;
 use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
@@ -108,7 +109,7 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr,
+    Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr,
 };

 // For clippy error on type_complexity.
@@ -447,6 +448,18 @@ impl PhysicalPlanner {

                 Ok(Arc::new(Like::new(left, right)))
             }
+            ExprStruct::Rlike(expr) => {
+                let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?;
+                let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?;
+                match right.as_any().downcast_ref::<Literal>().unwrap().value() {
+                    ScalarValue::Utf8(Some(pattern)) => {
+                        Ok(Arc::new(RLike::try_new(left, pattern)?))
+                    }
+                    _ => Err(ExecutionError::GeneralError(
+                        "RLike only supports scalar patterns".to_string(),
+                    )),
+                }
+            }
             ExprStruct::CheckOverflow(expr) => {
                 let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
                 let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());
```
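
The planner only accepts a literal (scalar) pattern because the `Regex` is compiled once in `RLike::try_new` and then reused for every row. A minimal standalone sketch of that compile-once design (hypothetical helper, not Comet code):

```rust
// Hypothetical standalone helper illustrating the compile-once design
// that motivates the scalar-pattern restriction above.
use regex::Regex;

fn rlike_all(inputs: &[&str], pattern: &str) -> Result<Vec<bool>, regex::Error> {
    // Compile the pattern a single time up front...
    let re = Regex::new(pattern)?;
    // ...then reuse it for every row. A per-row (non-scalar) pattern would
    // force a comparatively expensive recompilation inside the loop.
    Ok(inputs.iter().map(|s| re.is_match(s)).collect())
}
```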

native/proto/src/proto/expr.proto

Lines changed: 5 additions & 5 deletions

```diff
@@ -54,7 +54,7 @@ message Expr {
     StartsWith startsWith = 27;
     EndsWith endsWith = 28;
     Contains contains = 29;
-    // RLike rlike = 30;
+    RLike rlike = 30;
     ScalarFunc scalarFunc = 31;
     EqualNullSafe eqNullSafe = 32;
     NotEqualNullSafe neqNullSafe = 33;
@@ -368,10 +368,10 @@ message Like {
   Expr right = 2;
 }

-// message RLike {
-//   Expr left = 1;
-//   Expr right = 2;
-// }
+message RLike {
+  Expr left = 1;
+  Expr right = 2;
+}

 message StartsWith {
   Expr left = 1;
```

native/spark-expr/src/lib.rs

Lines changed: 2 additions & 0 deletions

```diff
@@ -20,6 +20,7 @@ mod error;
 mod if_expr;

 mod kernels;
+mod regexp;
 pub mod scalar_funcs;
 pub mod spark_hash;
 mod temporal;
@@ -30,6 +31,7 @@ mod xxhash64;
 pub use cast::{spark_cast, Cast};
 pub use error::{SparkError, SparkResult};
 pub use if_expr::IfExpr;
+pub use regexp::RLike;
 pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};

 /// Spark supports three evaluation modes when evaluating expressions, which affect
```

native/spark-expr/src/regexp.rs

Lines changed: 170 additions & 0 deletions (new file)

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::utils::down_cast_any_ref;
use crate::SparkError;
use arrow::compute::take;
use arrow_array::builder::BooleanBuilder;
use arrow_array::types::Int32Type;
use arrow_array::{Array, BooleanArray, DictionaryArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::{internal_err, Result};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use regex::Regex;
use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Implementation of RLIKE operator.
///
/// Note that this implementation is not yet Spark-compatible and simply delegates to
/// the Rust regexp crate. It will match Spark behavior for some simple cases but has
/// differences in whitespace handling and does not support all the features of Java's
/// regular expression engine, which are documented at:
///
/// https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html
#[derive(Debug)]
pub struct RLike {
    child: Arc<dyn PhysicalExpr>,
    // Only scalar patterns are supported
    pattern_str: String,
    pattern: Regex,
}

impl Hash for RLike {
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write(self.pattern_str.as_bytes());
    }
}

impl RLike {
    pub fn try_new(child: Arc<dyn PhysicalExpr>, pattern: &str) -> Result<Self> {
        Ok(Self {
            child,
            pattern_str: pattern.to_string(),
            pattern: Regex::new(pattern).map_err(|e| {
                SparkError::Internal(format!("Failed to compile pattern {}: {}", pattern, e))
            })?,
        })
    }

    fn is_match(&self, inputs: &StringArray) -> BooleanArray {
        let mut builder = BooleanBuilder::with_capacity(inputs.len());
        if inputs.is_nullable() {
            for i in 0..inputs.len() {
                if inputs.is_null(i) {
                    builder.append_null();
                } else {
                    builder.append_value(self.pattern.is_match(inputs.value(i)));
                }
            }
        } else {
            for i in 0..inputs.len() {
                builder.append_value(self.pattern.is_match(inputs.value(i)));
            }
        }
        builder.finish()
    }
}

impl Display for RLike {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "RLike [child: {}, pattern: {}] ",
            self.child, self.pattern_str
        )
    }
}

impl PartialEq<dyn Any> for RLike {
    fn eq(&self, other: &dyn Any) -> bool {
        down_cast_any_ref(other)
            .downcast_ref::<Self>()
            .map(|x| self.child.eq(&x.child) && self.pattern_str.eq(&x.pattern_str))
            .unwrap_or(false)
    }
}

impl PhysicalExpr for RLike {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        Ok(DataType::Boolean)
    }

    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        self.child.nullable(input_schema)
    }

    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        match self.child.evaluate(batch)? {
            ColumnarValue::Array(array) if array.as_any().is::<DictionaryArray<Int32Type>>() => {
                let dict_array = array
                    .as_any()
                    .downcast_ref::<DictionaryArray<Int32Type>>()
                    .expect("dict array");
                let dict_values = dict_array
                    .values()
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .expect("strings");
                // evaluate the regexp pattern against the dictionary values
                let new_values = self.is_match(dict_values);
                // convert to conventional (not dictionary-encoded) array
                let result = take(&new_values, dict_array.keys(), None)?;
                Ok(ColumnarValue::Array(result))
            }
            ColumnarValue::Array(array) => {
                let inputs = array
                    .as_any()
                    .downcast_ref::<StringArray>()
                    .expect("string array");
                let array = self.is_match(inputs);
                Ok(ColumnarValue::Array(Arc::new(array)))
            }
            ColumnarValue::Scalar(_) => {
                internal_err!("non scalar regexp patterns are not supported")
            }
        }
    }

    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![&self.child]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        assert!(children.len() == 1);
        Ok(Arc::new(RLike::try_new(
            children[0].clone(),
            &self.pattern_str,
        )?))
    }

    fn dyn_hash(&self, state: &mut dyn Hasher) {
        use std::hash::Hash;
        let mut s = state;
        self.hash(&mut s);
    }
}
```
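
For orientation, here is a hedged usage sketch of `RLike` driven through the `PhysicalExpr` interface. The imports mirror the crates used in `regexp.rs`, but `rlike_example` and the column name `s` are illustrative assumptions, not code from the Comet test suite:

```rust
use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

use datafusion_comet_spark_expr::RLike;

fn rlike_example() -> datafusion_common::Result<()> {
    // A batch with a single nullable string column named "s".
    let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    let values = StringArray::from(vec![Some("abc123"), Some("abc"), None]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(values)])?;

    // Equivalent to the SQL predicate: s RLIKE '[0-9]+'
    let expr = RLike::try_new(Arc::new(Column::new("s", 0)), "[0-9]+")?;

    match expr.evaluate(&batch)? {
        // Expect a BooleanArray of [true, false, null]; nulls propagate
        // from the input, matching the nullable() implementation above.
        ColumnarValue::Array(result) => assert_eq!(result.len(), 3),
        ColumnarValue::Scalar(_) => unreachable!("RLike always returns an array"),
    }
    Ok(())
}
```

Note the dictionary-encoded path in `evaluate` above: for a `DictionaryArray` input, the pattern is matched once per distinct value and the per-row results are then expanded with `take`, so repeated strings are never re-matched.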
Lines changed: 24 additions & 0 deletions

```
================================================================================================
Grouped Aggregate (single group key + single aggregate SUM)
================================================================================================

OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-41-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate SUM:    Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (SUM)                                                                       2663            2744         115         3.9         254.0       1.0X
SQL Parquet - Comet (Scan, Exec) (SUM)                                                          1067            1084          24         9.8         101.8       2.5X


================================================================================================
Grouped Aggregate (single group key + single aggregate COUNT)
================================================================================================

OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-41-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT:    Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                       2532            2552          28         4.1         241.5       1.0X
SQL Parquet - Comet (Scan, Exec) (COUNT)                                                          4590            4592           4         2.3         437.7       0.6X
```
