From 8e4f051c83f61aaa4a980c9ee2de90774b6f241c Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 2 Jul 2025 08:16:53 -0700 Subject: [PATCH 01/28] feat: support literal for ARRAY top level --- native/core/src/execution/planner.rs | 15 ++- native/proto/src/proto/expr.proto | 20 +-- native/proto/src/proto/types.proto | 39 ++++++ .../apache/comet/expressions/CometCast.scala | 3 +- .../apache/comet/serde/QueryPlanSerde.scala | 122 +++++++++++++++++- 5 files changed, 185 insertions(+), 14 deletions(-) create mode 100644 native/proto/src/proto/types.proto diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 655559dc9d..a96c58df88 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -85,6 +85,8 @@ use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; +use arrow::array::Int32Array; +use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec; use datafusion_comet_proto::spark_operator::SparkFilePartition; @@ -474,6 +476,18 @@ impl PhysicalPlanner { ))) } } + }, + Value::ListVal(values) => { + dbg!(values); + //dbg!(literal.datatype.as_ref().unwrap()); + //dbg!(data_type); + match data_type { + DataType::List(f) if f.data_type().equals_datatype(&DataType::Int32) => { + SingleRowListArrayBuilder::new(Arc::new(Int32Array::from(values.clone().int_values))) + .build_list_scalar() + } + _ => todo!() + } } } }; @@ -2322,7 +2336,6 @@ impl PhysicalPlanner { other => other, }; let func = self.session_ctx.udf(fun_name)?; - let coerced_types = func .coerce_types(&input_expr_types) .unwrap_or_else(|_| input_expr_types.clone()); diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 8b193ba846..1152d7a1b2 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -21,6 +21,8 @@ syntax = "proto3"; package spark.spark_expression; +import "types.proto"; + option java_package = "org.apache.comet.serde"; // The basic message representing a Spark expression. @@ -112,13 +114,13 @@ enum StatisticsType { } message Count { - repeated Expr children = 1; + repeated Expr children = 1; } message Sum { - Expr child = 1; - DataType datatype = 2; - bool fail_on_error = 3; + Expr child = 1; + DataType datatype = 2; + bool fail_on_error = 3; } message Min { @@ -215,10 +217,11 @@ message Literal { string string_val = 8; bytes bytes_val = 9; bytes decimal_val = 10; - } + ListLiteral list_val = 11; + } - DataType datatype = 11; - bool is_null = 12; + DataType datatype = 12; + bool is_null = 13; } enum EvalMode { @@ -478,5 +481,4 @@ message DataType { } DataTypeInfo type_info = 2; -} - +} \ No newline at end of file diff --git a/native/proto/src/proto/types.proto b/native/proto/src/proto/types.proto new file mode 100644 index 0000000000..1c277c2e3d --- /dev/null +++ b/native/proto/src/proto/types.proto @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + + +syntax = "proto3"; + +package spark.spark_expression; + +option java_package = "org.apache.comet.serde"; + +message ListLiteral { + // Only one of these fields should be populated based on the array type + repeated bool boolean_values = 1; + repeated int32 byte_values = 2; + repeated int32 short_values = 3; + repeated int32 int_values = 4; + repeated int64 long_values = 5; + repeated float float_values = 6; + repeated double double_values = 7; + repeated string string_values = 8; + repeated bytes bytes_values = 9; + repeated bytes decimal_values = 10; + repeated ListLiteral list_values = 11; +} \ No newline at end of file diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index e0e89b35fc..6d52824b56 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -19,7 +19,7 @@ package org.apache.comet.expressions -import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType} sealed trait SupportLevel @@ -62,6 +62,7 @@ object CometCast { } (fromType, toType) match { + case (dt: ArrayType, _: ArrayType) if dt.elementType == NullType => Compatible() case (dt: DataType, _) if dt.typeName == "timestamp_ntz" => // https://github.com/apache/datafusion-comet/issues/378 toType match { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 1b72521270..5d6b35ee16 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, NormalizeNaNAndZero} import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.util.{CharVarcharCodegenUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet._ import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec @@ -55,6 +56,7 @@ import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType import org.apache.comet.serde.ExprOuterClass.DataType._ import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, BuildSide, JoinType, Operator} import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} +import org.apache.comet.serde.Types.ListLiteral import org.apache.comet.shims.CometExprShim /** @@ -203,6 +205,52 @@ object QueryPlanSerde extends Logging with CometExprShim { false } +// def 
convertArrayToProtoLiteral(array: Seq[Any], arrayType: ArrayType): Literal = { +// val elementType = arrayType.elementType +// val listLiteralBuilder = ListLiteral.newBuilder() +// +// elementType match { +// case BooleanType => +// listLiteralBuilder.addAllBooleanValues(array.map(_.asInstanceOf[Boolean]).asJava) +// +// case ByteType => +// listLiteralBuilder.addAllByteValues(array.map(_.asInstanceOf[Byte].toInt).asJava) +// +// case ShortType => +// listLiteralBuilder.addAllShortValues(array.map(_.asInstanceOf[Short].toInt).asJava) +// +// case IntegerType => +// listLiteralBuilder.addAllIntValues(array.map(_.asInstanceOf[Int]).asJava) +// +// case LongType => +// listLiteralBuilder.addAllLongValues(array.map(_.asInstanceOf[Long]).asJava) +// +// case FloatType => +// listLiteralBuilder.addAllFloatValues(array.map(_.asInstanceOf[Float]).asJava) +// +// case DoubleType => +// listLiteralBuilder.addAllDoubleValues(array.map(_.asInstanceOf[Double]).asJava) +// +// case StringType => +// listLiteralBuilder.addAllStringValues(array.map(_.asInstanceOf[String]).asJava) +// +// case BinaryType => +// listLiteralBuilder.addAllBytesValues + // (array.map(x => com.google.protobuf + // .ByteString.copyFrom(x.asInstanceOf[Array[Byte]])).asJava) +// +// case nested: ArrayType => +// val nestedListLiterals = array.map { +// case null => ListLiteral.newBuilder().build() // or handle nulls appropriately +// case seq: Seq[_] => convertArrayToProtoLiteral(seq, nested).getListVal +// } +// listLiteralBuilder.addAllListValues(nestedListLiterals.asJava) +// +// case _ => +// throw new UnsupportedOperationException(s"Unsupported element type: $elementType") +// } +// } + /** * Serializes Spark datatype to protobuf. Note that, a datatype can be serialized by this method * doesn't mean it is supported by Comet native execution, i.e., `supportedDataType` may return @@ -711,8 +759,54 @@ object QueryPlanSerde extends Logging with CometExprShim { binding, (builder, binaryExpr) => builder.setNeqNullSafe(binaryExpr)) - case Literal(value, dataType) - if supportedDataType(dataType, allowComplex = value == null) => + case GreaterThan(left, right) => + createBinaryExpr( + expr, + left, + right, + inputs, + binding, + (builder, binaryExpr) => builder.setGt(binaryExpr)) + + case GreaterThanOrEqual(left, right) => + createBinaryExpr( + expr, + left, + right, + inputs, + binding, + (builder, binaryExpr) => builder.setGtEq(binaryExpr)) + + case LessThan(left, right) => + createBinaryExpr( + expr, + left, + right, + inputs, + binding, + (builder, binaryExpr) => builder.setLt(binaryExpr)) + + case LessThanOrEqual(left, right) => + createBinaryExpr( + expr, + left, + right, + inputs, + binding, + (builder, binaryExpr) => builder.setLtEq(binaryExpr)) + + case Literal(value, dataType) if supportedDataType( + dataType, + allowComplex = value == null || + // Nested literal support for native reader + + + // can be tracked https://github.com/apache/datafusion-comet/issues/1937 + // now supports only Array of primitive + (Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION) + .contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()) && dataType + .isInstanceOf[ArrayType]) && !isComplexType( + dataType.asInstanceOf[ArrayType].elementType)) => val exprBuilder = ExprOuterClass.Literal.newBuilder() if (value == null) { @@ -741,6 +835,28 @@ object QueryPlanSerde extends Logging with CometExprShim { com.google.protobuf.ByteString.copyFrom(value.asInstanceOf[Array[Byte]]) exprBuilder.setBytesVal(byteStr) case 
_: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int]) + case a: ArrayType => + val listLiteralBuilder = ListLiteral.newBuilder() + val array = value.asInstanceOf[GenericArrayData].array + a.elementType match { + case BooleanType => + listLiteralBuilder.addAllBooleanValues( + array.map(_.asInstanceOf[java.lang.Boolean]).toIterable.asJava) + case ByteType => + listLiteralBuilder.addAllByteValues( + array.map(_.asInstanceOf[java.lang.Integer]).toIterable.asJava) + case ShortType => + listLiteralBuilder.addAllShortValues( + array.map(_.asInstanceOf[java.lang.Integer]).toIterable.asJava) + case IntegerType => + listLiteralBuilder.addAllIntValues( + array.map(_.asInstanceOf[java.lang.Integer]).toIterable.asJava) + case LongType => + listLiteralBuilder.addAllLongValues( + array.map(_.asInstanceOf[java.lang.Long]).toIterable.asJava) + } + exprBuilder.setListVal(listLiteralBuilder.build()) + exprBuilder.setDatatype(serializeDataType(dataType).get) case dt => logWarning(s"Unexpected datatype '$dt' for literal value '$value'") } From e2da225f4df1cdf98d914e9c0698b3e4807d61c0 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 2 Jul 2025 08:38:18 -0700 Subject: [PATCH 02/28] feat: support literal for ARRAY top level --- .../org/apache/comet/exec/CometNativeReaderSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index f33da3ba71..a42e8a6c4c 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -436,4 +436,12 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper |""".stripMargin, "select c0['key1'].b from tbl") } + + test("native reader - support ARRAY literal INT fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(1, 2, 3) from tbl") + } } From 89b9c1d6b748809b4d9b5114ea89943591f19237 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 2 Jul 2025 08:39:46 -0700 Subject: [PATCH 03/28] fixed size list --- native/core/src/execution/planner.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index a96c58df88..74d1cff2c0 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -478,13 +478,15 @@ impl PhysicalPlanner { } }, Value::ListVal(values) => { - dbg!(values); + //dbg!(values); //dbg!(literal.datatype.as_ref().unwrap()); //dbg!(data_type); match data_type { DataType::List(f) if f.data_type().equals_datatype(&DataType::Int32) => { - SingleRowListArrayBuilder::new(Arc::new(Int32Array::from(values.clone().int_values))) - .build_list_scalar() + let vals = values.clone().int_values; + let len = &vals.len(); + SingleRowListArrayBuilder::new(Arc::new(Int32Array::from(vals))) + .build_fixed_size_list_scalar(*len) } _ => todo!() } From c29d5e89dd61fe2ce75882cdfcdb856d48208775 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 9 Jul 2025 17:27:17 -0700 Subject: [PATCH 04/28] feat: support literal for ARRAY top level --- native/core/src/execution/planner.rs | 246 +++++++++++++++++- native/proto/src/lib.rs | 1 + native/proto/src/proto/types.proto | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 93 +++++-- .../comet/exec/CometNativeReaderSuite.scala | 114 +++++++- 5 files changed, 427 insertions(+), 29 deletions(-) diff --git 
a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 74d1cff2c0..fd22ef5ca2 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -85,7 +85,11 @@ use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; -use arrow::array::Int32Array; +use arrow::array::{ + BinaryArray, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Array, Decimal128Builder, + Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, + NullArray, StringBuilder, TimestampMicrosecondBuilder, +}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec; @@ -478,17 +482,237 @@ impl PhysicalPlanner { } }, Value::ListVal(values) => { - //dbg!(values); - //dbg!(literal.datatype.as_ref().unwrap()); - //dbg!(data_type); - match data_type { - DataType::List(f) if f.data_type().equals_datatype(&DataType::Int32) => { - let vals = values.clone().int_values; - let len = &vals.len(); - SingleRowListArrayBuilder::new(Arc::new(Int32Array::from(vals))) - .build_fixed_size_list_scalar(*len) + if let DataType::List(f) = data_type { + match f.data_type() { + DataType::Null => { + SingleRowListArrayBuilder::new(Arc::new(NullArray::new(values.clone().null_mask.len()))) + .build_list_scalar() + } + DataType::Boolean => { + let vals = values.clone(); + let len = vals.boolean_values.len(); + let mut arr = BooleanBuilder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.boolean_values[i]); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Int8 => { + let vals = values.clone(); + let len = vals.byte_values.len(); + let mut arr = Int8Builder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.byte_values[i] as i8); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Int16 => { + let vals = values.clone(); + let len = vals.short_values.len(); + let mut arr = Int16Builder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.short_values[i] as i16); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Int32 => { + let vals = values.clone(); + let len = vals.int_values.len(); + let mut arr = Int32Builder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.int_values[i]); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Int64 => { + let vals = values.clone(); + let len = vals.long_values.len(); + let mut arr = Int64Builder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.long_values[i]); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Float32 => { + let vals = values.clone(); + let len = vals.float_values.len(); + let mut arr = Float32Builder::with_capacity(len); + + for i in 0 .. 
len { + if !vals.null_mask[i] { + arr.append_value(vals.float_values[i]); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Float64 => { + let vals = values.clone(); + let len = vals.double_values.len(); + let mut arr = Float64Builder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.double_values[i]); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Timestamp(TimeUnit::Microsecond, None) => { + let vals = values.clone(); + let len = vals.long_values.len(); + let mut arr = TimestampMicrosecondBuilder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.long_values[i]); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { + let vals = values.clone(); + let len = vals.long_values.len(); + let mut arr = TimestampMicrosecondBuilder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.long_values[i]); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish().with_timezone(Arc::clone(tz)))) + .build_list_scalar() + } + DataType::Date32 => { + let vals = values.clone(); + let len = vals.int_values.len(); + let mut arr = Date32Builder::with_capacity(len); + + for i in 0 .. len { + if !vals.null_mask[i] { + arr.append_value(vals.int_values[i]); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Binary => { + let vals = values.clone(); + let mut arr = BinaryBuilder::new(); + + for (i, v) in vals.bytes_values.into_iter().enumerate() { + if !vals.null_mask[i] { + arr.append_value(v); + } else { + arr.append_null(); + } + } + + let binary_array: BinaryArray = arr.finish(); + SingleRowListArrayBuilder::new(Arc::new(binary_array)) + .build_list_scalar() + } + DataType::Utf8 => { + let vals = values.clone(); + let len = vals.string_values.len(); + let mut arr = StringBuilder::with_capacity(len, len); + + for (i, v) in vals.string_values.into_iter().enumerate() { + if !vals.null_mask[i] { + arr.append_value(v); + } else { + arr.append_null(); + } + } + + SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + .build_list_scalar() + } + DataType::Decimal128(p, s) => { + let vals = values.clone(); + let mut arr = Decimal128Builder::new().with_precision_and_scale(*p, *s)?; + + for (i, v) in vals.decimal_values.into_iter().enumerate() { + if !vals.null_mask[i] { + let big_integer = BigInt::from_signed_bytes_be(&v); + let integer = big_integer.to_i128().ok_or_else(|| { + GeneralError(format!( + "Cannot parse {big_integer:?} as i128 for Decimal literal" + )) + })?; + arr.append_value(integer); + } else { + arr.append_null(); + } + } + + let decimal_array: Decimal128Array = arr.finish(); + SingleRowListArrayBuilder::new(Arc::new(decimal_array)) + .build_list_scalar() + } + dt => { + return Err(GeneralError(format!( + "DataType::List literal does not support {dt:?} type" + ))) + } } - _ => todo!() + + } else { + return Err(GeneralError(format!( + "Expected DataType::List but got {data_type:?}" + ))) } } } diff --git a/native/proto/src/lib.rs b/native/proto/src/lib.rs index ed24440360..2c213c2514 100644 --- a/native/proto/src/lib.rs +++ 
b/native/proto/src/lib.rs @@ -21,6 +21,7 @@ // Include generated modules from .proto files. #[allow(missing_docs)] +#[allow(clippy::large_enum_variant)] pub mod spark_expression { include!(concat!("generated", "/spark.spark_expression.rs")); } diff --git a/native/proto/src/proto/types.proto b/native/proto/src/proto/types.proto index 1c277c2e3d..cc163522b4 100644 --- a/native/proto/src/proto/types.proto +++ b/native/proto/src/proto/types.proto @@ -36,4 +36,6 @@ message ListLiteral { repeated bytes bytes_values = 9; repeated bytes decimal_values = 10; repeated ListLiteral list_values = 11; + + repeated bool null_mask = 12; } \ No newline at end of file diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 5d6b35ee16..106110974c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -48,6 +48,8 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import com.google.protobuf.ByteString + import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo} import org.apache.comet.expressions._ @@ -817,14 +819,13 @@ object QueryPlanSerde extends Logging with CometExprShim { case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean]) case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte]) case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short]) - case _: IntegerType => exprBuilder.setIntVal(value.asInstanceOf[Int]) - case _: LongType => exprBuilder.setLongVal(value.asInstanceOf[Long]) + case _: IntegerType | _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int]) + case _: LongType | _: TimestampType | _: TimestampNTZType => + exprBuilder.setLongVal(value.asInstanceOf[Long]) case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float]) case _: DoubleType => exprBuilder.setDoubleVal(value.asInstanceOf[Double]) case _: StringType => exprBuilder.setStringVal(value.asInstanceOf[UTF8String].toString) - case _: TimestampType => exprBuilder.setLongVal(value.asInstanceOf[Long]) - case _: TimestampNTZType => exprBuilder.setLongVal(value.asInstanceOf[Long]) case _: DecimalType => // Pass decimal literal as bytes. 
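            // The bytes are the big-endian two's-complement unscaled value
            // (BigInteger.toByteArray); the native side reverses this with
            // BigInt::from_signed_bytes_be and takes the scale from the datatype.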
val unscaled = value.asInstanceOf[Decimal].toBigDecimal.underlying.unscaledValue @@ -834,26 +835,84 @@ object QueryPlanSerde extends Logging with CometExprShim { val byteStr = com.google.protobuf.ByteString.copyFrom(value.asInstanceOf[Array[Byte]]) exprBuilder.setBytesVal(byteStr) - case _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int]) case a: ArrayType => val listLiteralBuilder = ListLiteral.newBuilder() val array = value.asInstanceOf[GenericArrayData].array a.elementType match { + case NullType => + array.foreach(_ => listLiteralBuilder.addNullMask(true)) case BooleanType => - listLiteralBuilder.addAllBooleanValues( - array.map(_.asInstanceOf[java.lang.Boolean]).toIterable.asJava) + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Boolean] + listLiteralBuilder.addBooleanValues(casted) + listLiteralBuilder.addNullMask(casted == null) + }) case ByteType => - listLiteralBuilder.addAllByteValues( - array.map(_.asInstanceOf[java.lang.Integer]).toIterable.asJava) + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Integer] + listLiteralBuilder.addByteValues(casted) + listLiteralBuilder.addNullMask(casted == null) + }) case ShortType => - listLiteralBuilder.addAllShortValues( - array.map(_.asInstanceOf[java.lang.Integer]).toIterable.asJava) - case IntegerType => - listLiteralBuilder.addAllIntValues( - array.map(_.asInstanceOf[java.lang.Integer]).toIterable.asJava) - case LongType => - listLiteralBuilder.addAllLongValues( - array.map(_.asInstanceOf[java.lang.Long]).toIterable.asJava) + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Short] + listLiteralBuilder.addShortValues( + if (casted != null) casted.intValue() + else null.asInstanceOf[java.lang.Integer]) + listLiteralBuilder.addNullMask(casted == null) + }) + case IntegerType | DateType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Integer] + listLiteralBuilder.addIntValues(casted) + listLiteralBuilder.addNullMask(casted == null) + }) + case LongType | TimestampType | TimestampNTZType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Long] + listLiteralBuilder.addLongValues(casted) + listLiteralBuilder.addNullMask(casted == null) + }) + case FloatType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Float] + listLiteralBuilder.addFloatValues(casted) + listLiteralBuilder.addNullMask(casted == null) + }) + case DoubleType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Double] + listLiteralBuilder.addDoubleValues(casted) + listLiteralBuilder.addNullMask(casted == null) + }) + case StringType => + array.foreach(v => { + val casted = v.asInstanceOf[org.apache.spark.unsafe.types.UTF8String] + listLiteralBuilder.addStringValues( + if (casted != null) casted.toString else "") + listLiteralBuilder.addNullMask(casted == null) + }) + case _: DecimalType => + array + .foreach(v => { + val casted = + v.asInstanceOf[Decimal] + listLiteralBuilder.addDecimalValues(if (casted != null) { + com.google.protobuf.ByteString + .copyFrom(casted.toBigDecimal.underlying.unscaledValue.toByteArray) + } else ByteString.EMPTY) + listLiteralBuilder.addNullMask(casted == null) + }) + case _: BinaryType => + array + .foreach(v => { + val casted = + v.asInstanceOf[Array[Byte]] + listLiteralBuilder.addBytesValues(if (casted != null) { + com.google.protobuf.ByteString.copyFrom(casted) + } else ByteString.EMPTY) + listLiteralBuilder.addNullMask(casted == null) + }) } exprBuilder.setListVal(listLiteralBuilder.build()) 
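            // The repeated value fields alone are ambiguous (int_values backs both
            // IntegerType and DateType; long_values backs LongType and both timestamp
            // types), so the full Spark datatype is serialized alongside the literal.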
exprBuilder.setDatatype(serializeDataType(dataType).get) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index a42e8a6c4c..0750593c96 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -442,6 +442,118 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper """ |select 1 a |""".stripMargin, - "select array(1, 2, 3) from tbl") + "select array(1, 2, 3, null) from tbl") + } + + test("native reader - support ARRAY literal BOOL fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(true, false, null) from tbl") + } + + test("native reader - support ARRAY literal NULL fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(null) from tbl") + } + + test("native reader - support empty ARRAY literal") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array() from tbl") + } + + test("native reader - support ARRAY literal BYTE fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(1, 2, 3, null) from tbl") + } + + test("native reader - support ARRAY literal SHORT fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(cast(1 as short), cast(2 as short), cast(3 as short), null) from tbl") + } + + test("native reader - support ARRAY literal DATE fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(CAST('2024-01-01' AS DATE), CAST('2024-02-01' AS DATE), CAST('2024-03-01' AS DATE), null) from tbl") + } + + test("native reader - support ARRAY literal LONG fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(cast(1 as bigint), cast(2 as bigint), cast(3 as bigint), null) from tbl") + } + + test("native reader - support ARRAY literal TIMESTAMP fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(CAST('2024-01-01 10:00:00' AS TIMESTAMP), CAST('2024-01-02 10:00:00' AS TIMESTAMP), CAST('2024-01-03 10:00:00' AS TIMESTAMP), null) from tbl") + } + + test("native reader - support ARRAY literal TIMESTAMP TZ fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(CAST('2024-01-01 10:00:00' AS TIMESTAMP_NTZ), CAST('2024-01-02 10:00:00' AS TIMESTAMP_NTZ), CAST('2024-01-03 10:00:00' AS TIMESTAMP_NTZ), null) from tbl") + } + + test("native reader - support ARRAY literal FLOAT fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(cast(1 as float), cast(2 as float), cast(3 as float), null) from tbl") + } + + test("native reader - support ARRAY literal DOUBLE fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(cast(1 as double), cast(2 as double), cast(3 as double), null) from tbl") + } + + test("native reader - support ARRAY literal STRING fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array('a', 'bc', 'def', null) from tbl") + } + + test("native reader - support ARRAY literal DECIMAL fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(cast(1 as decimal(10, 2)), cast(2.5 as decimal(10, 2)),cast(3.75 as decimal(10, 2)), null) from tbl") + } + + test("native reader - support ARRAY literal BINARY fields") { + testSingleLineQuery( 
+ """ + |select 1 a + |""".stripMargin, + "select array(cast('a' as binary), cast('bc' as binary), cast('def' as binary), null) from tbl") } } From 4a36c8b5e35b74d94c63e437a005f87c533fabdb Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 9 Jul 2025 17:40:55 -0700 Subject: [PATCH 05/28] feat: support literal for ARRAY top level --- .../apache/comet/serde/QueryPlanSerde.scala | 46 ------------------- 1 file changed, 46 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 106110974c..62f063e26b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -207,52 +207,6 @@ object QueryPlanSerde extends Logging with CometExprShim { false } -// def convertArrayToProtoLiteral(array: Seq[Any], arrayType: ArrayType): Literal = { -// val elementType = arrayType.elementType -// val listLiteralBuilder = ListLiteral.newBuilder() -// -// elementType match { -// case BooleanType => -// listLiteralBuilder.addAllBooleanValues(array.map(_.asInstanceOf[Boolean]).asJava) -// -// case ByteType => -// listLiteralBuilder.addAllByteValues(array.map(_.asInstanceOf[Byte].toInt).asJava) -// -// case ShortType => -// listLiteralBuilder.addAllShortValues(array.map(_.asInstanceOf[Short].toInt).asJava) -// -// case IntegerType => -// listLiteralBuilder.addAllIntValues(array.map(_.asInstanceOf[Int]).asJava) -// -// case LongType => -// listLiteralBuilder.addAllLongValues(array.map(_.asInstanceOf[Long]).asJava) -// -// case FloatType => -// listLiteralBuilder.addAllFloatValues(array.map(_.asInstanceOf[Float]).asJava) -// -// case DoubleType => -// listLiteralBuilder.addAllDoubleValues(array.map(_.asInstanceOf[Double]).asJava) -// -// case StringType => -// listLiteralBuilder.addAllStringValues(array.map(_.asInstanceOf[String]).asJava) -// -// case BinaryType => -// listLiteralBuilder.addAllBytesValues - // (array.map(x => com.google.protobuf - // .ByteString.copyFrom(x.asInstanceOf[Array[Byte]])).asJava) -// -// case nested: ArrayType => -// val nestedListLiterals = array.map { -// case null => ListLiteral.newBuilder().build() // or handle nulls appropriately -// case seq: Seq[_] => convertArrayToProtoLiteral(seq, nested).getListVal -// } -// listLiteralBuilder.addAllListValues(nestedListLiterals.asJava) -// -// case _ => -// throw new UnsupportedOperationException(s"Unsupported element type: $elementType") -// } -// } - /** * Serializes Spark datatype to protobuf. 
Note that, a datatype can be serialized by this method * doesn't mean it is supported by Comet native execution, i.e., `supportedDataType` may return From 5e9f5a17419c2b97f340024e853fdb22490e34e9 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 10 Jul 2025 14:28:09 -0700 Subject: [PATCH 06/28] feat: support literal for ARRAY top level --- native/spark-expr/src/conversion_funcs/cast.rs | 4 ++++ .../main/scala/org/apache/comet/expressions/CometCast.scala | 2 ++ 2 files changed, 6 insertions(+) diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs index 7140d2be09..a5169ba186 100644 --- a/native/spark-expr/src/conversion_funcs/cast.rs +++ b/native/spark-expr/src/conversion_funcs/cast.rs @@ -20,6 +20,7 @@ use crate::utils::array_with_timezone; use crate::{EvalMode, SparkError, SparkResult}; use arrow::array::builder::StringBuilder; use arrow::array::{DictionaryArray, StringArray, StructArray}; +use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Schema}; use arrow::{ array::{ @@ -968,6 +969,9 @@ fn cast_array( to_type, cast_options, )?), + (List(_), List(_)) if can_cast_types(from_type, to_type) => { + Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?) + } (UInt8 | UInt16 | UInt32 | UInt64, Int8 | Int16 | Int32 | Int64) if cast_options.allow_cast_unsigned_ints => { diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 6d52824b56..337eae11db 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -63,6 +63,8 @@ object CometCast { (fromType, toType) match { case (dt: ArrayType, _: ArrayType) if dt.elementType == NullType => Compatible() + case (dt: ArrayType, dt1: ArrayType) => + isSupported(dt.elementType, dt1.elementType, timeZoneId, evalMode) case (dt: DataType, _) if dt.typeName == "timestamp_ntz" => // https://github.com/apache/datafusion-comet/issues/378 toType match { From 6c3e3ec1001cb5407460b8cef6522df2511491c7 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 10 Jul 2025 14:29:11 -0700 Subject: [PATCH 07/28] feat: support literal for ARRAY top level --- .../org/apache/comet/exec/CometNativeReaderSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 0750593c96..4c7ea3a202 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -556,4 +556,12 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper |""".stripMargin, "select array(cast('a' as binary), cast('bc' as binary), cast('def' as binary), null) from tbl") } + + test("native reader - array equality") { + testSingleLineQuery( + """ + | select array(1) a union all select array(2) + |""".stripMargin, + "select * from tbl where a = array(1L)") + } } From 3b9e06878fed0af20eed0704965a8180e3bdb499 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 11 Jul 2025 17:57:43 -0700 Subject: [PATCH 08/28] feat: support literal for ARRAY top level --- spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 62f063e26b..2fabe8d843 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, NormalizeNaNAndZero} import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.{CharVarcharCodegenUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet._ From 2ac7d335cdb954044493859070a72b17355cd5f5 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 17 Jul 2025 08:30:53 -0700 Subject: [PATCH 09/28] feat: support literal for ARRAY top level --- .../spark-expr/src/conversion_funcs/cast.rs | 2 +- .../apache/comet/serde/QueryPlanSerde.scala | 4 ++- .../comet/exec/CometNativeReaderSuite.scala | 25 +++++++------------ 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs index a5169ba186..1cf061ab23 100644 --- a/native/spark-expr/src/conversion_funcs/cast.rs +++ b/native/spark-expr/src/conversion_funcs/cast.rs @@ -1022,7 +1022,7 @@ fn is_datafusion_spark_compatible( DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => { // note that the cast from Int32/Int64 -> Decimal128 here is actually // not compatible with Spark (no overflow checks) but we have tests that - // rely on this cast working so we have to leave it here for now + // rely on this cast working, so we have to leave it here for now matches!( to_type, DataType::Boolean diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 2fabe8d843..aa0432becd 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1775,7 +1775,9 @@ object QueryPlanSerde extends Logging with CometExprShim { op match { // Fully native scan for V1 - case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION => + case scan: CometScanExec + if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION + || scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT => val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder() nativeScanBuilder.setSource(op.simpleStringWithNodeId()) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 4c7ea3a202..d1b5cab262 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.Tag import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.{array, col} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType, StructType} @@ -253,18 +254,11 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper } test("native reader - read a STRUCT subfield - field from second") { 
- withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> "true", - SQLConf.USE_V1_SOURCE_LIST.key -> "parquet", - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false", - CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion") { - testSingleLineQuery( - """ + testSingleLineQuery( + """ |select 1 a, named_struct('a', 1, 'b', 'n') c0 |""".stripMargin, - "select c0.b from tbl") - } + "select c0.b from tbl") } test("native reader - read a STRUCT subfield from ARRAY of STRUCTS - field from first") { @@ -557,11 +551,10 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper "select array(cast('a' as binary), cast('bc' as binary), cast('def' as binary), null) from tbl") } - test("native reader - array equality") { - testSingleLineQuery( - """ - | select array(1) a union all select array(2) - |""".stripMargin, - "select * from tbl where a = array(1L)") + test("SPARK-18053: ARRAY equality is broken") { + withTable("array_tbl") { + spark.range(10).select(array(col("id")).as("arr")).write.saveAsTable("array_tbl") + assert(sql("SELECT * FROM array_tbl where arr = ARRAY(1L)").count == 1) + } } } From 3089a30651e23e7c5745d9ba402d65c44471053c Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 17 Jul 2025 09:32:33 -0700 Subject: [PATCH 10/28] feat: support literal for ARRAY top level --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index aa0432becd..2fabe8d843 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1775,9 +1775,7 @@ object QueryPlanSerde extends Logging with CometExprShim { op match { // Fully native scan for V1 - case scan: CometScanExec - if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION - || scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT => + case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION => val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder() nativeScanBuilder.setSource(op.simpleStringWithNodeId()) From 476b2fd7e9a8299742157589cdaf723cb023ceff Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 22 Jul 2025 18:21:33 -0700 Subject: [PATCH 11/28] fixed size list --- native/core/src/execution/planner.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index fd22ef5ca2..732c49a4c1 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1256,9 +1256,11 @@ impl PhysicalPlanner { } OpStruct::Filter(filter) => { assert_eq!(children.len(), 1); + dbg!(filter); let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; + dbg!(&predicate); let filter: Arc = match (filter.wrap_child_in_copy_exec, filter.use_datafusion_filter) { @@ -1270,7 +1272,7 @@ impl PhysicalPlanner { predicate, Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), )?), - (false, true) => Arc::new(DataFusionFilterExec::try_new( + (false, true) => Arc::new(CometFilterExec::try_new( predicate, Arc::clone(&child.native_plan), )?), @@ -1280,6 +1282,8 @@ impl PhysicalPlanner { )?), }; + dbg!(&filter); + Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, 
filter, vec![child])), @@ -1410,6 +1414,7 @@ impl PhysicalPlanner { )) } OpStruct::NativeScan(scan) => { + dbg!(scan); let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice()); let required_schema: SchemaRef = convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); @@ -1540,6 +1545,7 @@ impl PhysicalPlanner { // The `ScanExec` operator will take actual arrays from Spark during execution let scan = ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?; + Ok(( vec![scan.clone()], Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])), From c95d05e6708a5ff3df9c2a1e2afdf837ea2e36f0 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 1 Aug 2025 13:25:27 -0700 Subject: [PATCH 12/28] fixes --- native/core/src/execution/planner.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 732c49a4c1..03be8c672d 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1256,11 +1256,9 @@ impl PhysicalPlanner { } OpStruct::Filter(filter) => { assert_eq!(children.len(), 1); - dbg!(filter); let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?; let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - dbg!(&predicate); let filter: Arc = match (filter.wrap_child_in_copy_exec, filter.use_datafusion_filter) { @@ -1272,7 +1270,7 @@ impl PhysicalPlanner { predicate, Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), )?), - (false, true) => Arc::new(CometFilterExec::try_new( + (false, true) => Arc::new(DataFusionFilterExec::try_new( predicate, Arc::clone(&child.native_plan), )?), @@ -1282,8 +1280,6 @@ impl PhysicalPlanner { )?), }; - dbg!(&filter); - Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, filter, vec![child])), @@ -1414,7 +1410,6 @@ impl PhysicalPlanner { )) } OpStruct::NativeScan(scan) => { - dbg!(scan); let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice()); let required_schema: SchemaRef = convert_spark_types_to_arrow_schema(scan.required_schema.as_slice()); From 0e5fcb0d8c42f832a2ad784981371ca68830f5eb Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 4 Aug 2025 11:21:07 -0700 Subject: [PATCH 13/28] feat: ArrayType literal --- .../src/main/scala/org/apache/comet/DataTypeSupport.scala | 5 +++++ .../main/scala/org/apache/comet/rules/CometScanRule.scala | 6 +----- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 1 + .../test/scala/org/apache/spark/sql/CometTestBase.scala | 7 +------ 4 files changed, 8 insertions(+), 11 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala index 917aa95697..66eab8c9f0 100644 --- a/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala +++ b/spark/src/main/scala/org/apache/comet/DataTypeSupport.scala @@ -73,4 +73,9 @@ object DataTypeSupport { val ARRAY_ELEMENT = "array element" val MAP_KEY = "map key" val MAP_VALUE = "map value" + + def isComplexType(dt: DataType): Boolean = dt match { + case _: StructType | _: ArrayType | _: MapType => true + case _ => false + } } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 592069fcc6..6a328f4be2 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ 
b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.types._ import org.apache.comet.{CometConf, CometSparkSessionExtensions, DataTypeSupport} import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos} +import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.parquet.{CometParquetScan, SupportsComet} /** @@ -277,11 +278,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { val partitionSchemaSupported = typeChecker.isSchemaSupported(partitionSchema, fallbackReasons) - def isComplexType(dt: DataType): Boolean = dt match { - case _: StructType | _: ArrayType | _: MapType => true - case _ => false - } - def hasMapsContainingStructs(dataType: DataType): Boolean = { dataType match { case s: StructType => s.exists(field => hasMapsContainingStructs(field.dataType)) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 2fabe8d843..0f441a4853 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -51,6 +51,7 @@ import com.google.protobuf.ByteString import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo} +import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.expressions._ import org.apache.comet.objectstore.NativeConfig import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 365003aa8c..cf11bdf590 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.internal._ import org.apache.spark.sql.test._ -import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, MapType, StructType} +import org.apache.spark.sql.types.{DecimalType, StructType} import org.apache.comet._ import org.apache.comet.shims.ShimCometSparkSessionExtensions @@ -1142,9 +1142,4 @@ abstract class CometTestBase usingDataSourceExec(conf) && !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf) } - - def isComplexType(dt: DataType): Boolean = dt match { - case _: StructType | _: ArrayType | _: MapType => true - case _ => false - } } From 1f9a979b7b31d229b3e73240fc84483e3c2a52f7 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 4 Aug 2025 12:44:52 -0700 Subject: [PATCH 14/28] feat: ArrayType literal --- .../scala/org/apache/comet/CometArrayExpressionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 9951f4f9d0..3086b39e0d 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, 
isSpark40Plus} +import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.serde.CometArrayExcept import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} @@ -272,8 +273,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } } - // https://github.com/apache/datafusion-comet/issues/1929 - ignore("array_contains - array literals") { + test("array_contains - array literals") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") val filename = path.toString From 05373d62eb8fd803f98499585101536ddedd094b Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 4 Aug 2025 13:27:25 -0700 Subject: [PATCH 15/28] feat: ArrayType literal --- .../test/scala/org/apache/comet/CometArrayExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 3086b39e0d..8bbc79d3be 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -273,7 +273,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } } - test("array_contains - array literals") { + ignore("array_contains - array literals") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") val filename = path.toString From 3ef8cce5cfc13a0b51b5135579dce8a8656f135a Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 Aug 2025 11:33:51 -0700 Subject: [PATCH 16/28] feat: ArrayType literal --- .../test/scala/org/apache/comet/CometArrayExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 8bbc79d3be..3086b39e0d 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -273,7 +273,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } } - ignore("array_contains - array literals") { + test("array_contains - array literals") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") val filename = path.toString From 8c1c8888aa8a7e1ede2cdb3513bae2f86331cdd2 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 Aug 2025 12:38:41 -0700 Subject: [PATCH 17/28] feat: ArrayType literal --- .../scala/org/apache/comet/CometArrayExpressionSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 3086b39e0d..9976ecd748 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -292,14 +292,13 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp generateMap = false)) } val table = spark.read.parquet(filename) + table.createOrReplaceTempView("t2") for (field <- table.schema.fields) { val typeName = field.dataType.typeName - checkSparkAnswerAndOperator( - sql(s"SELECT array_contains(cast(null as array<$typeName>), b) FROM t2")) checkSparkAnswerAndOperator(sql( - s"SELECT array_contains(cast(array() as array<$typeName>), 
cast(null as $typeName)) FROM t2")) - checkSparkAnswerAndOperator(sql("SELECT array_contains(array(), 1) FROM t2")) + s"SELECT array_contains(cast(null as array<$typeName>), cast(null as $typeName)) FROM t2")) } + checkSparkAnswerAndOperator(sql("SELECT array_contains(array(), 1) FROM t2")) } } From 309d1af226c202f9db9ac0ebdc5fbc8f799c3a66 Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 7 Aug 2025 14:27:21 -0700 Subject: [PATCH 18/28] replace builders --- native/core/src/execution/planner.rs | 181 ++++----------------------- 1 file changed, 27 insertions(+), 154 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 03be8c672d..3a70f2234e 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -86,10 +86,11 @@ use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; use arrow::array::{ - BinaryArray, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Array, Decimal128Builder, - Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, - NullArray, StringBuilder, TimestampMicrosecondBuilder, + BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder, + TimestampMicrosecondArray, }; +use arrow::buffer::{BooleanBuffer, Buffer, MutableBuffer, OffsetBuffer}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec; @@ -490,181 +491,64 @@ impl PhysicalPlanner { } DataType::Boolean => { let vals = values.clone(); - let len = vals.boolean_values.len(); - let mut arr = BooleanBuilder::with_capacity(len); - - for i in 0 .. len { - if !vals.null_mask[i] { - arr.append_value(vals.boolean_values[i]); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + SingleRowListArrayBuilder::new(Arc::new(BooleanArray::new(BooleanBuffer::from(vals.boolean_values), Some(vals.null_mask.into())))) .build_list_scalar() } DataType::Int8 => { let vals = values.clone(); - let len = vals.byte_values.len(); - let mut arr = Int8Builder::with_capacity(len); - - for i in 0 .. len { - if !vals.null_mask[i] { - arr.append_value(vals.byte_values[i] as i8); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + SingleRowListArrayBuilder::new(Arc::new(Int8Array::new(vals.byte_values.iter().map(|&x| x as i8).collect::>().into(), Some(vals.null_mask.into())))) .build_list_scalar() } DataType::Int16 => { let vals = values.clone(); - let len = vals.short_values.len(); - let mut arr = Int16Builder::with_capacity(len); - - for i in 0 .. len { - if !vals.null_mask[i] { - arr.append_value(vals.short_values[i] as i16); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + SingleRowListArrayBuilder::new(Arc::new(Int16Array::new(vals.short_values.iter().map(|&x| x as i16).collect::>().into(), Some(vals.null_mask.into())))) .build_list_scalar() } DataType::Int32 => { let vals = values.clone(); - let len = vals.int_values.len(); - let mut arr = Int32Builder::with_capacity(len); - - for i in 0 .. 
len { - if !vals.null_mask[i] { - arr.append_value(vals.int_values[i]); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + SingleRowListArrayBuilder::new(Arc::new(Int32Array::new(vals.int_values.into(), Some(vals.null_mask.into())))) .build_list_scalar() } DataType::Int64 => { let vals = values.clone(); - let len = vals.long_values.len(); - let mut arr = Int64Builder::with_capacity(len); - - for i in 0 .. len { - if !vals.null_mask[i] { - arr.append_value(vals.long_values[i]); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + SingleRowListArrayBuilder::new(Arc::new(Int64Array::new(vals.long_values.into(), Some(vals.null_mask.into())))) .build_list_scalar() } DataType::Float32 => { let vals = values.clone(); - let len = vals.float_values.len(); - let mut arr = Float32Builder::with_capacity(len); - - for i in 0 .. len { - if !vals.null_mask[i] { - arr.append_value(vals.float_values[i]); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + SingleRowListArrayBuilder::new(Arc::new(Float32Array::new(vals.float_values.into(), Some(vals.null_mask.into())))) .build_list_scalar() } DataType::Float64 => { let vals = values.clone(); - let len = vals.double_values.len(); - let mut arr = Float64Builder::with_capacity(len); - - for i in 0 .. len { - if !vals.null_mask[i] { - arr.append_value(vals.double_values[i]); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + SingleRowListArrayBuilder::new(Arc::new(Float64Array::new(vals.double_values.into(), Some(vals.null_mask.into())))) .build_list_scalar() } DataType::Timestamp(TimeUnit::Microsecond, None) => { let vals = values.clone(); - let len = vals.long_values.len(); - let mut arr = TimestampMicrosecondBuilder::with_capacity(len); - - for i in 0 .. len { - if !vals.null_mask[i] { - arr.append_value(vals.long_values[i]); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) + SingleRowListArrayBuilder::new(Arc::new(TimestampMicrosecondArray::new(vals.long_values.into(), Some(vals.null_mask.into())))) .build_list_scalar() } DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { let vals = values.clone(); - let len = vals.long_values.len(); - let mut arr = TimestampMicrosecondBuilder::with_capacity(len); - - for i in 0 .. len { - if !vals.null_mask[i] { - arr.append_value(vals.long_values[i]); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish().with_timezone(Arc::clone(tz)))) + SingleRowListArrayBuilder::new(Arc::new(TimestampMicrosecondArray::new(vals.long_values.into(), Some(vals.null_mask.into())).with_timezone(Arc::clone(tz)))) .build_list_scalar() } DataType::Date32 => { let vals = values.clone(); - let len = vals.int_values.len(); - let mut arr = Date32Builder::with_capacity(len); - - for i in 0 .. 
len {
-                    if !vals.null_mask[i] {
-                        arr.append_value(vals.int_values[i]);
-                    } else {
-                        arr.append_null();
-                    }
-                }
-
-                SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
+                SingleRowListArrayBuilder::new(Arc::new(Date32Array::new(vals.int_values.into(), Some(vals.null_mask.into()))))
                     .build_list_scalar()
             }
             DataType::Binary => {
                 let vals = values.clone();
-                let mut arr = BinaryBuilder::new();
-
-                for (i, v) in vals.bytes_values.into_iter().enumerate() {
-                    if !vals.null_mask[i] {
-                        arr.append_value(v);
-                    } else {
-                        arr.append_null();
-                    }
-                }
-
-                let binary_array: BinaryArray = arr.finish();
-                SingleRowListArrayBuilder::new(Arc::new(binary_array))
+                let offsets = MutableBuffer::new((vals.bytes_values.len() + 1) * size_of::<i32>());
+                let offsets = Buffer::from(offsets);
+                let value_offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) };
+                SingleRowListArrayBuilder::new(Arc::new(BinaryArray::new(value_offsets, vals.int_values.into(), Some(vals.null_mask.into()))))
                     .build_list_scalar()
             }
             DataType::Utf8 => {
+                // Using a builder here as it is quite complicated to create StringArray from vector of string with nulls
                 let vals = values.clone();
                 let len = vals.string_values.len();
                 let mut arr = StringBuilder::with_capacity(len, len);
@@ -682,25 +566,14 @@ impl PhysicalPlanner {
             }
             DataType::Decimal128(p, s) => {
                 let vals = values.clone();
-                let mut arr = Decimal128Builder::new().with_precision_and_scale(*p, *s)?;
-
-                for (i, v) in vals.decimal_values.into_iter().enumerate() {
-                    if !vals.null_mask[i] {
-                        let big_integer = BigInt::from_signed_bytes_be(&v);
-                        let integer = big_integer.to_i128().ok_or_else(|| {
-                            GeneralError(format!(
-                                "Cannot parse {big_integer:?} as i128 for Decimal literal"
-                            ))
-                        })?;
-                        arr.append_value(integer);
-                    } else {
-                        arr.append_null();
-                    }
-                }
-
-                let decimal_array: Decimal128Array = arr.finish();
-                SingleRowListArrayBuilder::new(Arc::new(decimal_array))
-                    .build_list_scalar()
+                SingleRowListArrayBuilder::new(Arc::new(Decimal128Array::new(vals.decimal_values.into_iter().map(|v| {
+                    let big_integer = BigInt::from_signed_bytes_be(&v);
+                    big_integer.to_i128().ok_or_else(|| {
+                        return GeneralError(format!(
+                            "Cannot parse {big_integer:?} as i128 for Decimal literal"
+                        ))
+                    }).unwrap()
+                }).collect::<Vec<_>>().into(), Some(vals.null_mask.into())).with_precision_and_scale(*p, *s)?)).build_list_scalar()
             }
             dt => {
                 return Err(GeneralError(format!(
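A quick aside on the pattern patch 18 adopts: arrow-rs primitive arrays can be constructed directly from a value buffer plus an optional validity buffer, so the per-element builder loops collapse into one-liners. A minimal standalone sketch of the same idea follows; the function name and the `valid` vector are illustrative only, and note that arrow-rs treats a `true` validity entry as a valid (non-null) slot, which is relevant to the null_mask polarity addressed later in this series.

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::buffer::NullBuffer;
    use datafusion::common::utils::SingleRowListArrayBuilder;
    use datafusion::common::ScalarValue;

    fn int_list_scalar(values: Vec<i32>, valid: Vec<bool>) -> ScalarValue {
        // Value buffer plus validity buffer in one shot, no append loop.
        let arr = Int32Array::new(values.into(), Some(NullBuffer::from(valid)));
        // Wrap the array as the single row of a list and return it as a scalar.
        SingleRowListArrayBuilder::new(Arc::new(arr)).build_list_scalar()
    }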
From 0dff9be5cd1476ed96f1a33a32ced327f67eecdd Mon Sep 17 00:00:00 2001
From: comphead
Date: Thu, 7 Aug 2025 14:37:27 -0700
Subject: [PATCH 19/28] tests

---
 .../comet/exec/CometNativeReaderSuite.scala   | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
index d1b5cab262..8f1e7cfdf0 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala
@@ -436,7 +436,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(1, 2, 3, null) from tbl")
+      "select array(1, 2, null, 3, null) from tbl")
   }
 
   test("native reader - support ARRAY literal BOOL fields") {
       """
         |select 1 a
         |""".stripMargin,
-      "select array(true, false, null) from tbl")
+      "select array(true, null, false, null) from tbl")
   }
 
   test("native reader - support ARRAY literal NULL fields") {
@@ -468,7 +468,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(1, 2, 3, null) from tbl")
+      "select array(1, 2, null, 3, null) from tbl")
   }
 
   test("native reader - support ARRAY literal SHORT fields") {
@@ -476,7 +476,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(cast(1 as short), cast(2 as short), cast(3 as short), null) from tbl")
+      "select array(cast(1 as short), cast(2 as short), null, cast(3 as short), null) from tbl")
   }
 
   test("native reader - support ARRAY literal DATE fields") {
@@ -484,7 +484,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(CAST('2024-01-01' AS DATE), CAST('2024-02-01' AS DATE), CAST('2024-03-01' AS DATE), null) from tbl")
+      "select array(CAST('2024-01-01' AS DATE), CAST('2024-02-01' AS DATE), null, CAST('2024-03-01' AS DATE), null) from tbl")
   }
 
   test("native reader - support ARRAY literal LONG fields") {
@@ -492,7 +492,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(cast(1 as bigint), cast(2 as bigint), cast(3 as bigint), null) from tbl")
+      "select array(cast(1 as bigint), cast(2 as bigint), null, cast(3 as bigint), null) from tbl")
   }
 
   test("native reader - support ARRAY literal TIMESTAMP fields") {
@@ -500,7 +500,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(CAST('2024-01-01 10:00:00' AS TIMESTAMP), CAST('2024-01-02 10:00:00' AS TIMESTAMP), CAST('2024-01-03 10:00:00' AS TIMESTAMP), null) from tbl")
+      "select array(CAST('2024-01-01 10:00:00' AS TIMESTAMP), CAST('2024-01-02 10:00:00' AS TIMESTAMP), null, CAST('2024-01-03 10:00:00' AS TIMESTAMP), null) from tbl")
   }
 
   test("native reader - support ARRAY literal TIMESTAMP TZ fields") {
@@ -508,7 +508,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(CAST('2024-01-01 10:00:00' AS TIMESTAMP_NTZ), CAST('2024-01-02 10:00:00' AS TIMESTAMP_NTZ), CAST('2024-01-03 10:00:00' AS TIMESTAMP_NTZ), null) from tbl")
+      "select array(CAST('2024-01-01 10:00:00' AS TIMESTAMP_NTZ), CAST('2024-01-02 10:00:00' AS TIMESTAMP_NTZ), null, CAST('2024-01-03 10:00:00' AS TIMESTAMP_NTZ), null) from tbl")
   }
 
   test("native reader - support ARRAY literal FLOAT fields") {
@@ -516,7 +516,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(cast(1 as float), cast(2 as float), cast(3 as float), null) from tbl")
+      "select array(cast(1 as float), cast(2 as float), null, cast(3 as float), null) from tbl")
   }
 
   test("native reader - support ARRAY literal DOUBLE fields") {
@@ -524,7 +524,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array(cast(1 as double), cast(2 as double), cast(3 as double), null) from tbl")
+      "select array(cast(1 as double), cast(2 as double), null, cast(3 as double), null) from tbl")
   }
 
   test("native reader - support ARRAY literal STRING fields") {
@@ -532,7 +532,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
       """
         |select 1 a
         |""".stripMargin,
-      "select array('a', 'bc', 'def', null) from tbl")
+      "select array('a', 'bc', null, 'def', null) from tbl")
   }
 
   test("native reader - support ARRAY literal DECIMAL fields") {
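Worth noting about these test updates: the nulls now sit in the middle of each literal array as well as at the tail, which presumably is what exercises the validity mask at interior slots; for variable-length types an interior null must still occupy a slot without consuming any data. A small standalone arrow-rs illustration of that behavior, with made-up values:

    use arrow::array::{Array, StringArray};

    fn main() {
        // An interior null occupies slot 1 but carries no bytes,
        // and the entries after it are unaffected.
        let arr: StringArray = vec![Some("a"), None, Some("def")].into_iter().collect();
        assert_eq!(arr.len(), 3);
        assert!(arr.is_null(1));
        assert_eq!(arr.value(2), "def");
    }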
"select array('a', 'bc', null, 'def', null) from tbl") } test("native reader - support ARRAY literal DECIMAL fields") { @@ -540,7 +540,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper """ |select 1 a |""".stripMargin, - "select array(cast(1 as decimal(10, 2)), cast(2.5 as decimal(10, 2)),cast(3.75 as decimal(10, 2)), null) from tbl") + "select array(cast(1 as decimal(10, 2)), cast(2.5 as decimal(10, 2)), null, cast(3.75 as decimal(10, 2)), null) from tbl") } test("native reader - support ARRAY literal BINARY fields") { @@ -548,7 +548,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper """ |select 1 a |""".stripMargin, - "select array(cast('a' as binary), cast('bc' as binary), cast('def' as binary), null) from tbl") + "select array(cast('a' as binary), cast('bc' as binary), null, cast('def' as binary), null) from tbl") } test("SPARK-18053: ARRAY equality is broken") { From eeac85067293bd50622b7df62eecc0358aacd86a Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 7 Aug 2025 15:13:49 -0700 Subject: [PATCH 20/28] tests --- .../apache/comet/serde/QueryPlanSerde.scala | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 0f441a4853..c4fec44782 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -751,18 +751,17 @@ object QueryPlanSerde extends Logging with CometExprShim { binding, (builder, binaryExpr) => builder.setLtEq(binaryExpr)) - case Literal(value, dataType) if supportedDataType( - dataType, - allowComplex = value == null || - // Nested literal support for native reader - - - // can be tracked https://github.com/apache/datafusion-comet/issues/1937 - // now supports only Array of primitive - (Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION) - .contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()) && dataType - .isInstanceOf[ArrayType]) && !isComplexType( - dataType.asInstanceOf[ArrayType].elementType)) => + case Literal(value, dataType) + if supportedDataType( + dataType, + allowComplex = value == null || + // Nested literal support for native reader + // can be tracked https://github.com/apache/datafusion-comet/issues/1937 + // now supports only Array of primitive + (Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION) + .contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()) && dataType + .isInstanceOf[ArrayType]) && !isComplexType( + dataType.asInstanceOf[ArrayType].elementType)) => val exprBuilder = ExprOuterClass.Literal.newBuilder() if (value == null) { From 583e54d10a502faae73668574db3fc9eaee2274b Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 7 Aug 2025 15:31:38 -0700 Subject: [PATCH 21/28] clippy --- native/core/src/execution/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 3a70f2234e..a627c49b7f 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -569,7 +569,7 @@ impl PhysicalPlanner { SingleRowListArrayBuilder::new(Arc::new(Decimal128Array::new(vals.decimal_values.into_iter().map(|v| { let big_integer = BigInt::from_signed_bytes_be(&v); big_integer.to_i128().ok_or_else(|| { - return GeneralError(format!( + 
From c1c80f5d69cbd1d4bcd461274ba45df3e286afa3 Mon Sep 17 00:00:00 2001
From: comphead
Date: Thu, 7 Aug 2025 22:05:47 -0700
Subject: [PATCH 22/28] tests

---
 .../apache/comet/serde/QueryPlanSerde.scala   | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index c4fec44782..3455d85c93 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -798,13 +798,13 @@ object QueryPlanSerde extends Logging with CometExprShim {
             array.foreach(v => {
               val casted = v.asInstanceOf[java.lang.Boolean]
               listLiteralBuilder.addBooleanValues(casted)
-              listLiteralBuilder.addNullMask(casted == null)
+              listLiteralBuilder.addNullMask(casted != null)
             })
           case ByteType =>
             array.foreach(v => {
               val casted = v.asInstanceOf[java.lang.Integer]
               listLiteralBuilder.addByteValues(casted)
-              listLiteralBuilder.addNullMask(casted == null)
+              listLiteralBuilder.addNullMask(casted != null)
             })
           case ShortType =>
             array.foreach(v => {
               val casted = v.asInstanceOf[java.lang.Integer]
               listLiteralBuilder.addShortValues(
                 if (casted != null) casted.intValue()
                 else null.asInstanceOf[java.lang.Integer])
-              listLiteralBuilder.addNullMask(casted == null)
+              listLiteralBuilder.addNullMask(casted != null)
             })
           case IntegerType | DateType =>
             array.foreach(v => {
               val casted = v.asInstanceOf[java.lang.Integer]
               listLiteralBuilder.addIntValues(casted)
-              listLiteralBuilder.addNullMask(casted == null)
+              listLiteralBuilder.addNullMask(casted != null)
             })
           case LongType | TimestampType | TimestampNTZType =>
             array.foreach(v => {
               val casted = v.asInstanceOf[java.lang.Long]
               listLiteralBuilder.addLongValues(casted)
-              listLiteralBuilder.addNullMask(casted == null)
+              listLiteralBuilder.addNullMask(casted != null)
             })
           case FloatType =>
             array.foreach(v => {
               val casted = v.asInstanceOf[java.lang.Float]
               listLiteralBuilder.addFloatValues(casted)
-              listLiteralBuilder.addNullMask(casted == null)
+              listLiteralBuilder.addNullMask(casted != null)
             })
           case DoubleType =>
             array.foreach(v => {
               val casted = v.asInstanceOf[java.lang.Double]
               listLiteralBuilder.addDoubleValues(casted)
-              listLiteralBuilder.addNullMask(casted == null)
+              listLiteralBuilder.addNullMask(casted != null)
             })
           case StringType =>
             array.foreach(v => {
               val casted = v.asInstanceOf[org.apache.spark.unsafe.types.UTF8String]
               listLiteralBuilder.addStringValues(
                 if (casted != null) casted.toString else "")
-              listLiteralBuilder.addNullMask(casted == null)
+              listLiteralBuilder.addNullMask(casted != null)
             })
           case _: DecimalType =>
             array
               .foreach(v => {
                 listLiteralBuilder.addDecimalValues(if (casted != null) {
                   com.google.protobuf.ByteString
                     .copyFrom(casted.toBigDecimal.underlying.unscaledValue.toByteArray)
                 } else ByteString.EMPTY)
-                listLiteralBuilder.addNullMask(casted == null)
+                listLiteralBuilder.addNullMask(casted != null)
               })
           case _: BinaryType =>
             array
               .foreach(v => {
                 listLiteralBuilder.addBytesValues(if (casted != null) {
                   com.google.protobuf.ByteString.copyFrom(casted)
                 } else ByteString.EMPTY)
-                listLiteralBuilder.addNullMask(casted == null)
+                listLiteralBuilder.addNullMask(casted != null)
               })
         }
         exprBuilder.setListVal(listLiteralBuilder.build())
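Worth spelling out what the polarity flip above does: after this commit the proto field named null_mask carries validity flags, with true meaning the element is present, which matches the convention arrow-rs NullBuffer uses on the native side (and which patch 23 below relies on when it drops the `!` from the Rust checks). A tiny self-contained illustration with made-up values:

    use arrow::array::{Array, Int32Array};
    use arrow::buffer::NullBuffer;

    fn main() {
        // Mask as serialized after this change: true = valid, false = null.
        let null_mask = vec![true, false, true];
        let arr = Int32Array::new(vec![1, 0, 3].into(), Some(NullBuffer::from(null_mask)));
        assert_eq!(arr.value(0), 1); // `true` slots hold real values
        assert!(arr.is_null(1));     // the `false` slot reads back as null
    }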
From b89867158f5ddbb66de1dfd68786fbf7d6b7cfaa Mon Sep 17 00:00:00 2001
From: comphead
Date: Thu, 7 Aug 2025 23:04:59 -0700
Subject: [PATCH 23/28] clippy

---
 native/core/src/execution/planner.rs | 27 +++++++++++++++++--------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index a627c49b7f..9f9a9ab145 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -86,8 +86,8 @@ use datafusion::physical_expr::LexOrdering;
 
 use crate::parquet::parquet_exec::init_datasource_exec;
 use arrow::array::{
-    BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
-    Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder,
+    BinaryArray, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array,
+    Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder,
     TimestampMicrosecondArray,
 };
 use arrow::buffer::{BooleanBuffer, Buffer, MutableBuffer, OffsetBuffer};
 use datafusion::common::utils::SingleRowListArrayBuilder;
@@ -540,21 +540,32 @@ impl PhysicalPlanner {
                     .build_list_scalar()
             }
             DataType::Binary => {
+                // Using a builder here as it is quite complicated to create StringArray from a vector with nulls
+                // to calculate correct offsets
                 let vals = values.clone();
-                let offsets = MutableBuffer::new((vals.bytes_values.len() + 1) * size_of::<i32>());
-                let offsets = Buffer::from(offsets);
-                let value_offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) };
-                SingleRowListArrayBuilder::new(Arc::new(BinaryArray::new(value_offsets, vals.int_values.into(), Some(vals.null_mask.into()))))
+                let len = vals.bytes_values.len();
+                let mut arr = BinaryBuilder::with_capacity(len, len);
+
+                for (i, v) in vals.bytes_values.into_iter().enumerate() {
+                    if vals.null_mask[i] {
+                        arr.append_value(v);
+                    } else {
+                        arr.append_null();
+                    }
+                }
+
+                SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
                     .build_list_scalar()
             }
             DataType::Utf8 => {
-                // Using a builder here as it is quite complicated to create StringArray from vector of string with nulls
+                // Using a builder here as it is quite complicated to create StringArray from a vector with nulls
+                // to calculate correct offsets
                 let vals = values.clone();
                 let len = vals.string_values.len();
                 let mut arr = StringBuilder::with_capacity(len, len);
 
                 for (i, v) in vals.string_values.into_iter().enumerate() {
-                    if !vals.null_mask[i] {
+                    if vals.null_mask[i] {
                         arr.append_value(v);
                     } else {
                         arr.append_null();

From 8206c073c2e324e65be92a72574a634c350787ae Mon Sep 17 00:00:00 2001
From: comphead
Date: Thu, 7 Aug 2025 23:11:41 -0700
Subject: [PATCH 24/28] clippy

---
 native/core/src/execution/planner.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 9f9a9ab145..61598294cf 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -86,11 +86,11 @@ use datafusion::physical_expr::LexOrdering;
 
 use crate::parquet::parquet_exec::init_datasource_exec;
 use arrow::array::{
-    BinaryArray, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array,
+    BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array,
     Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder,
     TimestampMicrosecondArray,
 };
-use arrow::buffer::{BooleanBuffer, Buffer, MutableBuffer, OffsetBuffer};
+use arrow::buffer::BooleanBuffer;
 use datafusion::common::utils::SingleRowListArrayBuilder;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec; From e89e225d73afe12346c20a61eed22099001ee21f Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 8 Aug 2025 08:36:12 -0700 Subject: [PATCH 25/28] clippy --- native/core/src/execution/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 61598294cf..9defa1205a 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -86,8 +86,8 @@ use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; use arrow::array::{ - BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder, + BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder, TimestampMicrosecondArray, }; use arrow::buffer::BooleanBuffer; From 38822db99459d3cd20539f4ab73b1b6968133025 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 8 Aug 2025 21:44:04 -0700 Subject: [PATCH 26/28] tests --- spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index a1b1812b31..217cd322dd 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.types._ +import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { From 2a45cb9f992539c54c84c18826e9aec4541cf42b Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 11 Aug 2025 11:18:52 -0700 Subject: [PATCH 27/28] feedback --- native/core/src/execution/planner.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 9defa1205a..097ec70623 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -540,12 +540,13 @@ impl PhysicalPlanner { .build_list_scalar() } DataType::Binary => { - // Using a builder here as it is quite complicated to create StringArray from a vector with nulls - // to calculate correct offsets + // Using a builder as it is cumbersome to create BinaryArray from a vector with nulls + // and calculate correct offsets let vals = values.clone(); - let len = vals.bytes_values.len(); - let mut arr = BinaryBuilder::with_capacity(len, len); - + let item_capacity = vals.string_values.len(); + let data_capacity = vals.string_values.first().map(|s| s.len() * item_capacity).unwrap_or(0); + let mut arr = BinaryBuilder::with_capacity(item_capacity, data_capacity); +; for (i, v) in vals.bytes_values.into_iter().enumerate() { if vals.null_mask[i] { arr.append_value(v); @@ -558,11 +559,12 @@ impl PhysicalPlanner { .build_list_scalar() } DataType::Utf8 => { - // Using a builder here as it is quite complicated to create StringArray from a vector with nulls - // to 
calculate correct offsets + // Using a builder as it is cumbersome to create StringArray from a vector with nulls + // and calculate correct offsets let vals = values.clone(); - let len = vals.string_values.len(); - let mut arr = StringBuilder::with_capacity(len, len); + let item_capacity = vals.string_values.len(); + let data_capacity = vals.string_values.first().map(|s| s.len() * item_capacity).unwrap_or(0); + let mut arr = StringBuilder::with_capacity(item_capacity, data_capacity); for (i, v) in vals.string_values.into_iter().enumerate() { if vals.null_mask[i] { From 4dfc087ff7d4cfb1ca21079a983308e4283e2abc Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 11 Aug 2025 11:30:08 -0700 Subject: [PATCH 28/28] clippy --- native/core/src/execution/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 097ec70623..7fa7bfe905 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -546,7 +546,7 @@ impl PhysicalPlanner { let item_capacity = vals.string_values.len(); let data_capacity = vals.string_values.first().map(|s| s.len() * item_capacity).unwrap_or(0); let mut arr = BinaryBuilder::with_capacity(item_capacity, data_capacity); -; + for (i, v) in vals.bytes_values.into_iter().enumerate() { if vals.null_mask[i] { arr.append_value(v);