Optimizer.scala

@@ -185,9 +185,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
       RemoveLiteralFromGroupExpressions,
       RemoveRepetitionFromGroupExpressions) :: Nil ++
     operatorOptimizationBatch) :+
-    // This batch rewrites data source plans and should be run after the operator
-    // optimization batch and before any batches that depend on stats.
-    Batch("Data Source Rewrite Rules", Once, dataSourceRewriteRules: _*) :+
+    // This batch rewrites plans after the operator optimization batch and
+    // before any batches that depend on stats.
+    Batch("Post Operator Optimization Rules", Once, postOperatorOptimizationRules: _*) :+
     // This batch pushes filters and projections into scan nodes. Before this batch, the logical
     // plan may contain nodes that do not report stats. Anything that uses stats must run after
     // this batch.
@@ -293,10 +293,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
   def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil

   /**
-   * Override to provide additional rules for rewriting data source plans. Such rules will be
-   * applied after operator optimization rules and before any rules that depend on stats.
+   * Override to provide additional rules for rewriting plans after operator optimization rules and
+   * before any rules that depend on stats.
    */
-  def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil
+  def postOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil

   /**
    * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that
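For illustration only (not part of this diff; MyOptimizer and RewriteDuringCompaction are hypothetical names), a minimal sketch of how an Optimizer subclass could contribute rules to the new run-once batch via the override hook above:

import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager

// Placeholder rule: a real one would pattern-match on the plan and rewrite it.
object RewriteDuringCompaction extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical subclass wiring the rule into the batch added above.
class MyOptimizer(catalogManager: CatalogManager) extends Optimizer(catalogManager) {
  override def postOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    super.postOperatorOptimizationRules :+ RewriteDuringCompaction
}

In practice the session-level hooks changed below (SparkSessionExtensions and BaseSessionStateBuilder) are the intended integration points; this direct subclass only shows where the injected rules end up.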
SparkSessionExtensions.scala

@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
 * <li>Analyzer Rules.</li>
 * <li>Check Analysis Rules.</li>
 * <li>Optimizer Rules.</li>
-* <li>Data Source Rewrite Rules.</li>
+* <li>Post Operator Optimization Rules.</li>
 * <li>Planning Strategies.</li>
 * <li>Customized Parser.</li>
 * <li>(External) Catalog listeners.</li>
@@ -200,19 +200,20 @@ class SparkSessionExtensions {
     optimizerRules += builder
   }

-  private[this] val dataSourceRewriteRules = mutable.Buffer.empty[RuleBuilder]
+  private[this] val postOperatorOptimizationRules = mutable.Buffer.empty[RuleBuilder]

-  private[sql] def buildDataSourceRewriteRules(session: SparkSession): Seq[Rule[LogicalPlan]] = {
-    dataSourceRewriteRules.map(_.apply(session)).toSeq
+  private[sql] def buildPostOperatorOptimizationRules(
+      session: SparkSession): Seq[Rule[LogicalPlan]] = {
+    postOperatorOptimizationRules.map(_.apply(session)).toSeq
   }

   /**
-   * Inject an optimizer `Rule` builder that rewrites data source plans into the [[SparkSession]].
+   * Inject an optimizer `Rule` builder that rewrites logical plans into the [[SparkSession]].
    * The injected rules will be executed after the operator optimization batch and before rules
    * that depend on stats.
    */
-  def injectDataSourceRewriteRule(builder: RuleBuilder): Unit = {
-    dataSourceRewriteRules += builder
+  def injectPostOperatorOptimizationRule(builder: RuleBuilder): Unit = {
+    postOperatorOptimizationRules += builder
   }

   private[this] val plannerStrategyBuilders = mutable.Buffer.empty[StrategyBuilder]

Review thread on the injectPostOperatorOptimizationRule name:
Contributor Author:

This isn't final. I'll explain my current thinking here: we are going to use this batch to rewrite plans directly after operator optimization. I did not go for planRewriteRules because of the comments made by @dongjoon-hyun and @viirya (please let me know if you don't feel that way anymore). I also did not go for preCBORules, as quite a few things happen before CBO, as discussed in the original thread.

Member:

To be honest, I do not like this name. Catalyst is designed as a query compiler, and no concept called "post operator optimization" exists in a query compiler. If you brought up this extension point in a database class, no one would understand what it means from the name alone.

My point is still the same: we are adding an extension point for advanced users to insert rules between the rule-based optimizer (RBO) rules and the cost-based optimizer (CBO) rules.

Member:

The batch/function/rule names in Optimizer.scala are not critical to me. They are private, so we can change them whenever we need. However, this is an external developer API, and we should be very careful when naming it.

To me, either preCBORules or postRBORules is much better than postOperatorOptimizationRule.

Member:

BTW, eventually we should move all the statistics-based rules from the optimizer to the planner.

Contributor:

It's unfortunate that we don't have a clear separation between RBO and CBO: there are RBO rules both before and after the only CBO rule, CostBasedJoinReorder.

I think the general idea of this batch is to allow people to inject special optimizer rules that can't run together with the main operator optimization batch. It's indeed a Spark-specific thing: the main operator optimization batch runs many times until it reaches a fixed point, whereas the new batch added here runs only once.

It's really hard to do the naming here. To match the actual purpose and stay general, how about Phase 2 Optimizer Rules or Run Once Optimizer Rules?
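The run-once vs. fixed-point distinction referenced here comes from Catalyst's batch execution strategies, roughly sketched below (a paraphrase of RuleExecutor, not its exact source):

// Rough paraphrase of Catalyst's RuleExecutor batch strategies.
abstract class Strategy { def maxIterations: Int }

// Run-once batches, like the one added in this PR, apply their rules a single time.
case object Once extends Strategy { val maxIterations = 1 }

// Fixed-point batches, like the main operator optimization batch, keep
// re-applying their rules until the plan stops changing or the cap is reached.
case class FixedPoint(maxIterations: Int) extends Strategy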

Member:

Phase 2 Optimizer Rules and Run Once Optimizer Rules do not explain the location of the batch.

How many batches do we want to add? I am afraid we will keep adding phase 2, 3, 4, and so on; that is endless and looks arbitrary. We should not expect users to read the optimizer's source code with every new release, and relying on that would be fragile.

Contributor Author (@aokolnychyi, Dec 17, 2020):

I do share some of the points brought up by @cloud-fan. There is no clear separation between CBO and RBO, and we run quite a few rule-based optimizations after the existing cost-based optimization rule. Also, we need to run this batch before the early scan pushdown rules to capture possible rewrites, and those, in turn, run before CBO. That's why preCBORules does not seem to convey precisely what this batch of rules does.

Our existing hook in the extensions API says "The injected rules will be executed during the operator optimization batch." That was one of my motivations for the current name, just as we distinguish resolution and post-hoc resolution rules.

That said, I don't claim postOperatorOptimizationRules is a perfect name either; it just seems to better reflect the batch's place in the Spark optimizer. I'll be happy to discuss and iterate further.

Contributor Author:

I'd also be interested to hear from @hvanhovell and the other folks who commented.

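A usage sketch of the new injection API (CollapseStagedScans is a hypothetical placeholder rule; it follows the same case-class builder pattern as MyRule in the test suite below):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule used only to show where injected rules run; a real rule
// would pattern-match on the plan and rewrite it.
case class CollapseStagedScans(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions { extensions =>
    // Runs once per query, after the operator optimization batch and before
    // any stats-dependent batches.
    extensions.injectPostOperatorOptimizationRule(CollapseStagedScans)
  }
  .getOrCreate()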
BaseSessionStateBuilder.scala

@@ -231,8 +231,8 @@ abstract class BaseSessionStateBuilder(
   override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
     super.earlyScanPushDownRules ++ customEarlyScanPushDownRules

-  override def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] =
-    super.dataSourceRewriteRules ++ customDataSourceRewriteRules
+  override def postOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
+    super.postOperatorOptimizationRules ++ customPostOperatorOptimizationRules

   override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
     super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
@@ -258,13 +258,13 @@ abstract class BaseSessionStateBuilder(
   protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil

   /**
-   * Custom rules for rewriting data source plans to add to the Optimizer. Prefer overriding
+   * Custom rules for rewriting plans after operator optimization. Prefer overriding
    * this instead of creating your own Optimizer.
    *
    * Note that this may NOT depend on the `optimizer` function.
    */
-  protected def customDataSourceRewriteRules: Seq[Rule[LogicalPlan]] = {
-    extensions.buildDataSourceRewriteRules(session)
+  protected def customPostOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = {
+    extensions.buildPostOperatorOptimizationRules(session)
   }

   /**
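Per the doc comment ("Prefer overriding this instead of creating your own Optimizer"), a session state builder can layer extra rules on top of whatever the injected extensions contributed. A sketch, where MySessionStateBuilder and AddRuntimeFilters are hypothetical names:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}

// Placeholder rule builder.
case class AddRuntimeFilters(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical builder: appends one rule to whatever the injected extensions
// provide. (Sketch only; a concrete builder must also implement newBuilder.)
abstract class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  override protected def customPostOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
    super.customPostOperatorOptimizationRules :+ AddRuntimeFilters(session)
}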
SparkSessionExtensionSuite.scala

@@ -88,9 +88,9 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
     }
   }

-  test("SPARK-33621: inject data source rewrite rule") {
-    withSession(Seq(_.injectDataSourceRewriteRule(MyRule))) { session =>
-      assert(session.sessionState.optimizer.dataSourceRewriteRules.contains(MyRule(session)))
+  test("SPARK-33621: inject post operator optimization rule") {
+    withSession(Seq(_.injectPostOperatorOptimizationRule(MyRule))) { session =>
+      assert(session.sessionState.optimizer.postOperatorOptimizationRules.contains(MyRule(session)))
     }
   }
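Outside of the withSession test helper, an extension like this is typically packaged as a Function1[SparkSessionExtensions, Unit] class and enabled through the spark.sql.extensions configuration. A sketch with hypothetical names (NoopRule stands in for the suite's MyRule):

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule that leaves the plan unchanged.
case class NoopRule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical extensions class, enabled with e.g.
//   spark-submit --conf spark.sql.extensions=com.example.MyExtensions ...
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectPostOperatorOptimizationRule(NoopRule)
  }
}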