dev/diffs/3.4.3.diff (54 changes: 24 additions & 30 deletions)
@@ -1329,7 +1329,7 @@ index 47679ed7865..9ffbaecb98e 100644
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..88815fd078f 100644
index b14f4a405f6..ab7baf434a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
@@ -1340,12 +1340,15 @@ index b14f4a405f6..88815fd078f 100644
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -131,7 +132,7 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
@@ -131,7 +132,10 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val columnarToRowExec =
- df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
+ df.queryExecution.executedPlan.collectFirst { case p: CometColumnarToRowExec => p }.get
+ df.queryExecution.executedPlan.collectFirst {
+ case p: ColumnarToRowExec => p
+ case p: CometColumnarToRowExec => p
+ }.get
try {
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
@@ -2790,7 +2793,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index dd55fcfe42c..2702f87c1f1 100644
index dd55fcfe42c..0d66bcccbdc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -2814,7 +2817,7 @@ index dd55fcfe42c..2702f87c1f1 100644
}
}

@@ -242,6 +247,38 @@ private[sql] trait SQLTestUtilsBase
@@ -242,6 +247,29 @@ private[sql] trait SQLTestUtilsBase
protected override def _sqlContext: SQLContext = self.spark.sqlContext
}

@@ -2840,20 +2843,11 @@ index dd55fcfe42c..2702f87c1f1 100644
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
+ v != null && v.toBoolean
+ }
+
+ /**
+ * Whether Spark should apply Comet shuffle optimization. This is only effective when
+ * [[isCometEnabled]] returns true.
+ */
+ protected def isCometShuffleEnabled: Boolean = {
+ val v = System.getenv("ENABLE_COMET_SHUFFLE")
+ v != null && v.toBoolean
+ }
+
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
@@ -434,6 +471,8 @@ private[sql] trait SQLTestUtilsBase
@@ -434,6 +462,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
@@ -2863,10 +2857,10 @@ index dd55fcfe42c..2702f87c1f1 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..59adc094970 100644
index ed2e309fa07..71ba6533c9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,34 @@ trait SharedSparkSessionBase
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2879,24 +2873,21 @@ index ed2e309fa07..59adc094970 100644
+ if (!isCometScanOnly) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.comet.exec.all.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
+ if (enableCometAnsiMode) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ .set("spark.comet.ansi.enabled", "true")
+ }
+
+ if (isCometShuffleEnabled) {
+ conf
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ } else {
+ conf.set("spark.comet.exec.shuffle.enabled", "false")
+ }
+ }
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
@@ -2951,10 +2942,10 @@ index 1966e1e64fd..cde97a0aafe 100644
spark.sql(
"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 07361cfdce9..e40c59a4207 100644
index 07361cfdce9..b4d53dbe900 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -55,25 +55,52 @@ object TestHive
@@ -55,25 +55,55 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
@@ -3005,10 +2996,13 @@ index 07361cfdce9..e40c59a4207 100644
+ if (v == null || !v.toBoolean) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.comet.exec.all.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
+ val a = System.getenv("ENABLE_COMET_ANSI_MODE")
dev/diffs/3.5.1.diff (54 changes: 24 additions & 30 deletions)
@@ -1354,7 +1354,7 @@ index 47679ed7865..9ffbaecb98e 100644
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s }.length == sortAggCount)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..88815fd078f 100644
index b14f4a405f6..ab7baf434a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.QueryTest
@@ -1365,12 +1365,15 @@ index b14f4a405f6..88815fd078f 100644
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -131,7 +132,7 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
@@ -131,7 +132,10 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val columnarToRowExec =
- df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
+ df.queryExecution.executedPlan.collectFirst { case p: CometColumnarToRowExec => p }.get
+ df.queryExecution.executedPlan.collectFirst {
+ case p: ColumnarToRowExec => p
+ case p: CometColumnarToRowExec => p
+ }.get
try {
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
@@ -2775,7 +2778,7 @@ index abe606ad9c1..2d930b64cca 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index dd55fcfe42c..2702f87c1f1 100644
index dd55fcfe42c..0d66bcccbdc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -2799,7 +2802,7 @@ index dd55fcfe42c..2702f87c1f1 100644
}
}

@@ -242,6 +247,38 @@ private[sql] trait SQLTestUtilsBase
@@ -242,6 +247,29 @@ private[sql] trait SQLTestUtilsBase
protected override def _sqlContext: SQLContext = self.spark.sqlContext
}

@@ -2825,20 +2828,11 @@ index dd55fcfe42c..2702f87c1f1 100644
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
+ v != null && v.toBoolean
+ }
+
+ /**
+ * Whether Spark should apply Comet shuffle optimization. This is only effective when
+ * [[isCometEnabled]] returns true.
+ */
+ protected def isCometShuffleEnabled: Boolean = {
+ val v = System.getenv("ENABLE_COMET_SHUFFLE")
+ v != null && v.toBoolean
+ }
+
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
super.withSQLConf(pairs: _*)(f)
@@ -434,6 +471,8 @@ private[sql] trait SQLTestUtilsBase
@@ -434,6 +462,8 @@ private[sql] trait SQLTestUtilsBase
val schema = df.schema
val withoutFilters = df.queryExecution.executedPlan.transform {
case FilterExec(_, child) => child
@@ -2848,10 +2842,10 @@ index dd55fcfe42c..2702f87c1f1 100644

spark.internalCreateDataFrame(withoutFilters.execute(), schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index ed2e309fa07..59adc094970 100644
index ed2e309fa07..71ba6533c9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,34 @@ trait SharedSparkSessionBase
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2864,24 +2858,21 @@ index ed2e309fa07..59adc094970 100644
+ if (!isCometScanOnly) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.comet.exec.all.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
+ if (enableCometAnsiMode) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ .set("spark.comet.ansi.enabled", "true")
+ }
+
+ if (isCometShuffleEnabled) {
+ conf
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ } else {
+ conf.set("spark.comet.exec.shuffle.enabled", "false")
+ }
+ }
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
@@ -2936,10 +2927,10 @@ index dc8b184fcee..dd69a989d40 100644
spark.sql(
"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9284b35fb3e..2a0269bdc16 100644
index 9284b35fb3e..37f91610500 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -53,25 +53,52 @@ object TestHive
@@ -53,25 +53,55 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
@@ -2990,10 +2981,13 @@ index 9284b35fb3e..2a0269bdc16 100644
+ if (v == null || !v.toBoolean) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.comet.exec.all.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
+ val a = System.getenv("ENABLE_COMET_ANSI_MODE")