@@ -27,121 +27,78 @@ import org.apache.spark.sql.internal.SQLConf
2727import org.apache.spark.sql.types.DataType
2828
2929/**
30- * This rule tries to merge multiple subplans that have one row result. This can be either the plan
31- * tree of a [[ScalarSubquery]] expression or the plan tree starting at a non-grouping [[Aggregate]]
32- * node.
30+ * This rule tries to merge multiple non-correlated [[ScalarSubquery]]s to compute multiple scalar
31+ * values once.
3332 *
3433 * The process is the following:
35- * - While traversing through the plan each one row returning subplan is tried to merge into already
36- * seen one row returning subplans using `PlanMerger`s.
34+ * - While traversing through the plan each [[ScalarSubquery]] plan is tried to merge into already
35+ * seen subquery plans using `PlanMerger`s.
3736 * During this first traversal each [[ScalarSubquery]] expression is replaced to a temporal
38- * [[ScalarSubqueryReference]] and each non-grouping [[Aggregate]] node is replaced to a temporal
39- * [[NonGroupingAggregateReference]] pointing to its possible merged version in `PlanMerger`s.
40- * `PlanMerger`s keep track of whether a plan is a result of merging 2 or more subplans, or is an
41- * original unmerged plan.
42- * [[ScalarSubqueryReference]]s and [[NonGroupingAggregateReference]]s contain all the required
43- * information to either restore the original subplan or create a reference to a merged CTE.
44- * - Once the first traversal is complete and all possible merging have been done, a second
45- * traversal removes the references to either restore the original subplans or to replace the
46- * original to a modified ones that reference a CTE with a merged plan.
37+ * [[ScalarSubqueryReference]] pointing to its possible merged version stored in `PlanMerger`s.
38+ * `PlanMerger`s keep track of whether a plan is a result of merging 2 or more plans, or is an
39+ * original unmerged plan. [[ScalarSubqueryReference]]s contain all the required information to
40+ * either restore the original [[ScalarSubquery]] or create a reference to a merged CTE.
41+ * - Once the first traversal is complete and all possible merging have been done a second traversal
42+ * removes the [[ScalarSubqueryReference]]s to either restore the original [[ScalarSubquery]] or
43+ * to replace the original to a modified one that references a CTE with a merged plan.
4744 * A modified [[ScalarSubquery]] is constructed like:
48- * `GetStructField(ScalarSubquery(CTERelationRef to the merged plan), merged output index)`
49- * ans a modified [[Aggregate]] is constructed like:
50- * ```
51- * Project(
52- * Seq(
53- * GetStructField(
54- * ScalarSubquery(CTERelationRef to the merged plan),
55- * merged output index 1),
56- * GetStructField(
57- * ScalarSubquery(CTERelationRef to the merged plan),
58- * merged output index 2),
59- * ...),
60- * OneRowRelation)
61- * ```
62- * where `merged output index`s are the index of the output attributes (of the CTE) that
63- * correspond to the output of the original node.
45+ * `GetStructField(ScalarSubquery(CTERelationRef(...)), outputIndex)` where `outputIndex` is the
46+ * index of the output attribute (of the CTE) that corresponds to the output of the original
47+ * subquery.
6448 * - If there are merged subqueries in `PlanMerger`s then a `WithCTE` node is built from these
65- * queries. The `CTERelationDef` nodes contain the merged subplans in the following form:
66- * `Project(Seq(CreateNamedStruct(name1, attribute1, ...) AS mergedValue), mergedSubplan )`.
67- * The definitions are flagged that they host a subplan , that can return maximum one row.
49+ * queries. The `CTERelationDef` nodes contain the merged subquery in the following form:
50+ * `Project(Seq(CreateNamedStruct(name1, attribute1, ...) AS mergedValue), mergedSubqueryPlan )`.
51+ * The definitions are flagged that they host a subquery , that can return maximum one row.
6852 *
69- * Here are a few examples :
53+ * Eg. the following query :
7054 *
71- * 1. a query with 2 subqueries:
72- * ```
73- * Project [scalar-subquery [] AS scalarsubquery(), scalar-subquery [] AS scalarsubquery()]
74- * : :- Aggregate [min(a) AS min(a)]
75- * : : +- Relation [a, b, c]
76- * : +- Aggregate [sum(b) AS sum(b)]
77- * : +- Relation [a, b, c]
55+ * SELECT
56+ * (SELECT avg(a) FROM t),
57+ * (SELECT sum(b) FROM t)
58+ *
59+ * is optimized from:
60+ *
61+ * == Optimized Logical Plan ==
62+ * Project [scalar-subquery#242 [] AS scalarsubquery()#253,
63+ * scalar-subquery#243 [] AS scalarsubquery()#254L]
64+ * : :- Aggregate [avg(a#244) AS avg(a)#247]
65+ * : : +- Project [a#244]
66+ * : : +- Relation default.t[a#244,b#245] parquet
67+ * : +- Aggregate [sum(a#251) AS sum(a)#250L]
68+ * : +- Project [a#251]
69+ * : +- Relation default.t[a#251,b#252] parquet
7870 * +- OneRowRelation
79- * ```
80- * is optimized to:
81- * ```
82- * WithCTE
83- * :- CTERelationDef 0
84- * : +- Project [named_struct(min(a), min(a), sum(b), sum(b)) AS mergedValue]
85- * : +- Aggregate [min(a) AS min(a), sum(b) AS sum(b)]
86- * : +- Relation [a, b, c]
87- * +- Project [scalar-subquery [].min(a) AS scalarsubquery(),
88- * scalar-subquery [].sum(b) AS scalarsubquery()]
89- * : :- CTERelationRef 0
90- * : +- CTERelationRef 0
91- * +- OneRowRelation
92- * ```
9371 *
94- * 2. a query with 2 non-grouping aggregates:
95- * ```
96- * Join Inner
97- * :- Aggregate [min(a) AS min(a)]
98- * : +- Relation [a, b, c]
99- * +- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
100- * +- Relation [a, b, c]
101- * ```
102- * is optimized to:
103- * ```
104- * WithCTE
105- * :- CTERelationDef 0
106- * : +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
107- * : +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
108- * : +- Relation [a, b, c]
109- * +- Join Inner
110- * :- Project [scalar-subquery [].min(a) AS min(a)]
111- * : : +- CTERelationRef 0
112- * : +- OneRowRelation
113- * +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
114- * : :- CTERelationRef 0
115- * : +- CTERelationRef 0
116- * +- OneRowRelation
117- * ```
72+ * to:
73+ *
74+ * == Optimized Logical Plan ==
75+ * Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253,
76+ * scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L]
77+ * : :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
78+ * : : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
79+ * : : +- Project [a#244]
80+ * : : +- Relation default.t[a#244,b#245] parquet
81+ * : +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
82+ * : +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
83+ * : +- Project [a#244]
84+ * : +- Relation default.t[a#244,b#245] parquet
85+ * +- OneRowRelation
11886 *
119- * 3. a query with a subquery and a non-grouping aggregate:
120- * ```
121- * Join Inner
122- * :- Project [scalar-subquery [] AS scalarsubquery()]
123- * : : +- Aggregate [min(a) AS min(a)]
124- * : : +- Relation [a, b, c]
125- * : +- OneRowRelation
126- * +- Aggregate [sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
127- * +- Relation [a, b, c]
128- * ```
129- * is optimized to:
130- * ```
131- * WithCTE
132- * :- CTERelationDef 0
133- * : +- Project [named_struct(min(a), min(a), sum(b), sum(b), avg(c), avg(c)) AS mergedValue]
134- * : +- Aggregate [min(a) AS min(a), sum(b) AS sum(b), avg(cast(c as double)) AS avg(c)]
135- * : +- Relation [a, b, c]
136- * +- Join Inner
137- * :- Project [scalar-subquery [].min(a) AS scalarsubquery()]
138- * : : +- CTERelationRef 0
139- * : +- OneRowRelation
140- * +- Project [scalar-subquery [].sum(b) AS sum(b), scalar-subquery [].avg(c) AS avg(c)]
141- * : :- CTERelationRef 0
142- * : +- CTERelationRef 0
143- * +- OneRowRelation
144- * ```
87+ * == Physical Plan ==
88+ * *(1) Project [Subquery scalar-subquery#242, [id=#125].avg(a) AS scalarsubquery()#253,
89+ * ReusedSubquery
90+ * Subquery scalar-subquery#242, [id=#125].sum(a) AS scalarsubquery()#254L]
91+ * : :- Subquery scalar-subquery#242, [id=#125]
92+ * : : +- *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
93+ * : : +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)],
94+ * output=[avg(a)#247, sum(a)#250L])
95+ * : : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#120]
96+ * : : +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)],
97+ * output=[sum#262, count#263L, sum#264L])
98+ * : : +- *(1) ColumnarToRow
99+ * : : +- FileScan parquet default.t[a#244] ...
100+ * : +- ReusedSubquery Subquery scalar-subquery#242, [id=#125]
101+ * +- *(1) Scan OneRowRelation[]
145102 */
146103object MergeSubplans extends Rule[LogicalPlan] {
147104 def apply(plan: LogicalPlan): LogicalPlan = {
0 commit comments