Commit 76ca6c6

Author: Davies Liu (authored and committed)

    Merge branch 'master' of github.com:apache/spark into gen_defer

    Conflicts:
        sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

2 parents: 62682b2 + a6e2bd3

12 files changed: 342 additions & 98 deletions

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 45 additions & 39 deletions
@@ -86,7 +86,7 @@ private[ui] class ExecutorsPage(
         <th>Failed Tasks</th>
         <th>Complete Tasks</th>
         <th>Total Tasks</th>
-        <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th>
+        <th><span data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
         <th>
@@ -109,13 +109,8 @@ private[ui] class ExecutorsPage(
     val content =
       <div class="row">
         <div class="span12">
-          <h4>Dead Executors({deadExecutorInfo.size})</h4>
-        </div>
-      </div>
-      <div class="row">
-        <div class="span12">
-          <h4>Active Executors({activeExecutorInfo.size})</h4>
-          {execSummary(activeExecutorInfo)}
+          <h4>Summary</h4>
+          {execSummary(activeExecutorInfo, deadExecutorInfo)}
         </div>
       </div>
       <div class = "row">
@@ -198,7 +193,7 @@ private[ui] class ExecutorsPage(
     </tr>
   }

-  private def execSummary(execInfo: Seq[ExecutorSummary]): Seq[Node] = {
+  private def execSummaryRow(execInfo: Seq[ExecutorSummary], rowName: String): Seq[Node] = {
     val maximumMemory = execInfo.map(_.maxMemory).sum
     val memoryUsed = execInfo.map(_.memoryUsed).sum
     val diskUsed = execInfo.map(_.diskUsed).sum
@@ -207,37 +202,46 @@ private[ui] class ExecutorsPage(
     val totalShuffleRead = execInfo.map(_.totalShuffleRead).sum
     val totalShuffleWrite = execInfo.map(_.totalShuffleWrite).sum

-    val sumContent =
-      <tr>
-        <td>{execInfo.map(_.rddBlocks).sum}</td>
-        <td sorttable_customkey={memoryUsed.toString}>
-          {Utils.bytesToString(memoryUsed)} /
-          {Utils.bytesToString(maximumMemory)}
-        </td>
-        <td sorttable_customkey={diskUsed.toString}>
-          {Utils.bytesToString(diskUsed)}
-        </td>
-        <td>{totalCores}</td>
-        {taskData(execInfo.map(_.maxTasks).sum,
-          execInfo.map(_.activeTasks).sum,
-          execInfo.map(_.failedTasks).sum,
-          execInfo.map(_.completedTasks).sum,
-          execInfo.map(_.totalTasks).sum,
-          execInfo.map(_.totalDuration).sum,
-          execInfo.map(_.totalGCTime).sum)}
-        <td sorttable_customkey={totalInputBytes.toString}>
-          {Utils.bytesToString(totalInputBytes)}
-        </td>
-        <td sorttable_customkey={totalShuffleRead.toString}>
-          {Utils.bytesToString(totalShuffleRead)}
-        </td>
-        <td sorttable_customkey={totalShuffleWrite.toString}>
-          {Utils.bytesToString(totalShuffleWrite)}
-        </td>
-      </tr>;
+    <tr>
+      <td><b>{rowName}({execInfo.size})</b></td>
+      <td>{execInfo.map(_.rddBlocks).sum}</td>
+      <td sorttable_customkey={memoryUsed.toString}>
+        {Utils.bytesToString(memoryUsed)} /
+        {Utils.bytesToString(maximumMemory)}
+      </td>
+      <td sorttable_customkey={diskUsed.toString}>
+        {Utils.bytesToString(diskUsed)}
+      </td>
+      <td>{totalCores}</td>
+      {taskData(execInfo.map(_.maxTasks).sum,
+        execInfo.map(_.activeTasks).sum,
+        execInfo.map(_.failedTasks).sum,
+        execInfo.map(_.completedTasks).sum,
+        execInfo.map(_.totalTasks).sum,
+        execInfo.map(_.totalDuration).sum,
+        execInfo.map(_.totalGCTime).sum)}
+      <td sorttable_customkey={totalInputBytes.toString}>
+        {Utils.bytesToString(totalInputBytes)}
+      </td>
+      <td sorttable_customkey={totalShuffleRead.toString}>
+        {Utils.bytesToString(totalShuffleRead)}
+      </td>
+      <td sorttable_customkey={totalShuffleWrite.toString}>
+        {Utils.bytesToString(totalShuffleWrite)}
+      </td>
+    </tr>
+  }
+
+  private def execSummary(activeExecInfo: Seq[ExecutorSummary], deadExecInfo: Seq[ExecutorSummary]):
+      Seq[Node] = {
+    val totalExecInfo = activeExecInfo ++ deadExecInfo
+    val activeRow = execSummaryRow(activeExecInfo, "Active");
+    val deadRow = execSummaryRow(deadExecInfo, "Dead");
+    val totalRow = execSummaryRow(totalExecInfo, "Total");

     <table class={UIUtils.TABLE_CLASS_STRIPED}>
       <thead>
+        <th></th>
         <th>RDD Blocks</th>
         <th><span data-toggle="tooltip" title={ToolTips.STORAGE_MEMORY}>Storage Memory</span></th>
         <th>Disk Used</th>
@@ -246,7 +250,7 @@ private[ui] class ExecutorsPage(
         <th>Failed Tasks</th>
         <th>Complete Tasks</th>
         <th>Total Tasks</th>
-        <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th>
+        <th><span data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
         <th>
@@ -256,7 +260,9 @@ private[ui] class ExecutorsPage(
         </th>
       </thead>
      <tbody>
-        {sumContent}
+        {activeRow}
+        {deadRow}
+        {totalRow}
      </tbody>
    </table>
  }
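
The net effect: instead of two separate tables headed "Active Executors" and "Dead Executors", the page renders one "Summary" table whose rows all come from the same helper applied to three slices of executors. A minimal hand-written sketch of that idea (not part of the diff; `renderRow` is a hypothetical stand-in for execSummaryRow's XML output):

    def summaryRows(active: Seq[ExecutorSummary], dead: Seq[ExecutorSummary]) =
      Seq(("Active", active), ("Dead", dead), ("Total", active ++ dead)).flatMap {
        case (label, execs) => renderRow(label, execs)  // one <tr> per slice
      }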

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java

Lines changed: 24 additions & 5 deletions
@@ -37,7 +37,6 @@
 import org.apache.parquet.schema.Type;

 import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
@@ -57,10 +56,14 @@
  *
  * TODO: handle complex types, decimal requiring more than 8 bytes, INT96. Schema mismatch.
  * All of these can be handled efficiently and easily with codegen.
+ *
+ * This class can either return InternalRows or ColumnarBatches. With whole stage codegen
+ * enabled, this class returns ColumnarBatches which offers significant performance gains.
+ * TODO: make this always return ColumnarBatches.
  */
-public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase<InternalRow> {
+public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
   /**
-   * Batch of unsafe rows that we assemble and the current index we've returned. Everytime this
+   * Batch of unsafe rows that we assemble and the current index we've returned. Every time this
    * batch is used up (batchIdx == numBatched), we populated the batch.
    */
   private UnsafeRow[] rows = new UnsafeRow[64];
@@ -115,11 +118,15 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
    * code between the path that uses the MR decoders and the vectorized ones.
    *
    * TODOs:
-   *  - Implement all the encodings to support vectorized.
    *  - Implement v2 page formats (just make sure we create the correct decoders).
    */
   private ColumnarBatch columnarBatch;

+  /**
+   * If true, this class returns batches instead of rows.
+   */
+  private boolean returnColumnarBatch;
+
   /**
    * The default config on whether columnarBatch should be offheap.
    */
@@ -169,6 +176,8 @@ public void close() throws IOException {

   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (returnColumnarBatch) return nextBatch();
+
     if (batchIdx >= numBatched) {
       if (vectorizedDecode()) {
         if (!nextBatch()) return false;
@@ -181,7 +190,9 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
   }

   @Override
-  public InternalRow getCurrentValue() throws IOException, InterruptedException {
+  public Object getCurrentValue() throws IOException, InterruptedException {
+    if (returnColumnarBatch) return columnarBatch;
+
     if (vectorizedDecode()) {
       return columnarBatch.getRow(batchIdx - 1);
     } else {
@@ -210,6 +221,14 @@ public ColumnarBatch resultBatch(MemoryMode memMode) {
     return columnarBatch;
   }

+  /**
+   * Can be called before any rows are returned to enable returning columnar batches directly.
+   */
+  public void enableReturningBatches() {
+    assert(vectorizedDecode());
+    returnColumnarBatch = true;
+  }
+
   /**
    * Advances to the next batch of rows. Returns false if there are no more.
    */
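
Taken together, these changes alter the reader's contract: the element type widens from InternalRow to Object, and once enableReturningBatches() has been called, nextKeyValue() advances a whole ColumnarBatch at a time and getCurrentValue() hands back the batch itself. A hedged Scala sketch of a caller (the initialize/InputSplit plumbing inherited from the Hadoop RecordReader base class is assumed and not shown in this diff):

    import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
    import org.apache.spark.sql.execution.vectorized.ColumnarBatch

    def countRows(reader: UnsafeRowParquetRecordReader): Long = {
      // Must be called before the first nextKeyValue(); asserts vectorized decode is on.
      reader.enableReturningBatches()
      var rows = 0L
      while (reader.nextKeyValue()) {   // now true once per batch, not once per row
        val batch = reader.getCurrentValue.asInstanceOf[ColumnarBatch]
        rows += batch.numRows()
      }
      rows
    }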

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java

Lines changed: 57 additions & 0 deletions
@@ -26,16 +26,73 @@

 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;

 /**
  * Utilities to help manipulate data associate with ColumnVectors. These should be used mostly
  * for debugging or other non-performance critical paths.
  * These utilities are mostly used to convert ColumnVectors into other formats.
  */
 public class ColumnVectorUtils {
+  /**
+   * Populates the entire `col` with `row[fieldIdx]`
+   */
+  public static void populate(ColumnVector col, InternalRow row, int fieldIdx) {
+    int capacity = col.capacity;
+    DataType t = col.dataType();
+
+    if (row.isNullAt(fieldIdx)) {
+      col.putNulls(0, capacity);
+    } else {
+      if (t == DataTypes.BooleanType) {
+        col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
+      } else if (t == DataTypes.ByteType) {
+        col.putBytes(0, capacity, row.getByte(fieldIdx));
+      } else if (t == DataTypes.ShortType) {
+        col.putShorts(0, capacity, row.getShort(fieldIdx));
+      } else if (t == DataTypes.IntegerType) {
+        col.putInts(0, capacity, row.getInt(fieldIdx));
+      } else if (t == DataTypes.LongType) {
+        col.putLongs(0, capacity, row.getLong(fieldIdx));
+      } else if (t == DataTypes.FloatType) {
+        col.putFloats(0, capacity, row.getFloat(fieldIdx));
+      } else if (t == DataTypes.DoubleType) {
+        col.putDoubles(0, capacity, row.getDouble(fieldIdx));
+      } else if (t == DataTypes.StringType) {
+        UTF8String v = row.getUTF8String(fieldIdx);
+        byte[] bytes = v.getBytes();
+        for (int i = 0; i < capacity; i++) {
+          col.putByteArray(i, bytes);
+        }
+      } else if (t instanceof DecimalType) {
+        DecimalType dt = (DecimalType)t;
+        Decimal d = row.getDecimal(fieldIdx, dt.precision(), dt.scale());
+        if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+          col.putInts(0, capacity, (int)d.toUnscaledLong());
+        } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+          col.putLongs(0, capacity, d.toUnscaledLong());
+        } else {
+          final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+          byte[] bytes = integer.toByteArray();
+          for (int i = 0; i < capacity; i++) {
+            col.putByteArray(i, bytes, 0, bytes.length);
+          }
+        }
+      } else if (t instanceof CalendarIntervalType) {
+        CalendarInterval c = (CalendarInterval)row.get(fieldIdx, t);
+        col.getChildColumn(0).putInts(0, capacity, c.months);
+        col.getChildColumn(1).putLongs(0, capacity, c.microseconds);
+      } else if (t instanceof DateType) {
+        Date date = (Date)row.get(fieldIdx, t);
+        col.putInts(0, capacity, DateTimeUtils.fromJavaDate(date));
+      }
+    }
+  }
+
   /**
    * Returns the array data as the java primitive array.
    * For example, an array of IntegerType will return an int[].
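
populate() writes the single value row[fieldIdx] into every slot of the vector, so its natural use is a column whose value is constant across a batch, e.g. a partition value. A small hedged sketch (the allocation of `col` and the surrounding batch wiring are assumptions, not shown in this diff):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.vectorized.{ColumnVector, ColumnVectorUtils}

    // Fill `col` end to end with the value at `fieldIdx` of `partitionValues`;
    // a null at that field marks the whole column null via putNulls.
    def fillConstantColumn(col: ColumnVector, partitionValues: InternalRow, fieldIdx: Int): Unit =
      ColumnVectorUtils.populate(col, partitionValues, fieldIdx)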

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java

Lines changed: 12 additions & 0 deletions
@@ -22,6 +22,7 @@
 import org.apache.commons.lang.NotImplementedException;

 import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -315,6 +316,17 @@ public int numValidRows() {
    */
   public ColumnVector column(int ordinal) { return columns[ordinal]; }

+  /**
+   * Sets (replaces) the column at `ordinal` with column. This can be used to do very efficient
+   * projections.
+   */
+  public void setColumn(int ordinal, ColumnVector column) {
+    if (column instanceof OffHeapColumnVector) {
+      throw new NotImplementedException("Need to ref count columns.");
+    }
+    columns[ordinal] = column;
+  }
+
   /**
    * Returns the row in this batch at `rowId`. Returned row is reused across calls.
    */
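
The comment hints at the intended pattern: a projection can be done by swapping a precomputed vector into the batch rather than copying data row by row. A hedged sketch combining setColumn with the populate helper above (names here are illustrative, not from the diff):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.vectorized.{ColumnVector, ColumnVectorUtils, ColumnarBatch}

    // Attach a constant (e.g. partition-value) column at `ordinal` by overwriting the
    // existing slot. Off-heap replacements currently throw NotImplementedException
    // until reference counting is in place, so `constCol` must be on-heap.
    def attachConstantColumn(
        batch: ColumnarBatch,
        ordinal: Int,
        constCol: ColumnVector,
        value: InternalRow,
        fieldIdx: Int): Unit = {
      ColumnVectorUtils.populate(constCol, value, fieldIdx)
      batch.setColumn(ordinal, constCol)
    }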

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java

Lines changed: 0 additions & 3 deletions
@@ -62,9 +62,6 @@ public final long nullsNativeAddress() {

   @Override
   public final void close() {
-    nulls = null;
-    intData = null;
-    doubleData = null;
   }

   //

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 59 additions & 6 deletions
@@ -139,23 +139,76 @@ private[sql] case class PhysicalRDD(
   // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
   // never requires UnsafeRow as input.
   override protected def doProduce(ctx: CodegenContext): String = {
+    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
     val input = ctx.freshName("input")
+    val idx = ctx.freshName("batchIdx")
+    val batch = ctx.freshName("batch")
     // PhysicalRDD always just has one input
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+    ctx.addMutableState("int", idx, s"$idx = 0;")

     val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
     val row = ctx.freshName("row")
     val numOutputRows = metricTerm(ctx, "numOutputRows")
     ctx.INPUT_ROW = row
     ctx.currentVars = null
     val columns = exprs.map(_.gen(ctx))
+
+    // The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this
+    // by looking at the first value of the RDD and then calling the function which will process
+    // the remaining. It is faster to return batches.
+    // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
+    // here which path to use. Fix this.
+
+    val scanBatches = ctx.freshName("processBatches")
+    ctx.addNewFunction(scanBatches,
+      s"""
+       | private void $scanBatches() throws java.io.IOException {
+       |   while (true) {
+       |     int numRows = $batch.numRows();
+       |     if ($idx == 0) $numOutputRows.add(numRows);
+       |
+       |     while ($idx < numRows) {
+       |       InternalRow $row = $batch.getRow($idx++);
+       |       ${consume(ctx, columns).trim}
+       |       if (shouldStop()) return;
+       |     }
+       |
+       |     if (!$input.hasNext()) {
+       |       $batch = null;
+       |       break;
+       |     }
+       |     $batch = ($columnarBatchClz)$input.next();
+       |     $idx = 0;
+       |   }
+       | }""".stripMargin)
+
+    val scanRows = ctx.freshName("processRows")
+    ctx.addNewFunction(scanRows,
+      s"""
+       | private void $scanRows(InternalRow $row) throws java.io.IOException {
+       |   while (true) {
+       |     $numOutputRows.add(1);
+       |     ${consume(ctx, columns).trim}
+       |     if (shouldStop()) return;
+       |     if (!$input.hasNext()) break;
+       |     $row = (InternalRow)$input.next();
+       |   }
+       | }""".stripMargin)
+
+    val value = ctx.freshName("value")
     s"""
-     | while ($input.hasNext()) {
-     |   InternalRow $row = (InternalRow) $input.next();
-     |   $numOutputRows.add(1);
-     |   ${consume(ctx, columns).trim}
-     |   if (shouldStop()) {
-     |     return;
+     | if ($batch != null) {
+     |   $scanBatches();
+     | } else if ($input.hasNext()) {
+     |   Object $value = $input.next();
+     |   if ($value instanceof $columnarBatchClz) {
+     |     $batch = ($columnarBatchClz)$value;
+     |     $scanBatches();
+     |   } else {
+     |     $scanRows((InternalRow) $value);
     |   }
     | }
   """.stripMargin
