Skip to content

Commit c0c52dd

Browse files
ulysses-you authored and cloud-fan committed
[SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter
### What changes were proposed in this pull request?

Add `RewritePredicateSubquery` below the `InjectRuntimeFilter` in `SparkOptimizer`.

### Why are the changes needed?

It seems if the runtime filter uses an in-subquery to do the filter, it won't be converted to a semi-join as the design said. This PR fixes the issue.

### Does this PR introduce _any_ user-facing change?

No, not released.

### How was this patch tested?

Improve the test by adding: ensure the semi-join exists if the runtime filter uses the in-subquery code path.

Closes #35998 from ulysses-you/SPARK-32268-FOllOWUP.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 8fab597 commit c0c52dd

2 files changed

Lines changed: 14 additions & 2 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ class SparkOptimizer(
4444
Batch("PartitionPruning", Once,
4545
PartitionPruning) :+
4646
Batch("InjectRuntimeFilter", FixedPoint(1),
47-
InjectRuntimeFilter) :+
47+
InjectRuntimeFilter,
48+
RewritePredicateSubquery) :+
4849
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
4950
PushDownPredicates) :+
5051
Batch("Cleanup filters that cannot be pushed down", Once,

sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ package org.apache.spark.sql
1919

2020
import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal}
2121
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
22-
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
22+
import org.apache.spark.sql.catalyst.plans.LeftSemi
23+
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
2324
import org.apache.spark.sql.internal.SQLConf
2425
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
2526
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -213,6 +214,15 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
213214
super.afterAll()
214215
}
215216

217+
private def ensureLeftSemiJoinExists(plan: LogicalPlan): Unit = {
218+
assert(
219+
plan.find {
220+
case j: Join if j.joinType == LeftSemi => true
221+
case _ => false
222+
}.isDefined
223+
)
224+
}
225+
216226
def checkWithAndWithoutFeatureEnabled(query: String, testSemiJoin: Boolean,
217227
shouldReplace: Boolean): Unit = {
218228
var planDisabled: LogicalPlan = null
@@ -234,6 +244,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
234244
if (shouldReplace) {
235245
val normalizedEnabled = normalizePlan(normalizeExprIds(planEnabled))
236246
val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled))
247+
ensureLeftSemiJoinExists(planEnabled)
237248
assert(normalizedEnabled != normalizedDisabled)
238249
} else {
239250
comparePlans(planDisabled, planEnabled)

0 commit comments

Comments (0)