
Commit a34dc4d

a0x8o and EnricoMi committed
[SPARK-39529][INFRA] Refactor and merge all related job selection logic into precondition
### What changes were proposed in this pull request?

This PR borrows the idea from apache/spark#36928 but adds some more changes so that scheduled jobs share the `precondition` job, consolidating all conditional logic there. It also adds a new option to `is-changed.py` so that dependent modules can be checked together; this way, we don't have to change `build_and_test.yml` every time a new module is added. In addition, this PR removes `type` because the `precondition` job now replaces it. Lastly, it enables the PySpark, SparkR, TPC-DS, and Docker integration tests for scheduled jobs when applicable.

Closes #36928

### Why are the changes needed?

To make it easier to read.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Tested locally and in my fork (https://github.com/HyukjinKwon/spark/actions).

Closes #36940 from HyukjinKwon/SPARK-39529.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 81ba5ab commit a34dc4d

12 files changed: 218 additions & 84 deletions


core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -144,7 +144,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(tempDir.list().size === 1)
   }

-  test("If commit fails, if task is retried it should not be locked, and will succeed.") {
+  ignore("If commit fails, if task is retried it should not be locked, and will succeed.") {
     val rdd = sc.parallelize(Seq(1), 1)
     sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).failFirstCommitAttempt _,
       rdd.partitions.indices)
```
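
The only change here swaps ScalaTest's `test` registration for `ignore`, which keeps the case compiled and registered but reported as skipped rather than run. A minimal toy suite (not part of this commit; suite name and assertions are illustrative) showing the difference:

```scala
import org.scalatest.funsuite.AnyFunSuite

// Toy suite: `ignore` has the same shape as `test`, so flipping one
// word disables a case without deleting or commenting out its body.
class ToyCommitSuite extends AnyFunSuite {
  test("this case runs") {
    assert(1 + 1 === 2)
  }

  ignore("this case is reported as ignored, but still type-checked") {
    fail("never executed")
  }
}
```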

mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -45,6 +45,9 @@ class BinaryClassificationMetrics @Since("3.0.0") (
     @Since("1.3.0") val scoreAndLabels: RDD[_ <: Product],
     @Since("1.3.0") val numBins: Int = 1000)
   extends Logging {
+
+  @deprecated("The variable `scoreLabelsWeight` should be private and " +
+    "will be removed in 4.0.0.", "3.4.0")
   val scoreLabelsWeight: RDD[(Double, (Double, Double))] = scoreAndLabels.map {
     case (prediction: Double, label: Double, weight: Double) =>
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
```
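
The annotation added above is Scala's standard `@deprecated`, whose two arguments are the message and the version that introduced the deprecation; call sites keep compiling but emit a warning. A minimal sketch with toy names (not from the commit):

```scala
object ToyMetrics {
  // First argument: message shown in the warning;
  // second: version in which the deprecation was introduced.
  @deprecated("This value will become private.", "3.4.0")
  val scores: Seq[Double] = Seq(0.1, 0.9)
}

object ToyCaller {
  // Compiles, but scalac emits a deprecation warning pointing here.
  val first: Double = ToyMetrics.scores.head
}
```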

pom.xml

Lines changed: 1 addition & 1 deletion
```diff
@@ -1218,7 +1218,7 @@
       <dependency>
         <groupId>mysql</groupId>
         <artifactId>mysql-connector-java</artifactId>
-        <version>8.0.27</version>
+        <version>8.0.29</version>
         <scope>test</scope>
       </dependency>
       <dependency>
```

sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala

Lines changed: 13 additions & 12 deletions
```diff
@@ -17,19 +17,15 @@

 package org.apache.spark.sql.catalyst.util

-import org.apache.spark.sql.catalyst.expressions.{Abs, Add, And, BinaryComparison, BinaryOperator, BitwiseAnd, BitwiseNot, BitwiseOr, BitwiseXor, CaseWhen, Cast, Ceil, Coalesce, Contains, Divide, EndsWith, EqualTo, Exp, Expression, Floor, In, InSet, IsNotNull, IsNull, Literal, Log, Lower, Multiply, Not, Or, Overlay, Pow, Predicate, Remainder, Sqrt, StartsWith, StringPredicate, StringTranslate, StringTrim, StringTrimLeft, StringTrimRight, Substring, Subtract, UnaryMinus, Upper, WidthBucket}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
-import org.apache.spark.sql.execution.datasources.PushableColumn
 import org.apache.spark.sql.types.BooleanType

 /**
  * The builder to generate V2 expressions from catalyst expressions.
  */
-class V2ExpressionBuilder(
-    e: Expression, nestedPredicatePushdownEnabled: Boolean = false, isPredicate: Boolean = false) {
-
-  val pushableColumn = PushableColumn(nestedPredicatePushdownEnabled)
+class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) {

   def build(): Option[V2Expression] = generateExpression(e, isPredicate)

@@ -49,12 +45,8 @@ class V2ExpressionBuilder(
     case Literal(true, BooleanType) => Some(new AlwaysTrue())
     case Literal(false, BooleanType) => Some(new AlwaysFalse())
     case Literal(value, dataType) => Some(LiteralValue(value, dataType))
-    case col @ pushableColumn(name) =>
-      val ref = if (nestedPredicatePushdownEnabled) {
-        FieldReference(name)
-      } else {
-        FieldReference.column(name)
-      }
+    case col @ ColumnOrField(nameParts) =>
+      val ref = FieldReference(nameParts)
       if (isPredicate && col.dataType.isInstanceOf[BooleanType]) {
         Some(new V2Predicate("=", Array(ref, LiteralValue(true, BooleanType))))
       } else {
@@ -266,3 +258,12 @@ class V2ExpressionBuilder(
     case _ => None
   }
 }
+
+object ColumnOrField {
+  def unapply(e: Expression): Option[Seq[String]] = e match {
+    case a: Attribute => Some(Seq(a.name))
+    case s: GetStructField =>
+      unapply(s.child).map(_ :+ s.childSchema(s.ordinal).name)
+    case _ => None
+  }
+}
```
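
The new `ColumnOrField` extractor recursively unwraps `GetStructField` nodes and accumulates name parts, so a nested column like `person.address.city` becomes `Seq("person", "address", "city")` for `FieldReference`. A self-contained toy model of the same recursive-extractor pattern (all `Toy*` names are illustrative stand-ins, not Spark's API):

```scala
// Minimal stand-ins for Attribute / GetStructField.
sealed trait ToyExpr
case class ToyAttribute(name: String) extends ToyExpr
case class ToyGetStructField(child: ToyExpr, fieldName: String) extends ToyExpr
case class ToyLiteral(value: Any) extends ToyExpr

object ToyColumnOrField {
  // Mirrors ColumnOrField.unapply: a bare attribute yields its name;
  // a struct-field access recurses into the child and appends the
  // field's name; anything else does not match.
  def unapply(e: ToyExpr): Option[Seq[String]] = e match {
    case a: ToyAttribute => Some(Seq(a.name))
    case s: ToyGetStructField => unapply(s.child).map(_ :+ s.fieldName)
    case _ => None
  }
}

object ToyColumnOrFieldDemo extends App {
  val nested =
    ToyGetStructField(ToyGetStructField(ToyAttribute("person"), "address"), "city")
  nested match {
    case ToyColumnOrField(parts) => println(parts.mkString("."))  // person.address.city
    case _                       => println("not a column or field")
  }
}
```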

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 42 additions & 32 deletions
```diff
@@ -118,6 +118,7 @@ case class AdaptiveSparkPlanExec(
     Seq(
       RemoveRedundantProjects,
       ensureRequirements,
+      ValidateSparkPlan,
       ReplaceHashWithSortAgg,
       RemoveRedundantSorts,
       DisableUnnecessaryBucketedScan,
@@ -301,17 +302,20 @@ case class AdaptiveSparkPlanExec(
         // plans are updated, we can clear the query stage list because at this point the two plans
         // are semantically and physically in sync again.
         val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
-        val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
-        val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
-        val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
-        if (newCost < origCost ||
+        val afterReOptimize = reOptimize(logicalPlan)
+        if (afterReOptimize.isDefined) {
+          val (newPhysicalPlan, newLogicalPlan) = afterReOptimize.get
+          val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
+          val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
+          if (newCost < origCost ||
             (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
-          logOnLevel("Plan changed:\n" +
-            sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n"))
-          cleanUpTempTags(newPhysicalPlan)
-          currentPhysicalPlan = newPhysicalPlan
-          currentLogicalPlan = newLogicalPlan
-          stagesToReplace = Seq.empty[QueryStageExec]
+            logOnLevel("Plan changed:\n" +
+              sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n"))
+            cleanUpTempTags(newPhysicalPlan)
+            currentPhysicalPlan = newPhysicalPlan
+            currentLogicalPlan = newLogicalPlan
+            stagesToReplace = Seq.empty[QueryStageExec]
+          }
         }
         // Now that some stages have finished, we can try creating new stages.
         result = createQueryStages(currentPhysicalPlan)
@@ -641,29 +645,35 @@ case class AdaptiveSparkPlanExec(
   /**
    * Re-optimize and run physical planning on the current logical plan based on the latest stats.
    */
-  private def reOptimize(logicalPlan: LogicalPlan): (SparkPlan, LogicalPlan) = {
-    logicalPlan.invalidateStatsCache()
-    val optimized = optimizer.execute(logicalPlan)
-    val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
-    val newPlan = applyPhysicalRules(
-      sparkPlan,
-      preprocessingRules ++ queryStagePreparationRules,
-      Some((planChangeLogger, "AQE Replanning")))
-
-    // When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will
-    // add the `BroadcastExchangeExec` node manually in the DPP subquery,
-    // not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated
-    // and need to be re-optimized, AQE also need to manually insert the `BroadcastExchangeExec`
-    // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
-    // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan
-    // is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-    val finalPlan = currentPhysicalPlan match {
-      case b: BroadcastExchangeLike
-        if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
-      case _ => newPlan
-    }
+  private def reOptimize(logicalPlan: LogicalPlan): Option[(SparkPlan, LogicalPlan)] = {
+    try {
+      logicalPlan.invalidateStatsCache()
+      val optimized = optimizer.execute(logicalPlan)
+      val sparkPlan = context.session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
+      val newPlan = applyPhysicalRules(
+        sparkPlan,
+        preprocessingRules ++ queryStagePreparationRules,
+        Some((planChangeLogger, "AQE Replanning")))
+
+      // When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will
+      // add the `BroadcastExchangeExec` node manually in the DPP subquery,
+      // not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated
+      // and need to be re-optimized, AQE also need to manually insert the `BroadcastExchangeExec`
+      // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
+      // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is
+      // already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
+      val finalPlan = currentPhysicalPlan match {
+        case b: BroadcastExchangeLike
+          if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
+        case _ => newPlan
+      }

-    (finalPlan, optimized)
+      Some((finalPlan, optimized))
+    } catch {
+      case e: InvalidAQEPlanException[_] =>
+        logOnLevel(s"Re-optimize - ${e.getMessage()}:\n${e.plan}")
+        None
+    }
   }

   /**
```
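
`reOptimize` now returns `Option[(SparkPlan, LogicalPlan)]` and maps a thrown `InvalidAQEPlanException` to `None`, so the caller simply keeps the current plan when replanning produced an invalid one. A toy sketch of that exception-to-`Option` pattern (illustrative names throughout, not Spark's API):

```scala
object ToyReplanner {
  case class ToyPlanException(message: String) extends Exception(message)

  // Fallible re-planning step: a domain-specific failure becomes None
  // instead of propagating, so callers can fall back gracefully.
  def reOptimize(plan: String): Option[String] =
    try {
      if (plan.contains("invalid")) throw ToyPlanException(s"rejected: $plan")
      Some(plan + " (replanned)")  // stand-in for the re-optimized plan
    } catch {
      case e: ToyPlanException =>
        println(s"Re-optimize - ${e.getMessage}")
        None
    }

  def main(args: Array[String]): Unit = {
    val current = "scan->join"
    // Only adopt the new plan when re-optimization succeeded.
    val next = reOptimize(current).getOrElse(current)
    println(next)
  }
}
```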
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InvalidAQEPlanException.scala

Lines changed: 30 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.plans.QueryPlan

/**
 * Exception thrown when an invalid query plan is detected in AQE replanning,
 * in which case AQE will stop the current replanning process and keep using the latest valid plan.
 *
 * @param message The reason why the plan is considered invalid.
 * @param plan The invalid plan/sub-plan.
 */
case class InvalidAQEPlanException[QueryType <: QueryPlan[_]](message: String, plan: QueryType)
  extends Exception(message)
```
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ValidateSparkPlan.scala

Lines changed: 68 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}

/**
 * Detects invalid physical plans generated by AQE replanning and throws `InvalidAQEPlanException`
 * if such plans are detected. This rule should be called after EnsureRequirements where all
 * necessary Exchange nodes are added.
 */
object ValidateSparkPlan extends Rule[SparkPlan] {

  def apply(plan: SparkPlan): SparkPlan = {
    validate(plan)
    plan
  }

  /**
   * Validate that the plan satisfies the following condition:
   * - BroadcastQueryStage only appears as the immediate child and the build side of a broadcast
   *   hash join or broadcast nested loop join.
   */
  private def validate(plan: SparkPlan): Unit = plan match {
    case b: BroadcastHashJoinExec =>
      val (buildPlan, probePlan) = b.buildSide match {
        case BuildLeft => (b.left, b.right)
        case BuildRight => (b.right, b.left)
      }
      if (!buildPlan.isInstanceOf[BroadcastQueryStageExec]) {
        validate(buildPlan)
      }
      validate(probePlan)
    case b: BroadcastNestedLoopJoinExec =>
      val (buildPlan, probePlan) = b.buildSide match {
        case BuildLeft => (b.left, b.right)
        case BuildRight => (b.right, b.left)
      }
      if (!buildPlan.isInstanceOf[BroadcastQueryStageExec]) {
        validate(buildPlan)
      }
      validate(probePlan)
    case q: BroadcastQueryStageExec => errorOnInvalidBroadcastQueryStage(q)
    case _ => plan.children.foreach(validate)
  }

  private def errorOnInvalidBroadcastQueryStage(plan: SparkPlan): Unit = {
    throw InvalidAQEPlanException("Invalid broadcast query stage", plan)
  }
}
```
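
The rule accepts a `BroadcastQueryStageExec` only as the immediate build-side child of a broadcast join and raises on any other placement; `reOptimize` then catches the exception and keeps the last valid plan. A toy model of the same walk (the `Toy*` tree is illustrative and fixes the build side to the left, unlike the real rule's `BuildLeft`/`BuildRight` dispatch):

```scala
sealed trait ToyPlan { def children: Seq[ToyPlan] }
case class ToyJoin(build: ToyPlan, probe: ToyPlan) extends ToyPlan {
  def children: Seq[ToyPlan] = Seq(build, probe)
}
case class ToyBroadcastStage(name: String) extends ToyPlan {
  def children: Seq[ToyPlan] = Nil
}
case class ToyScan(name: String) extends ToyPlan {
  def children: Seq[ToyPlan] = Nil
}

object ToyValidate {
  // Mirrors ValidateSparkPlan.validate: a broadcast stage directly on a
  // join's build side is legal (and skipped); one found anywhere else throws.
  def validate(plan: ToyPlan): Unit = plan match {
    case j: ToyJoin =>
      if (!j.build.isInstanceOf[ToyBroadcastStage]) validate(j.build)
      validate(j.probe)
    case b: ToyBroadcastStage =>
      throw new IllegalStateException(s"invalid broadcast stage: ${b.name}")
    case other => other.children.foreach(validate)
  }

  def main(args: Array[String]): Unit = {
    validate(ToyJoin(ToyBroadcastStage("b1"), ToyScan("t")))  // passes
    validate(ToyJoin(ToyScan("t"), ToyBroadcastStage("b2")))  // throws
  }
}
```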

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 13 additions & 25 deletions
```diff
@@ -491,12 +491,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat

 private[sql] object DataSourceV2Strategy {

-  private def translateLeafNodeFilterV2(
-      predicate: Expression,
-      supportNestedPredicatePushdown: Boolean): Option[Predicate] = {
-    val pushablePredicate = PushablePredicate(supportNestedPredicatePushdown)
+  private def translateLeafNodeFilterV2(predicate: Expression): Option[Predicate] = {
     predicate match {
-      case pushablePredicate(expr) => Some(expr)
+      case PushablePredicate(expr) => Some(expr)
       case _ => None
     }
   }
@@ -506,10 +503,8 @@ private[sql] object DataSourceV2Strategy {
    *
    * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
    */
-  protected[sql] def translateFilterV2(
-      predicate: Expression,
-      supportNestedPredicatePushdown: Boolean): Option[Predicate] = {
-    translateFilterV2WithMapping(predicate, None, supportNestedPredicatePushdown)
+  protected[sql] def translateFilterV2(predicate: Expression): Option[Predicate] = {
+    translateFilterV2WithMapping(predicate, None)
   }

   /**
@@ -523,8 +518,7 @@ private[sql] object DataSourceV2Strategy {
    */
   protected[sql] def translateFilterV2WithMapping(
       predicate: Expression,
-      translatedFilterToExpr: Option[mutable.HashMap[Predicate, Expression]],
-      nestedPredicatePushdownEnabled: Boolean)
+      translatedFilterToExpr: Option[mutable.HashMap[Predicate, Expression]])
     : Option[Predicate] = {
     predicate match {
       case And(left, right) =>
@@ -538,26 +532,21 @@ private[sql] object DataSourceV2Strategy {
         // Pushing one leg of AND down is only safe to do at the top level.
         // You can see ParquetFilters' createFilter for more details.
         for {
-          leftFilter <- translateFilterV2WithMapping(
-            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
-          rightFilter <- translateFilterV2WithMapping(
-            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          leftFilter <- translateFilterV2WithMapping(left, translatedFilterToExpr)
+          rightFilter <- translateFilterV2WithMapping(right, translatedFilterToExpr)
         } yield new V2And(leftFilter, rightFilter)

       case Or(left, right) =>
         for {
-          leftFilter <- translateFilterV2WithMapping(
-            left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
-          rightFilter <- translateFilterV2WithMapping(
-            right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
+          leftFilter <- translateFilterV2WithMapping(left, translatedFilterToExpr)
+          rightFilter <- translateFilterV2WithMapping(right, translatedFilterToExpr)
         } yield new V2Or(leftFilter, rightFilter)

       case Not(child) =>
-        translateFilterV2WithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
-          .map(new V2Not(_))
+        translateFilterV2WithMapping(child, translatedFilterToExpr).map(new V2Not(_))

       case other =>
-        val filter = translateLeafNodeFilterV2(other, nestedPredicatePushdownEnabled)
+        val filter = translateLeafNodeFilterV2(other)
         if (filter.isDefined && translatedFilterToExpr.isDefined) {
           translatedFilterToExpr.get(filter.get) = predicate
         }
@@ -589,10 +578,9 @@ private[sql] object DataSourceV2Strategy {
 /**
  * Get the expression of DS V2 to represent catalyst predicate that can be pushed down.
  */
-case class PushablePredicate(nestedPredicatePushdownEnabled: Boolean) {
-
+object PushablePredicate {
   def unapply(e: Expression): Option[Predicate] =
-    new V2ExpressionBuilder(e, nestedPredicatePushdownEnabled, true).build().map { v =>
+    new V2ExpressionBuilder(e, true).build().map { v =>
       assert(v.isInstanceOf[Predicate])
       v.asInstanceOf[Predicate]
     }
```
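
With the `nestedPredicatePushdownEnabled` flag gone, `PushablePredicate` changes from a configured `case class` whose *instances* served as extractors to a plain `object` extractor matched by name. A toy contrast of the two shapes (illustrative matchers, not Spark's):

```scala
// Instance-based extractor: configuration is captured in a value, and
// that value is used in the pattern (as the old pushablePredicate was).
case class ConfiguredMatcher(allowNegative: Boolean) {
  def unapply(n: Int): Option[Int] =
    if (n >= 0 || allowNegative) Some(n) else None
}

// Object extractor: no configuration, matched directly by name
// (the shape PushablePredicate now has).
object SimpleMatcher {
  def unapply(n: Int): Option[Int] = if (n >= 0) Some(n) else None
}

object ExtractorDemo extends App {
  val configured = ConfiguredMatcher(allowNegative = true)
  -3 match {
    case configured(v) => println(s"configured matched: $v")  // matches
    case _             => println("configured: no match")
  }
  -3 match {
    case SimpleMatcher(v) => println(s"simple matched: $v")
    case _                => println("simple: no match")      // taken
  }
}
```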

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -80,7 +80,7 @@ object PushDownUtils extends PredicateHelper {
     for (filterExpr <- filters) {
       val translated =
         DataSourceV2Strategy.translateFilterV2WithMapping(
-          filterExpr, Some(translatedFilterToExpr), nestedPredicatePushdownEnabled = true)
+          filterExpr, Some(translatedFilterToExpr))
       if (translated.isEmpty) {
         untranslatableExprs += filterExpr
       } else {
```
