Skip to content

Commit e0bc621

Browse files
committed
address comments
1 parent 2188b27 commit e0bc621

5 files changed

Lines changed: 13 additions & 8 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
136136
|if ($batch == null) {
137137
| $nextBatchFuncName();
138138
|}
139-
|while ($batch != null$limitNotReachedCond) {
139+
|while ($limitNotReachedCond $batch != null) {
140140
| int $numRows = $batch.numRows();
141141
| int $localEnd = $numRows - $idx;
142142
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
@@ -166,7 +166,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
166166
}
167167
val inputRow = if (needsUnsafeRowConversion) null else row
168168
s"""
169-
|while ($input.hasNext()$limitNotReachedCond) {
169+
|while ($limitNotReachedCond $input.hasNext()) {
170170
| InternalRow $row = (InternalRow) $input.next();
171171
| $numOutputRows.add(1);
172172
| ${consume(ctx, outputVars, inputRow).trim}

sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ case class SortExec(
179179
| $needToSort = false;
180180
| }
181181
|
182-
| while ($sortedIterator.hasNext()$limitNotReachedCond) {
182+
| while ($limitNotReachedCond $sortedIterator.hasNext()) {
183183
| UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next();
184184
| ${consume(ctx, null, outputRow)}
185185
| if (shouldStop()) return;

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ trait CodegenSupport extends SparkPlan {
363363
if (parent.limitNotReachedChecks.isEmpty) {
364364
""
365365
} else {
366-
parent.limitNotReachedChecks.mkString(" && ", " && ", "")
366+
parent.limitNotReachedChecks.mkString("", " && ", " &&")
367367
}
368368
}
369369
}
@@ -402,7 +402,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
402402
forceInline = true)
403403
val row = ctx.freshName("row")
404404
s"""
405-
| while ($input.hasNext()$limitNotReachedCond) {
405+
| while ($limitNotReachedCond $input.hasNext()) {
406406
| InternalRow $row = (InternalRow) $input.next();
407407
| ${consume(ctx, null, row).trim}
408408
| if (shouldStop()) return;

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ case class HashAggregateExec(
712712

713713
def outputFromRegularHashMap: String = {
714714
s"""
715-
|while ($iterTerm.next()$limitNotReachedCond) {
715+
|while ($limitNotReachedCond $iterTerm.next()) {
716716
| UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
717717
| UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
718718
| $outputFunc($keyTerm, $bufferTerm);

sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,11 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
457457
} else {
458458
"// shouldStop check is eliminated"
459459
}
460+
val loopCondition = if (limitNotReachedChecks.isEmpty) {
461+
"true"
462+
} else {
463+
limitNotReachedChecks.mkString(" && ")
464+
}
460465

461466
// An overview of the Range processing.
462467
//
@@ -469,7 +474,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
469474
// with partition start. `batchEnd` tracks the end index of the current batch, initialized
470475
// with `nextIndex`. In the outer loop, we first check if `nextIndex == batchEnd`. If it's true,
471476
// it means the current batch is fully consumed, and we will update `batchEnd` to process the
472-
// next batch. If `batchEnd` reaches partition end, exit the outer loop. finally we enter the
477+
// next batch. If `batchEnd` reaches partition end, exit the outer loop. Finally we enter the
473478
// inner loop. Note that, when we enter inner loop, `nextIndex` must be different from
474479
// `batchEnd`, otherwise we already exit the outer loop.
475480
//
@@ -490,7 +495,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
490495
| $initRangeFuncName(partitionIndex);
491496
| }
492497
|
493-
| while (true$limitNotReachedCond) {
498+
| while ($loopCondition) {
494499
| if ($nextIndex == $batchEnd) {
495500
| long $nextBatchTodo;
496501
| if ($numElementsTodo > ${batchSize}L) {

0 commit comments

Comments
 (0)