Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 8 additions & 69 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHash
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, ExtendedExplainInfo}
import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.rules.CometExecRule.allExecs
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, Unsupported}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.operator._
import org.apache.comet.serde.operator.CometDataWritingCommand

Expand Down Expand Up @@ -206,27 +205,20 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
// broadcast exchange is forced to be enabled by Comet config.
case plan if plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
val newChildren = plan.children.map {
case b: BroadcastExchangeExec
if isCometNative(b.child) &&
CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED.get(conf) =>
operator2Proto(b) match {
case Some(nativeOp) =>
val cometOp = CometBroadcastExchangeExec(b, b.output, b.mode, b.child)
CometSinkPlaceHolder(nativeOp, b, cometOp)
case None => b
}
case b: BroadcastExchangeExec =>
convertToCometIfAllChildrenAreNative(b, CometBroadcastExchangeExec).getOrElse(b)
case other => other
}
if (!newChildren.exists(_.isInstanceOf[BroadcastExchangeExec])) {
val newPlan = convertNode(plan.withNewChildren(newChildren))
if (isCometNative(newPlan) || isCometBroadCastForceEnabled(conf)) {
newPlan
} else {
if (isCometNative(newPlan)) {
val reason =
getCometBroadcastNotEnabledReason(conf).getOrElse("no reason available")
withInfo(plan, s"Broadcast is not enabled: $reason")
}
// copy fallback reasons to the original plan
newPlan
.getTagValue(CometExplainInfo.EXTENSION_INFO)
.foreach(reasons => withInfos(plan, reasons))
// return the original plan
plan
}
} else {
Expand Down Expand Up @@ -457,59 +449,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
}
}

/**
* Fallback for handling sinks that have not been handled explicitly. This method should
* eventually be removed once CometExecRule fully uses the operator serde framework.
*/
private def operator2Proto(op: SparkPlan, childOp: Operator*): Option[Operator] = {

def isCometSink(op: SparkPlan): Boolean = {
op match {
case _: CometSparkToColumnarExec => true
case _: CometSinkPlaceHolder => true
case _ => false
}
}

def isExchangeSink(op: SparkPlan): Boolean = {
op match {
case _: ShuffleExchangeExec => true
case ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) => true
case ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: CometShuffleExchangeExec), _) =>
true
case BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) => true
case BroadcastQueryStageExec(
_,
ReusedExchangeExec(_, _: CometBroadcastExchangeExec),
_) =>
true
case _: BroadcastExchangeExec => true
case _ => false
}
}

val builder = OperatorOuterClass.Operator.newBuilder().setPlanId(op.id)
childOp.foreach(builder.addChildren)

op match {
case op if isExchangeSink(op) =>
CometExchangeSink.convert(op, builder, childOp: _*)

case op if isCometSink(op) =>
CometScanWrapper.convert(op, builder, childOp: _*)

case _ =>
// Emit warning if:
// 1. it is not Spark shuffle operator, which is handled separately
// 2. it is not a Comet operator
if (!op.nodeName.contains("Comet") &&
!op.isInstanceOf[ShuffleExchangeExec]) {
withInfo(op, s"unsupported Spark operator: ${op.nodeName}")
}
None
}
}

/**
* Convert a Spark plan to a Comet plan using the specified serde handler, but only if all
* children are native.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ReusedExchangeExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand All @@ -45,7 +45,9 @@ import org.apache.spark.util.io.ChunkedByteBuffer

import com.google.common.base.Objects

import org.apache.comet.CometRuntimeException
import org.apache.comet.{CometConf, CometRuntimeException, ConfigEntry}
import org.apache.comet.serde.OperatorOuterClass
import org.apache.comet.serde.operator.CometSink
import org.apache.comet.shims.ShimCometBroadcastExchangeExec

/**
Expand Down Expand Up @@ -262,7 +264,24 @@ case class CometBroadcastExchangeExec(
copy(child = newChild)
}

object CometBroadcastExchangeExec {
object CometBroadcastExchangeExec extends CometSink[BroadcastExchangeExec] {

/**
* Exchange data is FFI safe because there is no use of mutable buffers involved.
*
* Source of broadcast exchange batches is ArrowStreamReader.
*/
override def isFfiSafe: Boolean = true

override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
CometConf.COMET_EXEC_BROADCAST_EXCHANGE_ENABLED)

