diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 477e3a1ab9..238fbb2715 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -71,7 +71,7 @@ jobs:
         with:
           spark-version: ${{ matrix.spark-version.full }}
           spark-short-version: ${{ matrix.spark-version.short }}
-          comet-version: '0.5.0-SNAPSHOT' # TODO: get this from pom.xml
+          comet-version: '0.6.0-SNAPSHOT' # TODO: get this from pom.xml
       - name: Run Spark tests
         run: |
           cd apache-spark
diff --git a/.github/workflows/spark_sql_test_ansi.yml b/.github/workflows/spark_sql_test_ansi.yml
index e1d8388fb1..14ec6366f4 100644
--- a/.github/workflows/spark_sql_test_ansi.yml
+++ b/.github/workflows/spark_sql_test_ansi.yml
@@ -69,7 +69,7 @@ jobs:
         with:
           spark-version: ${{ matrix.spark-version.full }}
           spark-short-version: ${{ matrix.spark-version.short }}
-          comet-version: '0.5.0-SNAPSHOT' # TODO: get this from pom.xml
+          comet-version: '0.6.0-SNAPSHOT' # TODO: get this from pom.xml
       - name: Run Spark tests
         run: |
           cd apache-spark
diff --git a/common/pom.xml b/common/pom.xml
index 91109edf5d..b6cd75a32d 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -26,7 +26,7 @@ under the License.
   <parent>
     <groupId>org.apache.datafusion</groupId>
     <artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java b/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java
new file mode 100644
index 0000000000..8794902b48
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.vector;
+
+/**
+ * An interface that can be implemented by vectors that have a row id mapping.
+ *
+ * <p>For example, Iceberg's DeleteFile has a row id mapping to map row id to position. This
+ * interface is used to set and get the row id mapping. The row id mapping is an array of integers,
+ * where the index is the row id and the value is the position. Here is an example:
+ *
+ * <pre>
+ * [0,1,2,3,4,5,6,7] -- original row id mapping array
+ * [0,1,3,4,5,7,-,-] -- after applying position deletes of 2 and 6 (num records is set to 6)
+ * </pre>
+ */
+public interface HasRowIdMapping {
+  default void setRowIdMapping(int[] rowIdMapping) {
+    throw new UnsupportedOperationException("setRowIdMapping is not supported");
+  }
+
+  default int[] getRowIdMapping() {
+    throw new UnsupportedOperationException("getRowIdMapping is not supported");
+  }
+}
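To illustrate how `HasRowIdMapping` is intended to be used, here is a minimal, self-contained Scala sketch. It is illustrative only: `MappedIntVector` and `valueAt` are names invented for this example, not Comet APIs.

```scala
import org.apache.comet.vector.HasRowIdMapping

// Minimal sketch of a vector with a row id mapping. `MappedIntVector`
// and `valueAt` are invented for this illustration; they are not Comet APIs.
class MappedIntVector(values: Array[Int]) extends HasRowIdMapping {
  private var rowIdMapping: Array[Int] = _

  override def setRowIdMapping(mapping: Array[Int]): Unit = {
    rowIdMapping = mapping
  }

  override def getRowIdMapping(): Array[Int] = rowIdMapping

  // Resolve the logical row id to a physical position before reading.
  // Without a mapping, the row id is the position.
  def valueAt(rowId: Int): Int = {
    val position = if (rowIdMapping != null) rowIdMapping(rowId) else rowId
    values(position)
  }
}

object MappedIntVectorExample extends App {
  val vector = new MappedIntVector(Array(10, 11, 12, 13, 14, 15, 16, 17))
  // Positions 2 and 6 are delete-marked: 6 records remain, and row ids
  // 0..5 map to the surviving physical positions.
  vector.setRowIdMapping(Array(0, 1, 3, 4, 5, 7))
  assert(vector.valueAt(2) == 13) // row id 2 now reads physical position 3
  assert(vector.valueAt(5) == 17) // row id 5 now reads physical position 7
}
```

The mapping mirrors the example in the Javadoc: after positions 2 and 6 are delete-marked, six records remain and row ids 0-5 resolve to the surviving physical positions.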
diff --git a/docs/source/contributor-guide/benchmark-results/tpc-ds.md b/docs/source/contributor-guide/benchmark-results/tpc-ds.md
index a6650f7e74..012913189a 100644
--- a/docs/source/contributor-guide/benchmark-results/tpc-ds.md
+++ b/docs/source/contributor-guide/benchmark-results/tpc-ds.md
@@ -19,8 +19,8 @@ under the License.
 
 # Apache DataFusion Comet: Benchmarks Derived From TPC-DS
 
-The following benchmarks were performed on a two node Kubernetes cluster with
-data stored locally in Parquet format on NVMe storage. Performance characteristics will vary in different environments
+The following benchmarks were performed on a Linux workstation with PCIe 5, AMD 7950X CPU (16 cores), 128 GB RAM, and
+data stored locally in Parquet format on NVMe storage. Performance characteristics will vary in different environments
 and we encourage you to run these benchmarks in your own environments.
 
 The tracking issue for improving TPC-DS performance is [#858](https://github.com/apache/datafusion-comet/issues/858).
@@ -43,3 +43,64 @@ The raw results of these benchmarks in JSON format is available here:
 
 - [Spark](0.5.0/spark-tpcds.json)
 - [Comet](0.5.0/comet-tpcds.json)
+
+# Scripts
+
+Here are the scripts that were used to generate these results.
+
+## Apache Spark
+
+```shell
+#!/bin/bash
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.memory=32G \
+    --conf spark.executor.instances=2 \
+    --conf spark.executor.cores=8 \
+    --conf spark.cores.max=16 \
+    --conf spark.eventLog.enabled=true \
+    tpcbench.py \
+    --benchmark tpcds \
+    --name spark \
+    --data /mnt/bigdata/tpcds/sf100/ \
+    --queries ../../tpcds/ \
+    --output . \
+    --iterations 5
+```
+
+## Apache Spark + Comet
+
+```shell
+#!/bin/bash
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.instances=2 \
+    --conf spark.executor.memory=16G \
+    --conf spark.executor.cores=8 \
+    --total-executor-cores=16 \
+    --conf spark.eventLog.enabled=true \
+    --conf spark.driver.maxResultSize=2G \
+    --conf spark.memory.offHeap.enabled=true \
+    --conf spark.memory.offHeap.size=24g \
+    --jars $COMET_JAR \
+    --conf spark.driver.extraClassPath=$COMET_JAR \
+    --conf spark.executor.extraClassPath=$COMET_JAR \
+    --conf spark.plugins=org.apache.spark.CometPlugin \
+    --conf spark.comet.enabled=true \
+    --conf spark.comet.cast.allowIncompatible=true \
+    --conf spark.comet.exec.replaceSortMergeJoin=false \
+    --conf spark.comet.exec.shuffle.enabled=true \
+    --conf spark.comet.exec.shuffle.mode=auto \
+    --conf spark.comet.exec.shuffle.fallbackToColumnar=true \
+    --conf spark.comet.exec.shuffle.compression.codec=lz4 \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    tpcbench.py \
+    --name comet \
+    --benchmark tpcds \
+    --data /mnt/bigdata/tpcds/sf100/ \
+    --queries ../../tpcds/ \
+    --output . \
+    --iterations 5
+```
\ No newline at end of file
diff --git a/docs/source/contributor-guide/benchmark-results/tpc-h.md b/docs/source/contributor-guide/benchmark-results/tpc-h.md
index 336deb7a7c..d383cae852 100644
--- a/docs/source/contributor-guide/benchmark-results/tpc-h.md
+++ b/docs/source/contributor-guide/benchmark-results/tpc-h.md
@@ -25,21 +25,84 @@ and we encourage you to run these benchmarks in your own environments.
 
 The tracking issue for improving TPC-H performance is [#391](https://github.com/apache/datafusion-comet/issues/391).
 
-![](../../_static/images/benchmark-results/0.5.0-SNAPSHOT-2025-01-09/tpch_allqueries.png)
+![](../../_static/images/benchmark-results/0.5.0/tpch_allqueries.png)
 
 Here is a breakdown showing relative performance of Spark and Comet for each query.
 
-![](../../_static/images/benchmark-results/0.5.0-SNAPSHOT-2025-01-09/tpch_queries_compare.png)
+![](../../_static/images/benchmark-results/0.5.0/tpch_queries_compare.png)
 
 The following chart shows how much Comet currently accelerates each query from the benchmark in relative terms.
 
-![](../../_static/images/benchmark-results/0.5.0-SNAPSHOT-2025-01-09/tpch_queries_speedup_rel.png)
+![](../../_static/images/benchmark-results/0.5.0/tpch_queries_speedup_rel.png)
 
 The following chart shows how much Comet currently accelerates each query from the benchmark in absolute terms.
 
-![](../../_static/images/benchmark-results/0.5.0-SNAPSHOT-2025-01-09/tpch_queries_speedup_abs.png)
+![](../../_static/images/benchmark-results/0.5.0/tpch_queries_speedup_abs.png)
 
 The raw results of these benchmarks in JSON format is available here:
 
 - [Spark](0.5.0/spark-tpch.json)
 - [Comet](0.5.0/comet-tpch.json)
+
+# Scripts
+
+Here are the scripts that were used to generate these results.
+
+## Apache Spark
+
+```shell
+#!/bin/bash
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.instances=1 \
+    --conf spark.executor.cores=8 \
+    --conf spark.cores.max=8 \
+    --conf spark.executor.memory=16g \
+    --conf spark.memory.offHeap.enabled=true \
+    --conf spark.memory.offHeap.size=16g \
+    --conf spark.eventLog.enabled=true \
+    tpcbench.py \
+    --name spark \
+    --benchmark tpch \
+    --data /mnt/bigdata/tpch/sf100/ \
+    --queries ../../tpch/queries \
+    --output . \
+    --iterations 5
+
+```
+
+## Apache Spark + Comet
+
+```shell
+#!/bin/bash
+$SPARK_HOME/bin/spark-submit \
+    --master $SPARK_MASTER \
+    --conf spark.driver.memory=8G \
+    --conf spark.executor.instances=1 \
+    --conf spark.executor.cores=8 \
+    --conf spark.cores.max=8 \
+    --conf spark.executor.memory=16g \
+    --conf spark.memory.offHeap.enabled=true \
+    --conf spark.memory.offHeap.size=16g \
+    --conf spark.comet.exec.replaceSortMergeJoin=true \
+    --conf spark.eventLog.enabled=true \
+    --jars $COMET_JAR \
+    --driver-class-path $COMET_JAR \
+    --conf spark.driver.extraClassPath=$COMET_JAR \
+    --conf spark.executor.extraClassPath=$COMET_JAR \
+    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
+    --conf spark.comet.enabled=true \
+    --conf spark.comet.exec.shuffle.enabled=true \
+    --conf spark.comet.exec.shuffle.mode=auto \
+    --conf spark.comet.exec.shuffle.fallbackToColumnar=true \
+    --conf spark.comet.exec.shuffle.compression.codec=lz4 \
+    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+    tpcbench.py \
+    --name comet \
+    --benchmark tpch \
+    --data /mnt/bigdata/tpch/sf100/ \
+    --queries ../../tpch/queries \
+    --output . \
+    --iterations 5
+```
\ No newline at end of file
diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md
index 173d598ac2..e2372b3d66 100644
--- a/docs/source/contributor-guide/benchmarking.md
+++ b/docs/source/contributor-guide/benchmarking.md
@@ -24,62 +24,6 @@ benchmarking documentation and scripts are available in the [DataFusion Benchmar
 
 We also have many micro benchmarks that can be run from an IDE located [here](https://github.com/apache/datafusion-comet/tree/main/spark/src/test/scala/org/apache/spark/sql/benchmark).
 
-Here are example commands for running the benchmarks against a Spark cluster. This command will need to be
-adapted based on the Spark environment and location of data files.
-
-These commands are intended to be run from the `runners/datafusion-comet` directory in the `datafusion-benchmarks`
-repository.
-
-## Running Benchmarks Against Apache Spark
-
-```shell
-$SPARK_HOME/bin/spark-submit \
-    --master $SPARK_MASTER \
-    --conf spark.driver.memory=8G \
-    --conf spark.executor.instances=1 \
-    --conf spark.executor.memory=32G \
-    --conf spark.executor.cores=8 \
-    --conf spark.cores.max=8 \
-    tpcbench.py \
-    --benchmark tpch \
-    --data /mnt/bigdata/tpch/sf100/ \
-    --queries ../../tpch/queries \
-    --iterations 3
-```
-
-## Running Benchmarks Against Apache Spark with Apache DataFusion Comet Enabled
-
-### TPC-H
-
-```shell
-$SPARK_HOME/bin/spark-submit \
-    --master $SPARK_MASTER \
-    --conf spark.driver.memory=8G \
-    --conf spark.executor.instances=1 \
-    --conf spark.executor.memory=16G \
-    --conf spark.executor.cores=8 \
-    --conf spark.cores.max=8 \
-    --conf spark.memory.offHeap.enabled=true \
-    --conf spark.memory.offHeap.size=16g \
-    --jars $COMET_JAR \
-    --conf spark.driver.extraClassPath=$COMET_JAR \
-    --conf spark.executor.extraClassPath=$COMET_JAR \
-    --conf spark.plugins=org.apache.spark.CometPlugin \
-    --conf spark.comet.cast.allowIncompatible=true \
-    --conf spark.comet.exec.replaceSortMergeJoin=true \
-    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
-    --conf spark.comet.exec.shuffle.enabled=true \
-    --conf spark.comet.exec.shuffle.mode=auto \
-    --conf spark.comet.exec.shuffle.enableFastEncoding=true \
-    --conf spark.comet.exec.shuffle.fallbackToColumnar=true \
-    --conf spark.comet.exec.shuffle.compression.codec=lz4 \
-    tpcbench.py \
-    --benchmark tpch \
-    --data /mnt/bigdata/tpch/sf100/ \
-    --queries ../../tpch/queries \
-    --iterations 3
-```
-
 ### TPC-DS
 
 For TPC-DS, use `spark.comet.exec.replaceSortMergeJoin=false`.
diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md
index 47d1f04c87..8a368cca26 100644
--- a/docs/source/contributor-guide/debugging.md
+++ b/docs/source/contributor-guide/debugging.md
@@ -130,7 +130,7 @@ Then build the Comet as [described](https://github.com/apache/arrow-datafusion-c
 Start Comet with `RUST_BACKTRACE=1`
 
 ```console
-RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.5.0-SNAPSHOT.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
+RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.4_2.12-0.6.0-SNAPSHOT.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
 ```
 
 Get the expanded exception details
diff --git a/docs/source/user-guide/installation.md b/docs/source/user-guide/installation.md
index 22d482e475..390c926387 100644
--- a/docs/source/user-guide/installation.md
+++ b/docs/source/user-guide/installation.md
@@ -74,7 +74,7 @@ See the [Comet Kubernetes Guide](kubernetes.md) guide.
 Make sure `SPARK_HOME` points to the same Spark version as Comet was built for.
 
 ```console
-export COMET_JAR=spark/target/comet-spark-spark3.4_2.12-0.5.0-SNAPSHOT.jar
+export COMET_JAR=spark/target/comet-spark-spark3.4_2.12-0.6.0-SNAPSHOT.jar
 
 $SPARK_HOME/bin/spark-shell \
     --jars $COMET_JAR \
@@ -130,7 +130,7 @@ explicitly contain Comet otherwise Spark may use a different class-loader for th
 components which will then fail at runtime. For example:
 
 ```
---driver-class-path spark/target/comet-spark-spark3.4_2.12-0.5.0-SNAPSHOT.jar
+--driver-class-path spark/target/comet-spark-spark3.4_2.12-0.6.0-SNAPSHOT.jar
 ```
 
 Some cluster managers may require additional configuration, see
diff --git a/fuzz-testing/pom.xml b/fuzz-testing/pom.xml
index 2184e54eea..0b45025c67 100644
--- a/fuzz-testing/pom.xml
+++ b/fuzz-testing/pom.xml
@@ -25,7 +25,7 @@ under the License.
   <parent>
     <groupId>org.apache.datafusion</groupId>
    <artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 7b00b7bc49..918f94a254 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -878,7 +878,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet"
-version = "0.5.0"
+version = "0.6.0"
 dependencies = [
  "arrow",
  "arrow-array",
@@ -929,7 +929,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet-proto"
-version = "0.5.0"
+version = "0.6.0"
 dependencies = [
  "prost 0.12.6",
  "prost-build",
@@ -937,7 +937,7 @@ dependencies = [
 
 [[package]]
 name = "datafusion-comet-spark-expr"
-version = "0.5.0"
+version = "0.6.0"
 dependencies = [
  "arrow",
  "arrow-array",
diff --git a/native/Cargo.toml b/native/Cargo.toml
index 72e2386bbb..624d63ad2d 100644
--- a/native/Cargo.toml
+++ b/native/Cargo.toml
@@ -20,7 +20,7 @@ members = ["core", "spark-expr", "proto"]
 resolver = "2"
 
 [workspace.package]
-version = "0.5.0"
+version = "0.6.0"
 homepage = "https://datafusion.apache.org/comet"
 repository = "https://github.com/apache/datafusion-comet"
 authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
@@ -48,8 +48,8 @@ datafusion-expr-common = { version = "44.0.0", default-features = false }
 datafusion-execution = { version = "44.0.0", default-features = false }
 datafusion-physical-plan = { version = "44.0.0", default-features = false }
 datafusion-physical-expr = { version = "44.0.0", default-features = false }
-datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
-datafusion-comet-proto = { path = "proto", version = "0.5.0" }
+datafusion-comet-spark-expr = { path = "spark-expr", version = "0.6.0" }
+datafusion-comet-proto = { path = "proto", version = "0.6.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.8" }
 futures = "0.3.28"
diff --git a/pom.xml b/pom.xml
index 76e2288ccc..4559d67412 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@ under the License.
   <groupId>org.apache.datafusion</groupId>
   <artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
-  <version>0.5.0-SNAPSHOT</version>
+  <version>0.6.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>Comet Project Parent POM</name>
diff --git a/spark-integration/pom.xml b/spark-integration/pom.xml
index 84c09c1c97..24b1f7a002 100644
--- a/spark-integration/pom.xml
+++ b/spark-integration/pom.xml
@@ -26,7 +26,7 @@ under the License.
   <parent>
     <groupId>org.apache.datafusion</groupId>
     <artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/spark/pom.xml b/spark/pom.xml
index ad7590dbc4..f15b0b2e8f 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -26,7 +26,7 @@ under the License.
   <parent>
     <groupId>org.apache.datafusion</groupId>
     <artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
-    <version>0.5.0-SNAPSHOT</version>
+    <version>0.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index c9d8ce55b1..addf737069 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -190,7 +190,7 @@ class CometSparkSessionExtensions
 
       // data source V1
       case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
+            HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
             _: Seq[_],
             requiredSchema,
             _,
@@ -199,7 +199,8 @@
             _,
             _,
             _)
-          if CometNativeScanExec.isSchemaSupported(requiredSchema)
+          if CometScanExec.isFileFormatSupported(fileFormat)
+            && CometNativeScanExec.isSchemaSupported(requiredSchema)
             && CometNativeScanExec.isSchemaSupported(partitionSchema)
           // TODO we only enable full native scan if COMET_EXEC_ENABLED is enabled
           // but this is not really what we want .. we currently insert `CometScanExec`
@@ -1072,12 +1073,20 @@
     var firstNativeOp = true
     newPlan.transformDown {
       case op: CometNativeExec =>
-        if (firstNativeOp) {
+        val newPlan = if (firstNativeOp) {
           firstNativeOp = false
           op.convertBlock()
         } else {
           op
         }
+
+        // If reaching a leaf node, reset `firstNativeOp` to true
+        // because the next native operator starts a new block.
+        if (op.children.isEmpty) {
+          firstNativeOp = true
+        }
+
+        newPlan
       case op =>
        firstNativeOp = true
         op
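To see why the leaf-node reset in the last hunk matters: `transformDown` walks the plan in pre-order, and `convertBlock()` should fire only at the topmost operator of each contiguous run of native operators. Once a native leaf is reached, the next native operator encountered must belong to a different block. Below is a simplified, self-contained Scala sketch of that traversal logic; `Op`, `Native`, and `NonNative` are stand-ins invented for this example, not Comet's plan classes.

```scala
// Simplified model of the `firstNativeOp` traversal. `Op`, `Native`, and
// `NonNative` are stand-ins for this sketch, not Comet's plan classes.
sealed trait Op { def children: Seq[Op] }
case class Native(children: Op*) extends Op
case class NonNative(children: Op*) extends Op

object BlockHeadsExample extends App {
  // Collect the topmost operator of each contiguous native block,
  // visiting nodes in the same pre-order as Spark's transformDown.
  def blockHeads(root: Op): Seq[Op] = {
    val heads = scala.collection.mutable.ArrayBuffer[Op]()
    var firstNativeOp = true
    def visit(op: Op): Unit = {
      op match {
        case n: Native =>
          if (firstNativeOp) {
            firstNativeOp = false
            heads += n // this operator heads a block and would be converted
          }
          // A native leaf ends the current block, so the next native
          // operator seen in pre-order must start a new block.
          if (n.children.isEmpty) firstNativeOp = true
        case _ =>
          firstNativeOp = true // a non-native operator breaks the block
      }
      op.children.foreach(visit)
    }
    visit(root)
    heads.toSeq
  }

  // Two native scans under a non-native parent: each must head its own
  // block. Without the leaf reset, the second one would be missed.
  val plan = NonNative(Native(), Native())
  assert(blockHeads(plan).size == 2)
}
```

In this model, dropping the leaf reset would leave the second `Native()` inside the first block's run and it would never be treated as a new block head, which mirrors the problem this change addresses.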