Skip to content

Commit 86da866

Browse files
wangwei1025 and lirui-apache
authored and committed
[FLINK-17783][orc] Add array,map,row types support for orc row writer
This closes #15746
1 parent 42f9d6e commit 86da866

2 files changed

Lines changed: 417 additions & 0 deletions

File tree

flink-formats/flink-orc/src/main/java/org/apache/flink/orc/vector/RowDataVectorizer.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,27 @@
1818

1919
package org.apache.flink.orc.vector;
2020

21+
import org.apache.flink.table.data.ArrayData;
22+
import org.apache.flink.table.data.GenericRowData;
23+
import org.apache.flink.table.data.MapData;
2124
import org.apache.flink.table.data.RowData;
25+
import org.apache.flink.table.types.logical.ArrayType;
2226
import org.apache.flink.table.types.logical.DecimalType;
2327
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
2428
import org.apache.flink.table.types.logical.LogicalType;
29+
import org.apache.flink.table.types.logical.MapType;
30+
import org.apache.flink.table.types.logical.RowType;
2531
import org.apache.flink.table.types.logical.TimestampType;
2632

2733
import org.apache.hadoop.hive.common.type.HiveDecimal;
2834
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
2935
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
3036
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
3137
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
38+
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
3239
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
40+
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
41+
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
3342
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
3443
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
3544

@@ -151,8 +160,118 @@ private static void setColumn(
151160
vector.set(rowId, timestamp);
152161
break;
153162
}
163+
case ARRAY:
164+
{
165+
ListColumnVector listColumnVector = (ListColumnVector) column;
166+
setColumn(rowId, listColumnVector, type, row, columnId);
167+
break;
168+
}
169+
case MAP:
170+
{
171+
MapColumnVector mapColumnVector = (MapColumnVector) column;
172+
setColumn(rowId, mapColumnVector, type, row, columnId);
173+
break;
174+
}
175+
case ROW:
176+
{
177+
StructColumnVector structColumnVector = (StructColumnVector) column;
178+
setColumn(rowId, structColumnVector, type, row, columnId);
179+
break;
180+
}
154181
default:
155182
throw new UnsupportedOperationException("Unsupported type: " + type);
156183
}
157184
}
185+
186+
private static void setColumn(
187+
int rowId,
188+
ListColumnVector listColumnVector,
189+
LogicalType type,
190+
RowData row,
191+
int columnId) {
192+
ArrayData arrayData = row.getArray(columnId);
193+
ArrayType arrayType = (ArrayType) type;
194+
listColumnVector.lengths[rowId] = arrayData.size();
195+
listColumnVector.offsets[rowId] = listColumnVector.childCount;
196+
listColumnVector.childCount += listColumnVector.lengths[rowId];
197+
listColumnVector.child.ensureSize(
198+
listColumnVector.childCount, listColumnVector.offsets[rowId] != 0);
199+
200+
RowData convertedRowData = convert(arrayData, arrayType.getElementType());
201+
for (int i = 0; i < arrayData.size(); i++) {
202+
setColumn(
203+
(int) listColumnVector.offsets[rowId] + i,
204+
listColumnVector.child,
205+
arrayType.getElementType(),
206+
convertedRowData,
207+
i);
208+
}
209+
}
210+
211+
private static void setColumn(
212+
int rowId,
213+
MapColumnVector mapColumnVector,
214+
LogicalType type,
215+
RowData row,
216+
int columnId) {
217+
MapData mapData = row.getMap(columnId);
218+
MapType mapType = (MapType) type;
219+
ArrayData keyArray = mapData.keyArray();
220+
ArrayData valueArray = mapData.valueArray();
221+
mapColumnVector.lengths[rowId] = mapData.size();
222+
mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
223+
mapColumnVector.childCount += mapColumnVector.lengths[rowId];
224+
mapColumnVector.keys.ensureSize(
225+
mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 0);
226+
mapColumnVector.values.ensureSize(
227+
mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 0);
228+
229+
RowData convertedKeyRowData = convert(keyArray, mapType.getKeyType());
230+
RowData convertedValueRowData = convert(valueArray, mapType.getValueType());
231+
for (int i = 0; i < keyArray.size(); i++) {
232+
setColumn(
233+
(int) mapColumnVector.offsets[rowId] + i,
234+
mapColumnVector.keys,
235+
mapType.getKeyType(),
236+
convertedKeyRowData,
237+
i);
238+
setColumn(
239+
(int) mapColumnVector.offsets[rowId] + i,
240+
mapColumnVector.values,
241+
mapType.getValueType(),
242+
convertedValueRowData,
243+
i);
244+
}
245+
}
246+
247+
private static void setColumn(
248+
int rowId,
249+
StructColumnVector structColumnVector,
250+
LogicalType type,
251+
RowData row,
252+
int columnId) {
253+
RowData structRow = row.getRow(columnId, structColumnVector.fields.length);
254+
RowType rowType = (RowType) type;
255+
for (int i = 0; i < structRow.getArity(); i++) {
256+
ColumnVector cv = structColumnVector.fields[i];
257+
setColumn(rowId, cv, rowType.getTypeAt(i), structRow, i);
258+
}
259+
}
260+
261+
/**
262+
* Converting ArrayData to RowData for calling {@link RowDataVectorizer#setColumn(int,
263+
* ColumnVector, LogicalType, RowData, int)} recursively with array.
264+
*
265+
* @param arrayData input ArrayData.
266+
* @param arrayFieldType LogicalType of input ArrayData.
267+
* @return RowData.
268+
*/
269+
private static RowData convert(ArrayData arrayData, LogicalType arrayFieldType) {
270+
GenericRowData rowData = new GenericRowData(arrayData.size());
271+
ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(arrayFieldType);
272+
for (int i = 0; i < arrayData.size(); i++) {
273+
rowData.setField(i, elementGetter.getElementOrNull(arrayData, i));
274+
}
275+
return rowData;
276+
}
158277
}

0 commit comments

Comments
 (0)