Commit b725efd

do not use MutableColumnarRow in ColumnarBatch
1 parent cfe236f

File tree

2 files changed: +189 −30 lines

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java

Lines changed: 18 additions & 26 deletions
@@ -26,7 +26,6 @@
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.sql.vectorized.ColumnarRow;
-import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;

@@ -39,17 +38,10 @@
  */
 public final class MutableColumnarRow extends InternalRow {
   public int rowId;
-  private final ColumnVector[] columns;
-  private final WritableColumnVector[] writableColumns;
-
-  public MutableColumnarRow(ColumnVector[] columns) {
-    this.columns = columns;
-    this.writableColumns = null;
-  }
+  private final WritableColumnVector[] columns;

   public MutableColumnarRow(WritableColumnVector[] writableColumns) {
     this.columns = writableColumns;
-    this.writableColumns = writableColumns;
   }

   @Override
@@ -228,54 +220,54 @@ public void update(int ordinal, Object value) {

   @Override
   public void setNullAt(int ordinal) {
-    writableColumns[ordinal].putNull(rowId);
+    columns[ordinal].putNull(rowId);
   }

   @Override
   public void setBoolean(int ordinal, boolean value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putBoolean(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putBoolean(rowId, value);
   }

   @Override
   public void setByte(int ordinal, byte value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putByte(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putByte(rowId, value);
   }

   @Override
   public void setShort(int ordinal, short value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putShort(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putShort(rowId, value);
   }

   @Override
   public void setInt(int ordinal, int value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putInt(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putInt(rowId, value);
   }

   @Override
   public void setLong(int ordinal, long value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putLong(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putLong(rowId, value);
   }

   @Override
   public void setFloat(int ordinal, float value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putFloat(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putFloat(rowId, value);
   }

   @Override
   public void setDouble(int ordinal, double value) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putDouble(rowId, value);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putDouble(rowId, value);
   }

   @Override
   public void setDecimal(int ordinal, Decimal value, int precision) {
-    writableColumns[ordinal].putNotNull(rowId);
-    writableColumns[ordinal].putDecimal(rowId, value, precision);
+    columns[ordinal].putNotNull(rowId);
+    columns[ordinal].putDecimal(rowId, value, precision);
   }
 }
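
Net effect for this file: MutableColumnarRow loses its read-only ColumnVector[] constructor and keeps a single WritableColumnVector[] field that serves both reads and writes. A minimal usage sketch of the class in its new shape; the wrapper class name, the on-heap vector choice, and the capacity are illustrative assumptions, not part of the commit:

import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

public class MutableRowSketch {
  public static void main(String[] args) {
    // One writable int column with room for four rows (the capacity is an
    // illustrative choice).
    WritableColumnVector[] cols = { new OnHeapColumnVector(4, DataTypes.IntegerType) };

    // The row is a movable cursor over the vectors: position rowId, then write.
    MutableColumnarRow row = new MutableColumnarRow(cols);
    row.rowId = 0;
    row.setInt(0, 42);                  // writes through cols[0].putInt(...)
    System.out.println(row.getInt(0)); // reads the same slot back: 42
  }
}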

sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatch.java

Lines changed: 171 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@

 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.MutableColumnarRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;

 /**
  * This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this
@@ -33,7 +36,7 @@ public final class ColumnarBatch {
   private final ColumnVector[] columns;

   // Staging row returned from `getRow`.
-  private final MutableColumnarRow row;
+  private final ColumnarBatchRow row;

   /**
    * Called to close all the columns in this batch. It is not valid to access the data after
@@ -50,7 +53,7 @@ public void close() {
    */
   public Iterator<InternalRow> rowIterator() {
     final int maxRows = numRows;
-    final MutableColumnarRow row = new MutableColumnarRow(columns);
+    final ColumnarBatchRow row = new ColumnarBatchRow(columns);
     return new Iterator<InternalRow>() {
       int rowId = 0;

@@ -108,6 +111,170 @@ public InternalRow getRow(int rowId) {

   public ColumnarBatch(ColumnVector[] columns) {
     this.columns = columns;
-    this.row = new MutableColumnarRow(columns);
+    this.row = new ColumnarBatchRow(columns);
   }
 }
+
+/**
+ * An internal class, which wraps an array of {@link ColumnVector} and provides a row view.
+ */
+class ColumnarBatchRow extends InternalRow {
+  public int rowId;
+  private final ColumnVector[] columns;
+
+  ColumnarBatchRow(ColumnVector[] columns) {
+    this.columns = columns;
+  }
+
+  @Override
+  public int numFields() { return columns.length; }
+
+  @Override
+  public InternalRow copy() {
+    GenericInternalRow row = new GenericInternalRow(columns.length);
+    for (int i = 0; i < numFields(); i++) {
+      if (isNullAt(i)) {
+        row.setNullAt(i);
+      } else {
+        DataType dt = columns[i].dataType();
+        if (dt instanceof BooleanType) {
+          row.setBoolean(i, getBoolean(i));
+        } else if (dt instanceof ByteType) {
+          row.setByte(i, getByte(i));
+        } else if (dt instanceof ShortType) {
+          row.setShort(i, getShort(i));
+        } else if (dt instanceof IntegerType) {
+          row.setInt(i, getInt(i));
+        } else if (dt instanceof LongType) {
+          row.setLong(i, getLong(i));
+        } else if (dt instanceof FloatType) {
+          row.setFloat(i, getFloat(i));
+        } else if (dt instanceof DoubleType) {
+          row.setDouble(i, getDouble(i));
+        } else if (dt instanceof StringType) {
+          row.update(i, getUTF8String(i).copy());
+        } else if (dt instanceof BinaryType) {
+          row.update(i, getBinary(i));
+        } else if (dt instanceof DecimalType) {
+          DecimalType t = (DecimalType)dt;
+          row.setDecimal(i, getDecimal(i, t.precision(), t.scale()), t.precision());
+        } else if (dt instanceof DateType) {
+          row.setInt(i, getInt(i));
+        } else if (dt instanceof TimestampType) {
+          row.setLong(i, getLong(i));
+        } else {
+          throw new RuntimeException("Not implemented. " + dt);
+        }
+      }
+    }
+    return row;
+  }
+
+  @Override
+  public boolean anyNull() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) { return columns[ordinal].isNullAt(rowId); }
+
+  @Override
+  public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
+
+  @Override
+  public byte getByte(int ordinal) { return columns[ordinal].getByte(rowId); }
+
+  @Override
+  public short getShort(int ordinal) { return columns[ordinal].getShort(rowId); }
+
+  @Override
+  public int getInt(int ordinal) { return columns[ordinal].getInt(rowId); }
+
+  @Override
+  public long getLong(int ordinal) { return columns[ordinal].getLong(rowId); }
+
+  @Override
+  public float getFloat(int ordinal) { return columns[ordinal].getFloat(rowId); }
+
+  @Override
+  public double getDouble(int ordinal) { return columns[ordinal].getDouble(rowId); }
+
+  @Override
+  public Decimal getDecimal(int ordinal, int precision, int scale) {
+    return columns[ordinal].getDecimal(rowId, precision, scale);
+  }
+
+  @Override
+  public UTF8String getUTF8String(int ordinal) {
+    return columns[ordinal].getUTF8String(rowId);
+  }
+
+  @Override
+  public byte[] getBinary(int ordinal) {
+    return columns[ordinal].getBinary(rowId);
+  }
+
+  @Override
+  public CalendarInterval getInterval(int ordinal) {
+    return columns[ordinal].getInterval(rowId);
+  }
+
+  @Override
+  public ColumnarRow getStruct(int ordinal, int numFields) {
+    return columns[ordinal].getStruct(rowId);
+  }
+
+  @Override
+  public ColumnarArray getArray(int ordinal) {
+    return columns[ordinal].getArray(rowId);
+  }
+
+  @Override
+  public ColumnarMap getMap(int ordinal) {
+    return columns[ordinal].getMap(rowId);
+  }
+
+  @Override
+  public Object get(int ordinal, DataType dataType) {
+    if (dataType instanceof BooleanType) {
+      return getBoolean(ordinal);
+    } else if (dataType instanceof ByteType) {
+      return getByte(ordinal);
+    } else if (dataType instanceof ShortType) {
+      return getShort(ordinal);
+    } else if (dataType instanceof IntegerType) {
+      return getInt(ordinal);
+    } else if (dataType instanceof LongType) {
+      return getLong(ordinal);
+    } else if (dataType instanceof FloatType) {
+      return getFloat(ordinal);
+    } else if (dataType instanceof DoubleType) {
+      return getDouble(ordinal);
+    } else if (dataType instanceof StringType) {
+      return getUTF8String(ordinal);
+    } else if (dataType instanceof BinaryType) {
+      return getBinary(ordinal);
+    } else if (dataType instanceof DecimalType) {
+      DecimalType t = (DecimalType) dataType;
+      return getDecimal(ordinal, t.precision(), t.scale());
+    } else if (dataType instanceof DateType) {
+      return getInt(ordinal);
+    } else if (dataType instanceof TimestampType) {
+      return getLong(ordinal);
+    } else if (dataType instanceof ArrayType) {
+      return getArray(ordinal);
+    } else if (dataType instanceof StructType) {
+      return getStruct(ordinal, ((StructType)dataType).fields().length);
+    } else if (dataType instanceof MapType) {
+      return getMap(ordinal);
+    } else {
+      throw new UnsupportedOperationException("Datatype not supported " + dataType);
+    }
+  }
+
+  @Override
+  public void update(int ordinal, Object value) { throw new UnsupportedOperationException(); }
+
+  @Override
+  public void setNullAt(int ordinal) { throw new UnsupportedOperationException(); }
+}
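
On the batch side, ColumnarBatch now stages rows in this package-private, read-only ColumnarBatchRow, whose mutators throw UnsupportedOperationException; copy() is what materializes a standalone GenericInternalRow. A rough usage sketch; the batch construction, the on-heap vector, and the printed check are illustrative assumptions, not part of the commit:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class BatchRowSketch {
  public static void main(String[] args) {
    // Fill one long column with two values (on-heap vector chosen for
    // illustration).
    OnHeapColumnVector col = new OnHeapColumnVector(2, DataTypes.LongType);
    col.putLong(0, 1L);
    col.putLong(1, 2L);

    ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { col });
    batch.setNumRows(2);

    // rowIterator() reuses a single staging ColumnarBatchRow, so copy()
    // each row before retaining it past the next() call.
    List<InternalRow> retained = new ArrayList<>();
    Iterator<InternalRow> it = batch.rowIterator();
    while (it.hasNext()) {
      retained.add(it.next().copy());
    }
    batch.close();
    System.out.println(retained.get(1).getLong(0)); // prints 2
  }
}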