override def createExec(
nativeOp: OperatorOuterClass.Operator,
b: BroadcastExchangeExec): CometNativeExec = {
CometSinkPlaceHolder(nativeOp, b, CometBroadcastExchangeExec(b, b.output, b.mode, b.child))
}

private[comet] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool(
"comet-broadcast-exchange",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ TakeOrderedAndProject
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ TakeOrderedAndProject
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Filter
: +- CometColumnarExchange
: +- HashAggregate
: +- Project
: +- BroadcastHashJoin
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
: :- CometColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
Expand Down Expand Up @@ -55,7 +55,7 @@ Filter
+- CometColumnarExchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
:- CometColumnarToRow
: +- CometProject
: +- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Filter
: +- CometColumnarExchange
: +- HashAggregate
: +- Project
: +- BroadcastHashJoin
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
: :- CometColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
Expand Down Expand Up @@ -55,7 +55,7 @@ Filter
+- CometColumnarExchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
:- CometColumnarToRow
: +- CometProject
: +- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Filter
: +- CometColumnarExchange
: +- HashAggregate
: +- Project
: +- BroadcastHashJoin
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
: :- CometColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
Expand Down Expand Up @@ -55,7 +55,7 @@ Filter
+- CometColumnarExchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
:- CometColumnarToRow
: +- CometProject
: +- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Filter
: +- CometColumnarExchange
: +- HashAggregate
: +- Project
: +- BroadcastHashJoin
: +- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
: :- CometColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
Expand Down Expand Up @@ -55,7 +55,7 @@ Filter
+- CometColumnarExchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
+- BroadcastHashJoin [COMET: Comet is not compatible with Spark for case conversion in locale-specific cases. Set spark.comet.caseConversion.enabled=true to enable it anyway.]
:- CometColumnarToRow
: +- CometProject
: +- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- CometColumnarToRow
: : : : : +- CometHashAggregate
: : : : : +- CometColumnarExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- CometColumnarToRow
: : : : : +- CometHashAggregate
: : : : : +- CometColumnarExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ TakeOrderedAndProject
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ TakeOrderedAndProject
: :- Project
: : +- Filter
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
: : : :- CometColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ TakeOrderedAndProject
+- HashAggregate
+- Project
+- Filter
+- BroadcastHashJoin
+- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
:- CometColumnarToRow
: +- CometProject
: +- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ TakeOrderedAndProject
+- HashAggregate
+- Project
+- Filter
+- BroadcastHashJoin
+- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
:- CometColumnarToRow
: +- CometProject
: +- CometBroadcastHashJoin
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Project
+- BroadcastNestedLoopJoin
+- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- CometColumnarToRow
: +- CometHashAggregate
: +- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Project
+- BroadcastNestedLoopJoin
+- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- CometColumnarToRow
: +- CometHashAggregate
: +- CometExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TakeOrderedAndProject
: +- BroadcastHashJoin
: :- Project
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported]
: : : :- CometColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TakeOrderedAndProject
: +- BroadcastHashJoin
: :- Project
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported]
: : : :- CometColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ TakeOrderedAndProject
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store
:- Project
: +- BroadcastNestedLoopJoin
: +- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastExchange
: : +- CometColumnarToRow
: : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ TakeOrderedAndProject
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store
:- Project
: +- BroadcastNestedLoopJoin
: +- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastExchange
: : +- CometColumnarToRow
: : +- CometHashAggregate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ HashAggregate
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- BroadcastHashJoin
:- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported]
: :- CometColumnarToRow
: : +- CometHashAggregate
: : +- CometExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ HashAggregate
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- BroadcastHashJoin
:- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported]
: :- CometColumnarToRow
: : +- CometHashAggregate
: : +- CometExchange
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
BroadcastNestedLoopJoin
:- BroadcastNestedLoopJoin
: :- BroadcastNestedLoopJoin
: : :- BroadcastNestedLoopJoin
: : : :- BroadcastNestedLoopJoin
: : : : :- BroadcastNestedLoopJoin
: : : : : :- BroadcastNestedLoopJoin
BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
:- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : : :- BroadcastNestedLoopJoin [COMET: BroadcastNestedLoopJoin is not supported]
: : : : : : :- CometColumnarToRow
: : : : : : : +- CometHashAggregate
: : : : : : : +- CometExchange
Expand Down
Loading
Loading