Skip to content

Commit ff2b5a4

Browse files
author
Davies Liu
committed
improve readability by @rxin
1 parent e1fd87d commit ff2b5a4

1 file changed

Lines changed: 74 additions & 38 deletions

File tree

  • sql/core/src/main/scala/org/apache/spark/sql/execution

sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala

Lines changed: 74 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -96,69 +96,105 @@ case class Expand(
9696
}
9797

9898
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
99-
// Some columns have the same expression in all the projections, so collect the unique
100-
// expressions.
101-
val columnUniqueExpressions: IndexedSeq[Set[Expression]] = output.indices.map { i =>
102-
projections.map(p => p(i)).toSet
103-
}
104-
105-
// Create the variables for output
99+
/*
100+
* When the projections list looks like:
101+
* expr1A, exprB, expr1C
102+
* expr2A, exprB, expr2C
103+
* ...
104+
* expr(N-1)A, exprB, expr(N-1)C
105+
*
106+
* i.e. column A and C have different values for each output row, but column B stays constant.
107+
*
108+
* The generated code looks something like (note that B is only computed once in declaration):
109+
*
110+
* // part 1: declare all the columns
111+
* colA = ...
112+
* colB = ...
113+
* colC = ...
114+
*
115+
* // part 2: code that computes the columns
116+
* for (row = 0; row < N; row++) {
117+
* switch (row) {
118+
* case 0:
119+
* colA = ...
120+
* colC = ...
121+
* case 1:
122+
* colA = ...
123+
* colC = ...
124+
* ...
125+
* case N - 1:
126+
* colA = ...
127+
* colC = ...
128+
* }
129+
* // increment metrics and consume output values
130+
* }
131+
*
132+
* We use a for loop here so we only includes one copy of the consume code and avoid code
133+
* size explosion.
134+
*/
135+
136+
// Set input variables
106137
ctx.currentVars = input
107-
val resultVars = columnUniqueExpressions.zipWithIndex.map { case (exprs, i) =>
108-
val firstExpr = exprs.head
109-
if (exprs.size == 1) {
110-
// The value of this column will not change, use the variables directly.
138+
139+
// Tracks whether a column has the same output for all rows.
140+
// Size of sameOutput array should equal N.
141+
// If sameOutput(i) is true, then the i-th column has the same value for all output rows given
142+
// an input row.
143+
val sameOutput: Array[Boolean] = output.indices.map { colIndex =>
144+
projections.map(p => p(colIndex)).toSet.size == 1
145+
}.toArray
146+
147+
// Part 1: declare variables for each column
148+
// If a column has the same value for all output rows, then we also generate its computation
149+
// right after declaration. Otherwise its value is computed in the part 2.
150+
val outputColumns = output.indices.map { col =>
151+
val firstExpr = projections.head(col)
152+
if (sameOutput(col)) {
153+
// This column is the same across all output rows. Just generate code for it here.
111154
BindReferences.bindReference(firstExpr, child.output).gen(ctx)
112155
} else {
113-
// The value of this column will change, so create new variables for them, because they
114-
// could be constants in some expressions.
115156
val isNull = ctx.freshName("isNull")
116157
val value = ctx.freshName("value")
117158
val code = s"""
118-
|boolean $isNull = true;
119-
|${ctx.javaType(firstExpr.dataType)} $value = ${ctx.defaultValue(firstExpr.dataType)};
159+
|boolean $isNull = true;
160+
|${ctx.javaType(firstExpr.dataType)} $value = ${ctx.defaultValue(firstExpr.dataType)};
120161
""".stripMargin
121162
ExprCode(code, isNull, value)
122163
}
123164
}
124165

125-
// The source code returned by `consume()` could be huge, we can't call `consume()` for each of
126-
// the projections, otherwise the generated code will have lots of duplicated codes. Instead,
127-
// we should generate a loop for all the projections, use switch/case to select a projection
128-
// based on the loop index, then only call `consume()` once.
129-
//
130-
// These output variables will be created before the loop, their values will be updated in
131-
// switch/case inside the loop.
132-
val cases = projections.zipWithIndex.map { case (exprs, i) =>
133-
val changes: Seq[(Expression, Int)] = exprs.zipWithIndex.filter { case (e, j) =>
134-
// the column with single unique expression does not need to be updated inside the loop.
135-
columnUniqueExpressions(j).size > 1
136-
}
137-
val updates = changes.map { case (e, j) =>
138-
val ev = BindReferences.bindReference(e, child.output).gen(ctx)
139-
s"""
140-
|${ev.code}
141-
|${resultVars(j).isNull} = ${ev.isNull};
142-
|${resultVars(j).value} = ${ev.value};
143-
""".stripMargin
166+
// Part 2: switch/case statements
167+
val cases = projections.zipWithIndex.map { case (exprs, row) =>
168+
var updateCode = ""
169+
for (col <- exprs.indices) {
170+
if (!sameOutput(col)) {
171+
val ev = BindReferences.bindReference(exprs(col), child.output).gen(ctx)
172+
updateCode +=
173+
s"""
174+
|${ev.code}
175+
|${outputColumns(col).isNull} = ${ev.isNull};
176+
|${outputColumns(col).value} = ${ev.value};
177+
""".stripMargin
178+
}
144179
}
180+
145181
s"""
146-
|case $i:
147-
| ${updates.mkString("\n").trim}
182+
|case $row:
183+
| ${updateCode.trim}
148184
| break;
149185
""".stripMargin
150186
}
151187

152188
val numOutput = metricTerm(ctx, "numOutputRows")
153189
val i = ctx.freshName("i")
154190
s"""
155-
|${resultVars.map(_.code).mkString("\n").trim}
191+
|${outputColumns.map(_.code).mkString("\n").trim}
156192
|for (int $i = 0; $i < ${projections.length}; $i ++) {
157193
| switch ($i) {
158194
| ${cases.mkString("\n").trim}
159195
| }
160196
| $numOutput.add(1);
161-
| ${consume(ctx, resultVars)}
197+
| ${consume(ctx, outputColumns)}
162198
|}
163199
""".stripMargin
164200
}

0 commit comments

Comments
 (0)