diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5c3ebf6fbc..07972cc6a5 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -410,6 +410,13 @@ object CometConf extends ShimCometConf {
      .booleanConf
      .createWithDefault(false)
 
+  val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
+    conf("spark.comet.regexp.allowIncompatible")
+      .doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
+        "Set this config to true to allow them anyway using Rust's regular expression engine. " +
+        "See compatibility guide for more information.")
+      .booleanConf
+      .createWithDefault(false)
 }
 
 object ConfigHelpers {
diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index a16fd1b21a..0af44eb624 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -32,6 +32,12 @@ be used in production.
 
 There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.
 
+## Regular Expressions
+
+Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
+regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
+this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
+
 ## Cast
 
 Cast operations in Comet fall into three levels of support:
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 8a0f2440a2..7c449fa82d 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -44,6 +44,7 @@ Comet provides the following configuration settings.
 | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
 | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
 | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
+| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
 | spark.comet.rowToColumnar.supportedOperatorList | A comma-separated list of row-based operators that will be converted to columnar format when 'spark.comet.rowToColumnar.enabled' is true | Range,InMemoryTableScan |
 | spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
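For context, opting in to the new flag from a Spark session might look like the following. This is an illustrative sketch only: the session setup, file path, and table name are assumptions and are not part of this patch; it simply assumes Comet is already installed and enabled for the session.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example; assumes the Comet plugin jars are already on the classpath.
val spark = SparkSession
  .builder()
  .appName("comet-rlike-example")
  .config("spark.comet.enabled", "true")
  .config("spark.comet.exec.enabled", "true")
  // allow Rust-based regexp evaluation even though it is not fully Spark-compatible
  .config("spark.comet.regexp.allowIncompatible", "true")
  .getOrCreate()

// placeholder data: any string column will do
spark.read.parquet("/tmp/people.parquet").createOrReplaceTempView("people")

// RLIKE with a scalar pattern can now run natively instead of falling back to Spark
spark.sql("SELECT name, name RLIKE 'R[a-z]+s [Rr]ose' FROM people").show()
```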
diff --git a/docs/templates/compatibility-template.md b/docs/templates/compatibility-template.md
index 64f8713546..137870a7ec 100644
--- a/docs/templates/compatibility-template.md
+++ b/docs/templates/compatibility-template.md
@@ -32,6 +32,12 @@ be used in production.
 
 There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.
 
+## Regular Expressions
+
+Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
+regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
+this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
+
 ## Cast
 
 Cast operations in Comet fall into three levels of support:
diff --git a/native/core/src/execution/datafusion/expressions/strings.rs b/native/core/src/execution/datafusion/expressions/strings.rs
index f7f5b02e81..9112c256bb 100644
--- a/native/core/src/execution/datafusion/expressions/strings.rs
+++ b/native/core/src/execution/datafusion/expressions/strings.rs
@@ -143,8 +143,6 @@ make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn);
 
 make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn);
 
-// make_predicate_function!(RLike, rlike_dyn, rlike_utf8_scalar_dyn);
-
 #[derive(Debug, Hash)]
 pub struct SubstringExpr {
     pub child: Arc<dyn PhysicalExpr>,
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index c283cebbac..02ebe2755f 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -59,6 +59,7 @@ use datafusion_expr::expr::find_df_window_func;
 use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
 use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
+use datafusion_physical_expr_common::expressions::Literal;
 use itertools::Itertools;
 use jni::objects::GlobalRef;
 use num::{BigInt, ToPrimitive};
@@ -108,7 +109,7 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr,
+    Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr,
 };
 
 // For clippy error on type_complexity.
@@ -447,6 +448,18 @@ impl PhysicalPlanner {
 
                 Ok(Arc::new(Like::new(left, right)))
             }
+            ExprStruct::Rlike(expr) => {
+                let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?;
+                let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?;
+                match right.as_any().downcast_ref::<Literal>().unwrap().value() {
+                    ScalarValue::Utf8(Some(pattern)) => {
+                        Ok(Arc::new(RLike::try_new(left, pattern)?))
+                    }
+                    _ => Err(ExecutionError::GeneralError(
+                        "RLike only supports scalar patterns".to_string(),
+                    )),
+                }
+            }
             ExprStruct::CheckOverflow(expr) => {
                 let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
                 let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 56518d9eed..158356dfae 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -54,7 +54,7 @@ message Expr {
     StartsWith startsWith = 27;
     EndsWith endsWith = 28;
     Contains contains = 29;
-    // RLike rlike = 30;
+    RLike rlike = 30;
     ScalarFunc scalarFunc = 31;
     EqualNullSafe eqNullSafe = 32;
     NotEqualNullSafe neqNullSafe = 33;
@@ -368,10 +368,10 @@ message Like {
   Expr right = 2;
 }
 
-// message RLike {
-//   Expr left = 1;
-//   Expr right = 2;
-// }
+message RLike {
+  Expr left = 1;
+  Expr right = 2;
+}
 
 message StartsWith {
   Expr left = 1;
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 14ab080b46..c9606233d0 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -20,6 +20,7 @@ mod error;
 mod if_expr;
 mod kernels;
+mod regexp;
 pub mod scalar_funcs;
 pub mod spark_hash;
 mod temporal;
@@ -30,6 +31,7 @@ mod xxhash64;
 pub use cast::{spark_cast, Cast};
 pub use error::{SparkError, SparkResult};
 pub use if_expr::IfExpr;
+pub use regexp::RLike;
 pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};
 
 /// Spark supports three evaluation modes when evaluating expressions, which affect
diff --git a/native/spark-expr/src/regexp.rs b/native/spark-expr/src/regexp.rs
new file mode 100644
index 0000000000..2672d754f1
--- /dev/null
+++ b/native/spark-expr/src/regexp.rs
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::utils::down_cast_any_ref;
+use crate::SparkError;
+use arrow::compute::take;
+use arrow_array::builder::BooleanBuilder;
+use arrow_array::types::Int32Type;
+use arrow_array::{Array, BooleanArray, DictionaryArray, RecordBatch, StringArray};
+use arrow_schema::{DataType, Schema};
+use datafusion_common::{internal_err, Result};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use regex::Regex;
+use std::any::Any;
+use std::fmt::{Display, Formatter};
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+/// Implementation of RLIKE operator.
+///
+/// Note that this implementation is not yet Spark-compatible and simply delegates to
+/// the Rust regexp crate. It will match Spark behavior for some simple cases but has
+/// differences in whitespace handling and does not support all the features of Java's
+/// regular expression engine, which are documented at:
+///
+/// https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html
+#[derive(Debug)]
+pub struct RLike {
+    child: Arc<dyn PhysicalExpr>,
+    // Only scalar patterns are supported
+    pattern_str: String,
+    pattern: Regex,
+}
+
+impl Hash for RLike {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        state.write(self.pattern_str.as_bytes());
+    }
+}
+
+impl RLike {
+    pub fn try_new(child: Arc<dyn PhysicalExpr>, pattern: &str) -> Result<Self> {
+        Ok(Self {
+            child,
+            pattern_str: pattern.to_string(),
+            pattern: Regex::new(pattern).map_err(|e| {
+                SparkError::Internal(format!("Failed to compile pattern {}: {}", pattern, e))
+            })?,
+        })
+    }
+
+    fn is_match(&self, inputs: &StringArray) -> BooleanArray {
+        let mut builder = BooleanBuilder::with_capacity(inputs.len());
+        if inputs.is_nullable() {
+            for i in 0..inputs.len() {
+                if inputs.is_null(i) {
+                    builder.append_null();
+                } else {
+                    builder.append_value(self.pattern.is_match(inputs.value(i)));
+                }
+            }
+        } else {
+            for i in 0..inputs.len() {
+                builder.append_value(self.pattern.is_match(inputs.value(i)));
+            }
+        }
+        builder.finish()
+    }
+}
+
+impl Display for RLike {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "RLike [child: {}, pattern: {}] ",
+            self.child, self.pattern_str
+        )
+    }
+}
+
+impl PartialEq<dyn Any> for RLike {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| self.child.eq(&x.child) && self.pattern_str.eq(&x.pattern_str))
+            .unwrap_or(false)
+    }
+}
+
+impl PhysicalExpr for RLike {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.child.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        match self.child.evaluate(batch)? {
+            ColumnarValue::Array(array) if array.as_any().is::<DictionaryArray<Int32Type>>() => {
+                let dict_array = array
+                    .as_any()
+                    .downcast_ref::<DictionaryArray<Int32Type>>()
+                    .expect("dict array");
+                let dict_values = dict_array
+                    .values()
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("strings");
+                // evaluate the regexp pattern against the dictionary values
+                let new_values = self.is_match(dict_values);
+                // convert to conventional (not dictionary-encoded) array
+                let result = take(&new_values, dict_array.keys(), None)?;
+                Ok(ColumnarValue::Array(result))
+            }
+            ColumnarValue::Array(array) => {
+                let inputs = array
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("string array");
+                let array = self.is_match(inputs);
+                Ok(ColumnarValue::Array(Arc::new(array)))
+            }
+            ColumnarValue::Scalar(_) => {
+                internal_err!("non scalar regexp patterns are not supported")
+            }
+        }
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.child]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        assert!(children.len() == 1);
+        Ok(Arc::new(RLike::try_new(
+            children[0].clone(),
+            &self.pattern_str,
+        )?))
+    }
+
+    fn dyn_hash(&self, state: &mut dyn Hasher) {
+        use std::hash::Hash;
+        let mut s = state;
+        self.hash(&mut s);
+    }
+}
diff --git a/spark/benchmarks/CometAggregateBenchmark-jdk11-results.txt b/spark/benchmarks/CometAggregateBenchmark-jdk11-results.txt
new file mode 100644
index 0000000000..9e3e15bc67
--- /dev/null
+++ b/spark/benchmarks/CometAggregateBenchmark-jdk11-results.txt
@@ -0,0 +1,24 @@
+================================================================================================
+Grouped Aggregate (single group key + single aggregate SUM)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-41-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate SUM:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (SUM)                                                                     2663           2744         115          3.9         254.0       1.0X
+SQL Parquet - Comet (Scan, Exec) (SUM)                                                        1067           1084          24          9.8         101.8       2.5X
+
+
+================================================================================================
+Grouped Aggregate (single group key + single aggregate COUNT)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-41-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)                                                                     2532           2552          28          4.1         241.5       1.0X
+SQL Parquet - Comet (Scan, Exec) (COUNT)                                                        4590           4592           4          2.3         437.7       0.6X
+
+
diff --git a/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala
new file mode 100644
index 0000000000..8dc0c828c7
--- /dev/null
+++ b/spark/src/main/scala/org/apache/comet/expressions/RegExp.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.expressions
+
+object RegExp {
+
+  /** Determine whether the regexp pattern is supported natively and compatible with Spark */
+  def isSupportedPattern(pattern: String): Boolean = {
+    // this is a placeholder for implementing logic to determine if the pattern
+    // is known to be compatible with Spark, so that we can enable regexp automatically
+    // for common cases and fallback to Spark for more complex cases
+    false
+  }
+
+}
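The `isSupportedPattern` placeholder above always returns `false` for now, so the native path is only reachable through `spark.comet.regexp.allowIncompatible`. Purely as an illustration of the kind of conservative allow-list the placeholder comment describes, a sketch is shown below; the specific constructs it accepts are assumptions drawn from the patterns exercised in this PR's tests, not the project's planned implementation.

```scala
// Hypothetical sketch only; not part of this patch. Accepts just the constructs that the
// "rlike whitespace" and "rlike" fuzz tests in this PR exercise on both engines.
object RegExpAllowListSketch {

  // escape sequences that the fuzz test checks against Spark's engine
  private val allowedEscapes: Set[Char] =
    Set('d', 'D', 'w', 'W', 's', 'S', 'b', 'B', 'A', 'Z', 'z')

  def isProbablySupported(pattern: String): Boolean = {
    var inClass = false
    var i = 0
    while (i < pattern.length) {
      pattern.charAt(i) match {
        case '\\' =>
          // only allow known-compatible escapes such as \d, \w, \A, \Z
          if (i + 1 >= pattern.length || !allowedEscapes.contains(pattern.charAt(i + 1))) {
            return false
          }
          i += 1
        case '$' =>
          // '$' is known to behave differently (see the commented-out "Smith$" test case)
          return false
        case '[' => inClass = true
        case ']' => inClass = false
        case '^' if inClass || i == 0 => // negated class or leading anchor
        case c if c.isLetterOrDigit || " +*?{},-".contains(c) => // plain literals and quantifiers
        case _ => return false
      }
      i += 1
    }
    !inClass
  }
}
```

If such a check matched, `QueryPlanSerde` could enable the native path without requiring the flag; anything else would keep the existing fallback to Spark.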
" + + s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " + + "to allow it anyway.") + return None + } + case _ => + withInfo(expr, "Only scalar regexp patterns are supported") + return None + } + + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) + + if (leftExpr.isDefined && rightExpr.isDefined) { + val builder = ExprOuterClass.RLike.newBuilder() + builder.setLeft(leftExpr.get) + builder.setRight(rightExpr.get) + Some( + ExprOuterClass.Expr + .newBuilder() + .setRlike(builder) + .build()) + } else { + withInfo(expr, left, right) + None + } case StartsWith(left, right) => val leftExpr = exprToProtoInternal(left, inputs) val rightExpr = exprToProtoInternal(right, inputs) diff --git a/spark/src/test/resources/tpcds-micro-benchmarks/rlike.sql b/spark/src/test/resources/tpcds-micro-benchmarks/rlike.sql new file mode 100644 index 0000000000..710379d6b3 --- /dev/null +++ b/spark/src/test/resources/tpcds-micro-benchmarks/rlike.sql @@ -0,0 +1,20 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+
+-- This is not part of TPC-DS but runs on TPC-DS data
+
+SELECT i_item_desc, i_item_desc RLIKE '[A-Z][aeiou]+' FROM item;
\ No newline at end of file
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index c22c6b06af..dd02be4f20 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -23,6 +23,7 @@ import java.time.{Duration, Period}
 
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
+import scala.util.Random
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
@@ -617,6 +618,136 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("rlike simple case") {
+    val table = "rlike_names"
+    Seq(false, true).foreach { withDictionary =>
+      val data = Seq("James Smith", "Michael Rose", "Rames Rose", "Rames rose") ++
+        // add repetitive data to trigger dictionary encoding
+        Range(0, 100).map(_ => "John Smith")
+      withParquetFile(data.zipWithIndex, withDictionary) { file =>
+        withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+          spark.read.parquet(file).createOrReplaceTempView(table)
+          val query = sql(s"select _2 as id, _1 rlike 'R[a-z]+s [Rr]ose' from $table")
+          checkSparkAnswerAndOperator(query)
+        }
+      }
+    }
+  }
+
+  test("rlike fallback for non scalar pattern") {
+    val table = "rlike_fallback"
+    withTable(table) {
+      sql(s"create table $table(id int, name varchar(20)) using parquet")
+      sql(s"insert into $table values(1,'James Smith')")
+      withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+        val query2 = sql(s"select id from $table where name rlike name")
+        val (_, cometPlan) = checkSparkAnswer(query2)
+        val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
+        assert(explain.contains("Only scalar regexp patterns are supported"))
+      }
+    }
+  }
+
+  test("rlike whitespace") {
+    val table = "rlike_whitespace"
+    withTable(table) {
+      sql(s"create table $table(id int, name varchar(20)) using parquet")
+      val values =
+        Seq("James Smith", "\rJames\rSmith\r", "\nJames\nSmith\n", "\r\nJames\r\nSmith\r\n")
+      values.zipWithIndex.foreach { x =>
+        sql(s"insert into $table values (${x._2}, '${x._1}')")
+      }
+      val patterns = Seq(
+        "James",
+        "J[a-z]mes",
+        "^James",
+        "\\AJames",
+        "Smith",
+        "James$",
+        "James\\Z",
+        "James\\z",
+        "^Smith",
+        "\\ASmith",
+        // $ produces different results - we could potentially transpile this to a different
+        // expression or just fall back to Spark for this case
+        // "Smith$",
+        "Smith\\Z",
+        "Smith\\z")
+      withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+        patterns.foreach { pattern =>
+          val query2 = sql(s"select name, '$pattern', name rlike '$pattern' from $table")
+          checkSparkAnswerAndOperator(query2)
+        }
+      }
+    }
+  }
+
+  test("rlike") {
+    val table = "rlike_fuzz"
+    val gen = new DataGenerator(new Random(42))
+    withTable(table) {
+      // generate some data
+      // newline characters are intentionally omitted for now
+      val dataChars = "\t abc123"
+      sql(s"create table $table(id int, name varchar(20)) using parquet")
+      gen.generateStrings(100, dataChars, 6).zipWithIndex.foreach { x =>
+        sql(s"insert into $table values(${x._2}, '${x._1}')")
+      }
+
+      // test some common cases - this is far from comprehensive
+      // see https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html
+      // for all valid patterns in Java's regexp engine
+      //
+      // patterns not currently covered:
+      // - octal values
+      // - hex values
+      // - specific character matches
+      // - specific whitespace/newline matches
+      // - complex character classes (union, intersection, subtraction)
+      // - POSIX character classes
+      // - java.lang.Character classes
+      // - Classes for Unicode scripts, blocks, categories and binary properties
+      // - reluctant quantifiers
+      // - possessive quantifiers
+      // - logical operators
+      // - back-references
+      // - quotations
+      // - special constructs (name capturing and non-capturing)
+      val startPatterns = Seq("", "^", "\\A")
+      val endPatterns = Seq("", "$", "\\Z", "\\z")
+      val patternParts = Seq(
+        "[0-9]",
+        "[a-z]",
+        "[^a-z]",
+        "\\d",
+        "\\D",
+        "\\w",
+        "\\W",
+        "\\b",
+        "\\B",
+        "\\h",
+        "\\H",
+        "\\s",
+        "\\S",
+        "\\v",
+        "\\V")
+      val qualifiers = Seq("", "+", "*", "?", "{1,}")
+
+      withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") {
+        // testing every possible combination takes too long, so we pick some
+        // random combinations
+        for (_ <- 0 until 100) {
+          val pattern = gen.pickRandom(startPatterns) +
+            gen.pickRandom(patternParts) +
+            gen.pickRandom(qualifiers) +
+            gen.pickRandom(endPatterns)
+          val query = sql(s"select id, name, name rlike '$pattern' from $table")
+          checkSparkAnswerAndOperator(query)
+        }
+      }
+    }
+  }
+
   test("contains") {
     val table = "names"
     withTable(table) {
diff --git a/spark/src/test/scala/org/apache/comet/DataGenerator.scala b/spark/src/test/scala/org/apache/comet/DataGenerator.scala
index 80e7c22881..443a058bca 100644
--- a/spark/src/test/scala/org/apache/comet/DataGenerator.scala
+++ b/spark/src/test/scala/org/apache/comet/DataGenerator.scala
@@ -36,6 +36,11 @@ object DataGenerator {
 class DataGenerator(r: Random) {
   import DataGenerator._
 
+  /** Pick a random item from a sequence */
+  def pickRandom[T](items: Seq[T]): T = {
+    items(r.nextInt(items.length))
+  }
+
   /** Generate a random string using the specified characters */
   def generateString(chars: String, maxLen: Int): String = {
     val len = r.nextInt(maxLen)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
index 22d8467018..074d424beb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
@@ -69,7 +69,8 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase {
     "join_exploding_output",
     "join_inner",
     "join_left_outer",
-    "join_semi")
+    "join_semi",
+    "rlike")
 
   override def runQueries(
       queryLocation: String,
@@ -110,6 +111,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase {
       benchmark.addCase(s"$name$nameSuffix: Comet (Scan, Exec)") { _ =>
         withSQLConf(
           CometConf.COMET_ENABLED.key -> "true",
+          CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true",
           CometConf.COMET_EXEC_ENABLED.key -> "true",
           CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {
           cometSpark.sql(queryString).noop()
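Finally, as an illustration of what `checkSparkAnswerAndOperator` is asserting in the new tests, a manual spot-check for a single pattern could compare results with the native regexp path disabled and enabled. This is a sketch only: `spark` is assumed to be an existing Comet-enabled session, and the `people` table and its `name` column are placeholders, not part of this patch.

```scala
// Sketch: compare RLIKE output with Spark's JVM engine vs. Rust's regex crate.
// Any divergence is exactly what spark.comet.regexp.allowIncompatible guards against.
val pattern = "R[a-z]+s [Rr]ose"
val query = s"SELECT name, name RLIKE '$pattern' FROM people ORDER BY name"

spark.conf.set("spark.comet.regexp.allowIncompatible", "false") // fall back to Spark's engine
val jvmRows = spark.sql(query).collect()

spark.conf.set("spark.comet.regexp.allowIncompatible", "true") // use the native path
val nativeRows = spark.sql(query).collect()

assert(jvmRows.sameElements(nativeRows), s"pattern '$pattern' differs between engines")
```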