Skip to content

Commit fe4873e

Browse files
combine plan
1 parent 27f6b5a commit fe4873e

3 files changed

Lines changed: 47 additions & 130 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -300,26 +300,40 @@ case class AdaptiveSparkPlanExec(
300300
maxFields,
301301
printNodeId,
302302
indent)
303-
generateTreeStringWithHeader(
304-
if (isFinalPlan) "Final Plan" else "Current Plan",
305-
currentPhysicalPlan,
306-
depth,
307-
lastChildren,
308-
append,
309-
verbose,
310-
maxFields,
311-
printNodeId)
312-
generateTreeStringWithHeader(
313-
"Initial Plan",
314-
initialPlan,
315-
depth,
316-
lastChildren,
317-
append,
318-
verbose,
319-
maxFields,
320-
printNodeId)
303+
if (currentPhysicalPlan.fastEquals(initialPlan)) {
304+
currentPhysicalPlan.generateTreeString(
305+
depth + 1,
306+
lastChildren :+ true,
307+
append,
308+
verbose,
309+
prefix = "",
310+
addSuffix = false,
311+
maxFields,
312+
printNodeId,
313+
indent)
314+
} else {
315+
generateTreeStringWithHeader(
316+
if (isFinalPlan) "Final Plan" else "Current Plan",
317+
currentPhysicalPlan,
318+
depth,
319+
lastChildren,
320+
append,
321+
verbose,
322+
maxFields,
323+
printNodeId)
324+
generateTreeStringWithHeader(
325+
"Initial Plan",
326+
initialPlan,
327+
depth,
328+
lastChildren,
329+
append,
330+
verbose,
331+
maxFields,
332+
printNodeId)
333+
}
321334
}
322335

336+
323337
private def generateTreeStringWithHeader(
324338
header: String,
325339
plan: SparkPlan,

sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out

Lines changed: 13 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,7 @@ struct<plan:string>
5454
-- !query output
5555
== Physical Plan ==
5656
AdaptiveSparkPlan (8)
57-
+- == Current Plan ==
58-
Sort (7)
59-
+- Exchange (6)
60-
+- HashAggregate (5)
61-
+- Exchange (4)
62-
+- HashAggregate (3)
63-
+- Filter (2)
64-
+- Scan parquet default.explain_temp1 (1)
65-
+- == Initial Plan ==
66-
Sort (7)
57+
+- Sort (7)
6758
+- Exchange (6)
6859
+- HashAggregate (5)
6960
+- Exchange (4)
@@ -126,16 +117,7 @@ struct<plan:string>
126117
-- !query output
127118
== Physical Plan ==
128119
AdaptiveSparkPlan (8)
129-
+- == Current Plan ==
130-
Project (7)
131-
+- Filter (6)
132-
+- HashAggregate (5)
133-
+- Exchange (4)
134-
+- HashAggregate (3)
135-
+- Filter (2)
136-
+- Scan parquet default.explain_temp1 (1)
137-
+- == Initial Plan ==
138-
Project (7)
120+
+- Project (7)
139121
+- Filter (6)
140122
+- HashAggregate (5)
141123
+- Exchange (4)
@@ -196,17 +178,7 @@ struct<plan:string>
196178
-- !query output
197179
== Physical Plan ==
198180
AdaptiveSparkPlan (9)
199-
+- == Current Plan ==
200-
HashAggregate (8)
201-
+- Exchange (7)
202-
+- HashAggregate (6)
203-
+- Union (5)
204-
:- Filter (2)
205-
: +- Scan parquet default.explain_temp1 (1)
206-
+- Filter (4)
207-
+- Scan parquet default.explain_temp1 (3)
208-
+- == Initial Plan ==
209-
HashAggregate (8)
181+
+- HashAggregate (8)
210182
+- Exchange (7)
211183
+- HashAggregate (6)
212184
+- Union (5)
@@ -274,15 +246,7 @@ struct<plan:string>
274246
-- !query output
275247
== Physical Plan ==
276248
AdaptiveSparkPlan (7)
277-
+- == Current Plan ==
278-
BroadcastHashJoin Inner BuildRight (6)
279-
:- Filter (2)
280-
: +- Scan parquet default.explain_temp1 (1)
281-
+- BroadcastExchange (5)
282-
+- Filter (4)
283-
+- Scan parquet default.explain_temp2 (3)
284-
+- == Initial Plan ==
285-
BroadcastHashJoin Inner BuildRight (6)
249+
+- BroadcastHashJoin Inner BuildRight (6)
286250
:- Filter (2)
287251
: +- Scan parquet default.explain_temp1 (1)
288252
+- BroadcastExchange (5)
@@ -337,14 +301,7 @@ struct<plan:string>
337301
-- !query output
338302
== Physical Plan ==
339303
AdaptiveSparkPlan (6)
340-
+- == Current Plan ==
341-
BroadcastHashJoin LeftOuter BuildRight (5)
342-
:- Scan parquet default.explain_temp1 (1)
343-
+- BroadcastExchange (4)
344-
+- Filter (3)
345-
+- Scan parquet default.explain_temp2 (2)
346-
+- == Initial Plan ==
347-
BroadcastHashJoin LeftOuter BuildRight (5)
304+
+- BroadcastHashJoin LeftOuter BuildRight (5)
348305
:- Scan parquet default.explain_temp1 (1)
349306
+- BroadcastExchange (4)
350307
+- Filter (3)
@@ -398,11 +355,7 @@ struct<plan:string>
398355
-- !query output
399356
== Physical Plan ==
400357
AdaptiveSparkPlan (3)
401-
+- == Current Plan ==
402-
Filter (2)
403-
+- Scan parquet default.explain_temp1 (1)
404-
+- == Initial Plan ==
405-
Filter (2)
358+
+- Filter (2)
406359
+- Scan parquet default.explain_temp1 (1)
407360

408361

@@ -438,11 +391,7 @@ struct<plan:string>
438391
-- !query output
439392
== Physical Plan ==
440393
AdaptiveSparkPlan (3)
441-
+- == Current Plan ==
442-
Filter (2)
443-
+- Scan parquet default.explain_temp1 (1)
444-
+- == Initial Plan ==
445-
Filter (2)
394+
+- Filter (2)
446395
+- Scan parquet default.explain_temp1 (1)
447396

448397

@@ -470,11 +419,7 @@ struct<plan:string>
470419
-- !query output
471420
== Physical Plan ==
472421
AdaptiveSparkPlan (3)
473-
+- == Current Plan ==
474-
Project (2)
475-
+- Scan parquet default.explain_temp1 (1)
476-
+- == Initial Plan ==
477-
Project (2)
422+
+- Project (2)
478423
+- Scan parquet default.explain_temp1 (1)
479424

480425

@@ -506,15 +451,7 @@ struct<plan:string>
506451
-- !query output
507452
== Physical Plan ==
508453
AdaptiveSparkPlan (7)
509-
+- == Current Plan ==
510-
BroadcastHashJoin Inner BuildRight (6)
511-
:- Filter (2)
512-
: +- Scan parquet default.explain_temp1 (1)
513-
+- BroadcastExchange (5)
514-
+- Filter (4)
515-
+- Scan parquet default.explain_temp1 (3)
516-
+- == Initial Plan ==
517-
BroadcastHashJoin Inner BuildRight (6)
454+
+- BroadcastHashJoin Inner BuildRight (6)
518455
:- Filter (2)
519456
: +- Scan parquet default.explain_temp1 (1)
520457
+- BroadcastExchange (5)
@@ -572,21 +509,7 @@ struct<plan:string>
572509
-- !query output
573510
== Physical Plan ==
574511
AdaptiveSparkPlan (13)
575-
+- == Current Plan ==
576-
BroadcastHashJoin Inner BuildRight (12)
577-
:- HashAggregate (5)
578-
: +- Exchange (4)
579-
: +- HashAggregate (3)
580-
: +- Filter (2)
581-
: +- Scan parquet default.explain_temp1 (1)
582-
+- BroadcastExchange (11)
583-
+- HashAggregate (10)
584-
+- Exchange (9)
585-
+- HashAggregate (8)
586-
+- Filter (7)
587-
+- Scan parquet default.explain_temp1 (6)
588-
+- == Initial Plan ==
589-
BroadcastHashJoin Inner BuildRight (12)
512+
+- BroadcastHashJoin Inner BuildRight (12)
590513
:- HashAggregate (5)
591514
: +- Exchange (4)
592515
: +- HashAggregate (3)
@@ -710,13 +633,7 @@ struct<plan:string>
710633
-- !query output
711634
== Physical Plan ==
712635
AdaptiveSparkPlan (5)
713-
+- == Current Plan ==
714-
HashAggregate (4)
715-
+- Exchange (3)
716-
+- HashAggregate (2)
717-
+- Scan parquet default.explain_temp1 (1)
718-
+- == Initial Plan ==
719-
HashAggregate (4)
636+
+- HashAggregate (4)
720637
+- Exchange (3)
721638
+- HashAggregate (2)
722639
+- Scan parquet default.explain_temp1 (1)
@@ -761,13 +678,7 @@ struct<plan:string>
761678
-- !query output
762679
== Physical Plan ==
763680
AdaptiveSparkPlan (5)
764-
+- == Current Plan ==
765-
ObjectHashAggregate (4)
766-
+- Exchange (3)
767-
+- ObjectHashAggregate (2)
768-
+- Scan parquet default.explain_temp4 (1)
769-
+- == Initial Plan ==
770-
ObjectHashAggregate (4)
681+
+- ObjectHashAggregate (4)
771682
+- Exchange (3)
772683
+- ObjectHashAggregate (2)
773684
+- Scan parquet default.explain_temp4 (1)
@@ -812,15 +723,7 @@ struct<plan:string>
812723
-- !query output
813724
== Physical Plan ==
814725
AdaptiveSparkPlan (7)
815-
+- == Current Plan ==
816-
SortAggregate (6)
817-
+- Sort (5)
818-
+- Exchange (4)
819-
+- SortAggregate (3)
820-
+- Sort (2)
821-
+- Scan parquet default.explain_temp4 (1)
822-
+- == Initial Plan ==
823-
SortAggregate (6)
726+
+- SortAggregate (6)
824727
+- Sort (5)
825728
+- Exchange (4)
826729
+- SortAggregate (3)

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -842,8 +842,8 @@ class AdaptiveQueryExecSuite
842842
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
843843
val df = sql("SELECT * FROM testData join testData2 ON key = a where value = '1'")
844844
val planBefore = df.queryExecution.executedPlan
845-
assert(planBefore.toString.contains("== Current Plan =="))
846-
assert(planBefore.toString.contains("== Initial Plan =="))
845+
assert(!planBefore.toString.contains("== Current Plan =="))
846+
assert(!planBefore.toString.contains("== Initial Plan =="))
847847
df.collect()
848848
val planAfter = df.queryExecution.executedPlan
849849
assert(planAfter.toString.contains("== Final Plan =="))

0 commit comments

Comments
 (0)