LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
- partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
+ partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
partDefaultName.equals(v) ? null : v,
fullFieldTypes[fieldNameList.indexOf(k)])));
@@ -122,7 +120,7 @@ public void open(FileInputSplit fileSplit) throws IOException {
fullFieldTypes,
partObjects,
selectedFields,
- DEFAULT_SIZE,
+ 2048,
fileSplit.getPath(),
fileSplit.getStart(),
fileSplit.getLength());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 8eaa9d0b886f4..76e9e60ee0dc4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -35,6 +35,7 @@
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StringToRowDataConverter;
@@ -65,8 +66,6 @@
import java.util.Set;
import java.util.stream.IntStream;
-import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
-import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
@@ -74,8 +73,7 @@
/**
* The base InputFormat class to read from Hoodie data + log files.
*
- * Use {@link org.apache.flink.formats.parquet.utils.ParquetRecordReader}
- * to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * Use {@code ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
* overrides {@link #createInputSplits(int)} and {@link #close()} to change the behaviors.
*/
public class MergeOnReadInputFormat
@@ -299,7 +297,7 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
FilePathUtils.extractPartitionKeys(this.conf));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
- partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
+ partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
defaultPartName.equals(v) ? null : v,
fieldTypes.get(fieldNames.indexOf(k)))));
@@ -311,7 +309,7 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
fieldTypes.toArray(new DataType[0]),
partObjects,
requiredPos,
- DEFAULT_SIZE,
+ 2048,
new org.apache.flink.core.fs.Path(path),
0,
Long.MAX_VALUE); // read the whole file
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
index 11324038ab943..050d1728f7c37 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
@@ -36,7 +36,6 @@
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TypeInformationRawType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import java.util.List;
import java.util.stream.Collectors;
@@ -339,7 +338,7 @@ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
keyType = multisetType.getElementType();
valueType = new IntType();
}
- if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+ if (!DataTypeUtils.isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
throw new UnsupportedOperationException(
"Avro format doesn't support non-string as key type of map. "
+ "The key type is: "
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index 60d457370a9f1..d63cd7689b0ef 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -22,10 +22,14 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
import java.util.Arrays;
/**
@@ -69,4 +73,51 @@ public static RowType.RowField[] projectRowFields(RowType rowType, String[] name
int [] fieldIndices = Arrays.stream(names).mapToInt(rowType::getFieldIndex).toArray();
return Arrays.stream(fieldIndices).mapToObj(i -> rowType.getFields().get(i)).toArray(RowType.RowField[]::new);
}
+
+ /**
+ * Returns whether the given logical type belongs to the family.
+ */
+ public static boolean isFamily(LogicalType logicalType, LogicalTypeFamily family) {
+ return logicalType.getTypeRoot().getFamilies().contains(family);
+ }
+
+ /**
+ * Resolves the partition path string into value obj with given data type.
+ */
+ public static Object resolvePartition(String partition, DataType type) {
+ if (partition == null) {
+ return null;
+ }
+
+ LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot();
+ switch (typeRoot) {
+ case CHAR:
+ case VARCHAR:
+ return partition;
+ case BOOLEAN:
+ return Boolean.parseBoolean(partition);
+ case TINYINT:
+ return Integer.valueOf(partition).byteValue();
+ case SMALLINT:
+ return Short.valueOf(partition);
+ case INTEGER:
+ return Integer.valueOf(partition);
+ case BIGINT:
+ return Long.valueOf(partition);
+ case FLOAT:
+ return Float.valueOf(partition);
+ case DOUBLE:
+ return Double.valueOf(partition);
+ case DATE:
+ return LocalDate.parse(partition);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return LocalDateTime.parse(partition);
+ case DECIMAL:
+ return new BigDecimal(partition);
+ default:
+ throw new RuntimeException(
+ String.format(
+ "Can not convert %s to type %s for partition value", partition, type));
+ }
+ }
}
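
For context, a minimal usage sketch (not part of the patch) of the two helpers added above; the table types and partition strings are illustrative:

// Illustrative only: exercises DataTypeUtils.resolvePartition / isFamily as added in this patch.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.LogicalTypeFamily;

import org.apache.hudi.util.DataTypeUtils;

public class DataTypeUtilsSketch {
  public static void main(String[] args) {
    // partition path strings resolved into typed objects
    Object day = DataTypeUtils.resolvePartition("2022-04-18", DataTypes.DATE());   // java.time.LocalDate
    Object id = DataTypeUtils.resolvePartition("42", DataTypes.BIGINT());          // java.lang.Long
    Object nothing = DataTypeUtils.resolvePartition(null, DataTypes.STRING());     // null passes through

    // replacement for LogicalTypeChecks.hasFamily used in AvroSchemaConverter
    boolean stringKey = DataTypeUtils.isFamily(
        DataTypes.STRING().getLogicalType(), LogicalTypeFamily.CHARACTER_STRING);  // true

    System.out.println(day + ", " + id + ", " + nothing + ", " + stringKey);
  }
}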
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
index d23a278876d1e..c7b9382655325 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -18,6 +18,7 @@
package org.apache.hudi.util;
+import org.apache.hudi.adapter.Utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;
@@ -223,8 +224,7 @@ private static String getCreateHoodieTableDDL(
* @param isBounded A flag indicating whether the input data stream is bounded
*/
private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
- FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
- Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
+ FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
@@ -239,8 +239,7 @@ private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifie
* @param catalogTable The hoodie catalog table
*/
private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable) {
- FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
- Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
+ FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
.createDynamicTableSource(context))
@@ -252,8 +251,8 @@ private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, Ob
* A POJO that contains tableId and resolvedCatalogTable.
*/
public static class TableDescriptor {
- private ObjectIdentifier tableId;
- private ResolvedCatalogTable resolvedCatalogTable;
+ private final ObjectIdentifier tableId;
+ private final ResolvedCatalogTable resolvedCatalogTable;
public TableDescriptor(ObjectIdentifier tableId, ResolvedCatalogTable resolvedCatalogTable) {
this.tableId = tableId;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index c1a24a270128d..a8104efb322f9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1350,7 +1350,7 @@ private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish
try {
- tableResult.getJobClient().get().getJobExecutionResult().get();
+ tableResult.await();
} catch (InterruptedException | ExecutionException ex) {
// ignored
}
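
As a side note, a small sketch (assuming Flink's TableResult API) of the await-based wait that replaces the manual job-client chain; the bounded-timeout variant shown here is optional and not part of the patch:

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExecSqlSketch {
  // waits for the INSERT job like the test helper above, but with an upper bound
  static void execInsertSql(TableEnvironment tEnv, String insert) {
    TableResult tableResult = tEnv.executeSql(insert);
    try {
      tableResult.await(5, TimeUnit.MINUTES); // same effect as getJobClient().get().getJobExecutionResult().get()
    } catch (InterruptedException | ExecutionException | TimeoutException ex) {
      // ignored, mirroring the original helper
    }
  }
}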
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
index d38aad60c3452..2830eefef013f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -18,6 +18,8 @@
package org.apache.hudi.utils.source;
+import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
+
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -28,7 +30,6 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
@@ -74,7 +75,7 @@ public ContinuousFileSource(
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
- return new DataStreamScanProvider() {
+ return new DataStreamScanProviderAdapter() {
@Override
public boolean isBounded() {
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/pom.xml b/hudi-flink-datasource/hudi-flink1.13.x/pom.xml
index ff60a89490444..35024b1f18652 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/pom.xml
+++ b/hudi-flink-datasource/hudi-flink1.13.x/pom.xml
@@ -33,6 +33,17 @@
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
@@ -51,6 +62,18 @@
<version>${flink1.13.version}</version>
<scope>provided</scope>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parquet_${scala.binary.version}</artifactId>
+ <version>${flink1.13.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink1.13.version}</version>
+ <scope>provided</scope>
+ </dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java
new file mode 100644
index 0000000000000..867395c43f199
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+
+/**
+ * Adapter clazz for {@code DataStreamScanProvider}.
+ */
+public interface DataStreamScanProviderAdapter extends DataStreamScanProvider {
+}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java
new file mode 100644
index 0000000000000..e8eaa3c62d441
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+
+/**
+ * Adapter clazz for {@code DataStreamSinkProvider}.
+ */
+public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
+}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/Utils.java
index 9eb52c6765896..1f9ebb582394c 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.adapter;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
@@ -25,6 +26,9 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.factories.FactoryUtil;
/**
* Adapter utils.
@@ -45,4 +49,12 @@ public static SourceFunction.SourceContext getSourceContext(
watermarkInterval,
-1);
}
+
+ public static FactoryUtil.DefaultDynamicTableContext getTableContext(
+ ObjectIdentifier tablePath,
+ ResolvedCatalogTable catalogTable,
+ ReadableConfig conf) {
+ return new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+ conf, Thread.currentThread().getContextClassLoader(), false);
+ }
}
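
The point of routing through this adapter is that shared code only compiles against the getTableContext signature, keeping the version-specific DefaultDynamicTableContext constructor inside each hudi-flinkX.Y.x module. A hypothetical caller sketch (table id and options are illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.factories.DynamicTableFactory;

import org.apache.hudi.adapter.Utils;

class TableContextSketch {
  // shared code never touches the version-specific constructor directly
  static DynamicTableFactory.Context contextFor(ObjectIdentifier tableId, ResolvedCatalogTable table) {
    return Utils.getTableContext(tableId, table, Configuration.fromMap(table.getOptions()));
  }
}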
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/BaseVectorizedColumnReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/FixedLenBytesColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/FixedLenBytesColumnReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/FixedLenBytesColumnReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/FixedLenBytesColumnReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/Int64TimestampColumnReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
similarity index 100%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java
similarity index 98%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java
rename to hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java
index f13340ceddf92..3266f835e4d1c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RunLengthDecoder.java
@@ -58,7 +58,7 @@ final class RunLengthDecoder {
private BytePacker packer;
// Current decoding mode and values
- RunLengthDecoder.MODE mode;
+ MODE mode;
int currentCount;
int currentValue;
@@ -102,7 +102,7 @@ void initFromStream(int valueCount, ByteBufferInputStream in) throws IOException
}
if (bitWidth == 0) {
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
- this.mode = RunLengthDecoder.MODE.RLE;
+ this.mode = MODE.RLE;
this.currentCount = valueCount;
this.currentValue = 0;
} else {
@@ -266,7 +266,7 @@ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
void readNextGroup() {
try {
int header = readUnsignedVarInt();
- this.mode = (header & 1) == 0 ? RunLengthDecoder.MODE.RLE : RunLengthDecoder.MODE.PACKED;
+ this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
case RLE:
this.currentCount = header >>> 1;
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/pom.xml b/hudi-flink-datasource/hudi-flink1.14.x/pom.xml
index ed7f1b9a1edba..990321b15b863 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/pom.xml
+++ b/hudi-flink-datasource/hudi-flink1.14.x/pom.xml
@@ -33,6 +33,17 @@
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
@@ -45,6 +56,12 @@
<version>${flink1.14.version}</version>
<scope>provided</scope>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+ <version>${flink1.14.version}</version>
+ <scope>provided</scope>
+ </dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
@@ -63,6 +80,18 @@
<version>${flink1.14.version}</version>
<scope>provided</scope>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parquet_${scala.binary.version}</artifactId>
+ <version>${flink1.14.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink1.14.version}</version>
+ <scope>provided</scope>
+ </dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java
new file mode 100644
index 0000000000000..867395c43f199
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+
+/**
+ * Adapter clazz for {@code DataStreamScanProvider}.
+ */
+public interface DataStreamScanProviderAdapter extends DataStreamScanProvider {
+}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java
new file mode 100644
index 0000000000000..e8eaa3c62d441
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+
+/**
+ * Adapter clazz for {@code DataStreamSinkProvider}.
+ */
+public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
+}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/Utils.java
index 41ac0ffcee576..30c6a22bfd8ea 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.adapter;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
@@ -25,6 +26,9 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.factories.FactoryUtil;
/**
* Adapter utils.
@@ -45,4 +49,12 @@ public static SourceFunction.SourceContext getSourceContext(
-1,
true);
}
+
+ public static FactoryUtil.DefaultDynamicTableContext getTableContext(
+ ObjectIdentifier tablePath,
+ ResolvedCatalogTable catalogTable,
+ ReadableConfig conf) {
+ return new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+ conf, Thread.currentThread().getContextClassLoader(), false);
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
new file mode 100644
index 0000000000000..c636b36100fea
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
+import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
+import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.DoubleColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.FloatColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.IntColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
+import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal;
+import static org.apache.parquet.Preconditions.checkArgument;
+
+/**
+ * Util for generating {@link ParquetColumnarRowSplitReader}.
+ *
+ * NOTE: referenced from Flink release 1.11.2 {@code ParquetSplitReaderUtil}, modified to support
+ * INT64-based TIMESTAMP_MILLIS as ConvertedType; should be removed once Flink supports that.
+ */
+public class ParquetSplitReaderUtil {
+
+ /**
+ * Util for generating partitioned {@link ParquetColumnarRowSplitReader}.
+ */
+ public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
+ boolean utcTimestamp,
+ boolean caseSensitive,
+ Configuration conf,
+ String[] fullFieldNames,
+ DataType[] fullFieldTypes,
+ Map<String, Object> partitionSpec,
+ int[] selectedFields,
+ int batchSize,
+ Path path,
+ long splitStart,
+ long splitLength) throws IOException {
+ List<String> selNonPartNames = Arrays.stream(selectedFields)
+ .mapToObj(i -> fullFieldNames[i])
+ .filter(n -> !partitionSpec.containsKey(n))
+ .collect(Collectors.toList());
+
+ int[] selParquetFields = Arrays.stream(selectedFields)
+ .filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
+ .toArray();
+
+ ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {
+ // create and initialize the row batch
+ ColumnVector[] vectors = new ColumnVector[selectedFields.length];
+ for (int i = 0; i < vectors.length; i++) {
+ String name = fullFieldNames[selectedFields[i]];
+ LogicalType type = fullFieldTypes[selectedFields[i]].getLogicalType();
+ vectors[i] = createVector(readVectors, selNonPartNames, name, type, partitionSpec, batchSize);
+ }
+ return new VectorizedColumnBatch(vectors);
+ };
+
+ return new ParquetColumnarRowSplitReader(
+ utcTimestamp,
+ caseSensitive,
+ conf,
+ Arrays.stream(selParquetFields)
+ .mapToObj(i -> fullFieldTypes[i].getLogicalType())
+ .toArray(LogicalType[]::new),
+ selNonPartNames.toArray(new String[0]),
+ gen,
+ batchSize,
+ new org.apache.hadoop.fs.Path(path.toUri()),
+ splitStart,
+ splitLength);
+ }
+
+ private static ColumnVector createVector(
+ ColumnVector[] readVectors,
+ List<String> selNonPartNames,
+ String name,
+ LogicalType type,
+ Map<String, Object> partitionSpec,
+ int batchSize) {
+ if (partitionSpec.containsKey(name)) {
+ return createVectorFromConstant(type, partitionSpec.get(name), batchSize);
+ }
+ ColumnVector readVector = readVectors[selNonPartNames.indexOf(name)];
+ if (readVector == null) {
+ // when the read vector is null, use a constant null vector instead
+ readVector = createVectorFromConstant(type, null, batchSize);
+ }
+ return readVector;
+ }
+
+ private static ColumnVector createVectorFromConstant(
+ LogicalType type,
+ Object value,
+ int batchSize) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ HeapBytesVector bsv = new HeapBytesVector(batchSize);
+ if (value == null) {
+ bsv.fillWithNulls();
+ } else {
+ bsv.fill(value instanceof byte[]
+ ? (byte[]) value
+ : value.toString().getBytes(StandardCharsets.UTF_8));
+ }
+ return bsv;
+ case BOOLEAN:
+ HeapBooleanVector bv = new HeapBooleanVector(batchSize);
+ if (value == null) {
+ bv.fillWithNulls();
+ } else {
+ bv.fill((boolean) value);
+ }
+ return bv;
+ case TINYINT:
+ HeapByteVector byteVector = new HeapByteVector(batchSize);
+ if (value == null) {
+ byteVector.fillWithNulls();
+ } else {
+ byteVector.fill(((Number) value).byteValue());
+ }
+ return byteVector;
+ case SMALLINT:
+ HeapShortVector sv = new HeapShortVector(batchSize);
+ if (value == null) {
+ sv.fillWithNulls();
+ } else {
+ sv.fill(((Number) value).shortValue());
+ }
+ return sv;
+ case INTEGER:
+ HeapIntVector iv = new HeapIntVector(batchSize);
+ if (value == null) {
+ iv.fillWithNulls();
+ } else {
+ iv.fill(((Number) value).intValue());
+ }
+ return iv;
+ case BIGINT:
+ HeapLongVector lv = new HeapLongVector(batchSize);
+ if (value == null) {
+ lv.fillWithNulls();
+ } else {
+ lv.fill(((Number) value).longValue());
+ }
+ return lv;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ DecimalData decimal = value == null
+ ? null
+ : Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
+ ColumnVector internalVector = createVectorFromConstant(
+ new VarBinaryType(),
+ decimal == null ? null : decimal.toUnscaledBytes(),
+ batchSize);
+ return new ParquetDecimalVector(internalVector);
+ case FLOAT:
+ HeapFloatVector fv = new HeapFloatVector(batchSize);
+ if (value == null) {
+ fv.fillWithNulls();
+ } else {
+ fv.fill(((Number) value).floatValue());
+ }
+ return fv;
+ case DOUBLE:
+ HeapDoubleVector dv = new HeapDoubleVector(batchSize);
+ if (value == null) {
+ dv.fillWithNulls();
+ } else {
+ dv.fill(((Number) value).doubleValue());
+ }
+ return dv;
+ case DATE:
+ if (value instanceof LocalDate) {
+ value = Date.valueOf((LocalDate) value);
+ }
+ return createVectorFromConstant(
+ new IntType(),
+ value == null ? null : dateToInternal((Date) value),
+ batchSize);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ HeapTimestampVector tv = new HeapTimestampVector(batchSize);
+ if (value == null) {
+ tv.fillWithNulls();
+ } else {
+ tv.fill(TimestampData.fromLocalDateTime((LocalDateTime) value));
+ }
+ return tv;
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private static List<ColumnDescriptor> filterDescriptors(int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+ List<ColumnDescriptor> filtered = new ArrayList<>();
+ for (ColumnDescriptor descriptor : columns) {
+ if (depth >= descriptor.getPath().length) {
+ throw new InvalidSchemaException("Expect depth " + depth + " for schema: " + descriptor);
+ }
+ if (type.getName().equals(descriptor.getPath()[depth])) {
+ filtered.add(descriptor);
+ }
+ }
+ ValidationUtils.checkState(filtered.size() > 0, "Corrupted Parquet schema");
+ return filtered;
+ }
+
+ public static ColumnReader createColumnReader(
+ boolean utcTimestamp,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> descriptors,
+ PageReadStore pages) throws IOException {
+ return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors,
+ pages, 0);
+ }
+
+ private static ColumnReader createColumnReader(
+ boolean utcTimestamp,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> columns,
+ PageReadStore pages,
+ int depth) throws IOException {
+ List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
+ ColumnDescriptor descriptor = descriptors.get(0);
+ PageReader pageReader = pages.getPageReader(descriptor);
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ return new BooleanColumnReader(descriptor, pageReader);
+ case TINYINT:
+ return new ByteColumnReader(descriptor, pageReader);
+ case DOUBLE:
+ return new DoubleColumnReader(descriptor, pageReader);
+ case FLOAT:
+ return new FloatColumnReader(descriptor, pageReader);
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return new IntColumnReader(descriptor, pageReader);
+ case BIGINT:
+ return new LongColumnReader(descriptor, pageReader);
+ case SMALLINT:
+ return new ShortColumnReader(descriptor, pageReader);
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ return new BytesColumnReader(descriptor, pageReader);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT64:
+ return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
+ case INT96:
+ return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
+ default:
+ throw new AssertionError();
+ }
+ case DECIMAL:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ return new IntColumnReader(descriptor, pageReader);
+ case INT64:
+ return new LongColumnReader(descriptor, pageReader);
+ case BINARY:
+ return new BytesColumnReader(descriptor, pageReader);
+ case FIXED_LEN_BYTE_ARRAY:
+ return new FixedLenBytesColumnReader(
+ descriptor, pageReader, ((DecimalType) fieldType).getPrecision());
+ default:
+ throw new AssertionError();
+ }
+ case ARRAY:
+ return new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ fieldType);
+ case MAP:
+ MapType mapType = (MapType) fieldType;
+ ArrayColumnReader keyReader =
+ new ArrayColumnReader(
+ descriptor,
+ pageReader,
+ utcTimestamp,
+ descriptor.getPrimitiveType(),
+ new ArrayType(mapType.getKeyType()));
+ ArrayColumnReader valueReader =
+ new ArrayColumnReader(
+ descriptors.get(1),
+ pages.getPageReader(descriptors.get(1)),
+ utcTimestamp,
+ descriptors.get(1).getPrimitiveType(),
+ new ArrayType(mapType.getValueType()));
+ return new MapColumnReader(keyReader, valueReader, fieldType);
+ case ROW:
+ RowType rowType = (RowType) fieldType;
+ GroupType groupType = physicalType.asGroupType();
+ List<ColumnReader> fieldReaders = new ArrayList<>();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ fieldReaders.add(
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(i),
+ descriptors,
+ pages,
+ depth + 1));
+ }
+ return new RowColumnReader(fieldReaders);
+ default:
+ throw new UnsupportedOperationException(fieldType + " is not supported now.");
+ }
+ }
+
+ public static WritableColumnVector createWritableColumnVector(
+ int batchSize,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> descriptors) {
+ return createWritableColumnVector(batchSize, fieldType, physicalType, descriptors, 0);
+ }
+
+ private static WritableColumnVector createWritableColumnVector(
+ int batchSize,
+ LogicalType fieldType,
+ Type physicalType,
+ List<ColumnDescriptor> columns,
+ int depth) {
+ List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
+ PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
+ PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName();
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN,
+ "Unexpected type: %s", typeName);
+ return new HeapBooleanVector(batchSize);
+ case TINYINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapByteVector(batchSize);
+ case DOUBLE:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.DOUBLE,
+ "Unexpected type: %s", typeName);
+ return new HeapDoubleVector(batchSize);
+ case FLOAT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.FLOAT,
+ "Unexpected type: %s", typeName);
+ return new HeapFloatVector(batchSize);
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapIntVector(batchSize);
+ case BIGINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT64,
+ "Unexpected type: %s", typeName);
+ return new HeapLongVector(batchSize);
+ case SMALLINT:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.INT32,
+ "Unexpected type: %s", typeName);
+ return new HeapShortVector(batchSize);
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ checkArgument(
+ typeName == PrimitiveType.PrimitiveTypeName.BINARY,
+ "Unexpected type: %s", typeName);
+ return new HeapBytesVector(batchSize);
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS,
+ "TIME_MICROS original type is not ");
+ return new HeapTimestampVector(batchSize);
+ case DECIMAL:
+ checkArgument(
+ (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
+ && primitiveType.getOriginalType() == OriginalType.DECIMAL,
+ "Unexpected type: %s", typeName);
+ return new HeapBytesVector(batchSize);
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) fieldType;
+ return new HeapArrayVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ arrayType.getElementType(),
+ physicalType,
+ descriptors,
+ depth));
+ case MAP:
+ MapType mapType = (MapType) fieldType;
+ GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
+ // the map column has three level paths.
+ return new HeapMapColumnVector(
+ batchSize,
+ createWritableColumnVector(
+ batchSize,
+ mapType.getKeyType(),
+ repeatedType.getType(0),
+ descriptors,
+ depth + 2),
+ createWritableColumnVector(
+ batchSize,
+ mapType.getValueType(),
+ repeatedType.getType(1),
+ descriptors,
+ depth + 2));
+ case ROW:
+ RowType rowType = (RowType) fieldType;
+ GroupType groupType = physicalType.asGroupType();
+ WritableColumnVector[] columnVectors =
+ new WritableColumnVector[rowType.getFieldCount()];
+ for (int i = 0; i < columnVectors.length; i++) {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(i),
+ descriptors,
+ depth + 1);
+ }
+ return new HeapRowColumnVector(batchSize, columnVectors);
+ default:
+ throw new UnsupportedOperationException(fieldType + " is not supported now.");
+ }
+ }
+}
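
To show how the copied utility is consumed (mirroring the CopyOnWriteInputFormat/MergeOnReadInputFormat call sites above), a hedged read-loop sketch; the schema and path are made up, and the reachedEnd()/nextRecord()/close() loop is an assumption based on the Flink 1.11 reader this class is ported from:

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

class CowReadSketch {
  static void readSplit() throws IOException {
    String[] fieldNames = {"id", "name", "dt"};
    DataType[] fieldTypes = {DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()};
    Map<String, Object> partSpec = Collections.singletonMap("dt", "2022-04-18"); // partition column
    int[] selectedFields = {0, 1, 2};

    try (ParquetColumnarRowSplitReader reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
        true,                     // utcTimestamp
        true,                     // caseSensitive
        new Configuration(),
        fieldNames,
        fieldTypes,
        partSpec,
        selectedFields,
        2048,                     // batch size, matching the call sites above
        new Path("/tmp/part-0000.parquet"),
        0,
        Long.MAX_VALUE)) {        // read the whole file
      while (!reader.reachedEnd()) {
        RowData row = reader.nextRecord();
        // process row ...
      }
    }
  }
}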
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
new file mode 100644
index 0000000000000..edd90714c87a7
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.ColumnarArrayData;
+import org.apache.flink.table.data.vector.ArrayColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap array column vector.
+ */
+public class HeapArrayVector extends AbstractHeapVector
+ implements WritableColumnVector, ArrayColumnVector {
+
+ public long[] offsets;
+ public long[] lengths;
+ public ColumnVector child;
+ private int size;
+
+ public HeapArrayVector(int len) {
+ super(len);
+ offsets = new long[len];
+ lengths = new long[len];
+ }
+
+ public HeapArrayVector(int len, ColumnVector vector) {
+ super(len);
+ offsets = new long[len];
+ lengths = new long[len];
+ this.child = vector;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ public int getLen() {
+ return this.isNull.length;
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ long offset = offsets[i];
+ long length = lengths[i];
+ return new ColumnarArrayData(child, (int) offset, (int) length);
+ }
+}
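
A small, made-up sketch of how the offsets/lengths/child triple above is interpreted: row i of the array vector is the child slice [offsets[i], offsets[i] + lengths[i]):

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.vector.heap.HeapIntVector;

import org.apache.hudi.table.format.cow.vector.HeapArrayVector;

class HeapArrayVectorSketch {
  static void demo() {
    HeapIntVector child = new HeapIntVector(4);
    for (int i = 0; i < 4; i++) {
      child.setInt(i, i + 1);                  // child holds 1, 2, 3, 4
    }
    HeapArrayVector arrays = new HeapArrayVector(2, child);
    arrays.offsets = new long[] {0L, 2L};
    arrays.lengths = new long[] {2L, 2L};

    ArrayData first = arrays.getArray(0);      // columnar view over 1, 2
    ArrayData second = arrays.getArray(1);     // columnar view over 3, 4
    System.out.println(first.getInt(1) + " " + second.getInt(0)); // prints "2 3"
  }
}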
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
new file mode 100644
index 0000000000000..2b34a02f116b3
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ColumnarMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.MapColumnVector;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap map column vector.
+ */
+public class HeapMapColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, MapColumnVector {
+
+ private long[] offsets;
+ private long[] lengths;
+ private int size;
+ private ColumnVector keys;
+ private ColumnVector values;
+
+ public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
+ super(len);
+ size = 0;
+ offsets = new long[len];
+ lengths = new long[len];
+ this.keys = keys;
+ this.values = values;
+ }
+
+ public void setOffsets(long[] offsets) {
+ this.offsets = offsets;
+ }
+
+ public void setLengths(long[] lengths) {
+ this.lengths = lengths;
+ }
+
+ public void setKeys(ColumnVector keys) {
+ this.keys = keys;
+ }
+
+ public void setValues(ColumnVector values) {
+ this.values = values;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ long offset = offsets[i];
+ long length = lengths[i];
+ return new ColumnarMapData(keys, values, (int) offset, (int) length);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
new file mode 100644
index 0000000000000..0193e6cbb1d22
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.ColumnarRowData;
+import org.apache.flink.table.data.vector.RowColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+/**
+ * This class represents a nullable heap row column vector.
+ */
+public class HeapRowColumnVector extends AbstractHeapVector
+ implements WritableColumnVector, RowColumnVector {
+
+ public WritableColumnVector[] vectors;
+
+ public HeapRowColumnVector(int len, WritableColumnVector... vectors) {
+ super(len);
+ this.vectors = vectors;
+ }
+
+ @Override
+ public ColumnarRowData getRow(int i) {
+ ColumnarRowData columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(vectors));
+ columnarRowData.setRowId(i);
+ return columnarRowData;
+ }
+}
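
For illustration only (not part of the patch): `getRow(i)` wraps the child field vectors in a fresh `VectorizedColumnBatch` and positions a `ColumnarRowData` at row `i`, so field accesses delegate straight to the per-field vectors. A small sketch:

```java
import org.apache.flink.table.data.ColumnarRowData;
import org.apache.flink.table.data.vector.heap.HeapIntVector;
import org.apache.flink.table.data.vector.heap.HeapLongVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;

public class RowVectorExample {
  public static void main(String[] args) {
    // Two fields (INT, BIGINT) stored column-wise for two rows.
    HeapIntVector f0 = new HeapIntVector(2);
    HeapLongVector f1 = new HeapLongVector(2);
    f0.setInt(0, 7);
    f1.setLong(0, 70L);
    f0.setInt(1, 8);
    f1.setLong(1, 80L);

    HeapRowColumnVector rowVector = new HeapRowColumnVector(2, f0, f1);
    ColumnarRowData row1 = rowVector.getRow(1);
    System.out.println(row1.getInt(0));  // 8
    System.out.println(row1.getLong(1)); // 80
  }
}
```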
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
new file mode 100644
index 0000000000000..a2f6d5b0cd74c
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+
+/**
+ * Parquet writes decimals as int32, int64 or binary; this class wraps the real vector to
+ * provide the {@link DecimalColumnVector} interface.
+ *
+ * <p>References Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.ParquetDecimalVector}
+ * because that class is not public.
+ */
+public class ParquetDecimalVector implements DecimalColumnVector {
+
+ public final ColumnVector vector;
+
+ public ParquetDecimalVector(ColumnVector vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromUnscaledBytes(
+ ((BytesColumnVector) vector).getBytes(i).getBytes(),
+ precision,
+ scale);
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return vector.isNullAt(i);
+ }
+}
+
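
A short aside (not from the patch): the wrapper works because Flink's `DecimalData` can be rebuilt directly from the unscaled two's-complement bytes that Parquet stores for decimal columns, with precision and scale taken from the column metadata. A minimal round-trip of that conversion:

```java
import java.math.BigDecimal;

import org.apache.flink.table.data.DecimalData;

public class DecimalBytesExample {
  public static void main(String[] args) {
    BigDecimal source = new BigDecimal("12345.67");

    // Parquet stores the unscaled value as big-endian two's-complement bytes;
    // precision and scale come from the column metadata (here 7 and 2).
    byte[] unscaled = source.unscaledValue().toByteArray();

    DecimalData decimal = DecimalData.fromUnscaledBytes(unscaled, 7, 2);
    System.out.println(decimal.toBigDecimal()); // 12345.67
  }
}
```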
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
new file mode 100644
index 0000000000000..07416a371715c
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.ParquetDictionary;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+
+/**
+ * Abstract {@link ColumnReader}.
+ * See {@link org.apache.parquet.column.impl.ColumnReaderImpl},
+ * part of the code is adapted from Apache Spark and Apache Parquet.
+ *
+ * <p>Note: References Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader}
+ * because it relies on some package-scoped methods.
+ */
+public abstract class AbstractColumnReader<V extends WritableColumnVector>
+ implements ColumnReader<V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.class);
+
+ private final PageReader pageReader;
+
+ /**
+ * The dictionary, if this column has dictionary encoding.
+ */
+ protected final Dictionary dictionary;
+
+ /**
+ * Maximum definition level for this column.
+ */
+ protected final int maxDefLevel;
+
+ protected final ColumnDescriptor descriptor;
+
+ /**
+ * Total number of values read.
+ */
+ private long valuesRead;
+
+ /**
+ * value that indicates the end of the current page. That is, if valuesRead ==
+ * endOfPageValueCount, we are at the end of the page.
+ */
+ private long endOfPageValueCount;
+
+ /**
+ * If true, the current page is dictionary encoded.
+ */
+ private boolean isCurrentPageDictionaryEncoded;
+
+ /**
+ * Total values in the current page.
+ */
+ private int pageValueCount;
+
+ /*
+ * Input streams:
+ * 1. A run length stream, which carries the run length information for the encoded values.
+ * 2. A data stream, which holds either the real data or dictionary ids that need to be
+ * decoded into real data through the Dictionary.
+ *
+ * Run length stream ------> Data stream
+ *                    |
+ *                    ------> Dictionary ids stream
+ */
+
+ /**
+ * Run length decoder for data and dictionary.
+ */
+ protected RunLengthDecoder runLenDecoder;
+
+ /**
+ * Data input stream.
+ */
+ ByteBufferInputStream dataInputStream;
+
+ /**
+ * Dictionary decoder to wrap dictionary ids input stream.
+ */
+ private RunLengthDecoder dictionaryIdsDecoder;
+
+ public AbstractColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader) throws IOException {
+ this.descriptor = descriptor;
+ this.pageReader = pageReader;
+ this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+ this.isCurrentPageDictionaryEncoded = true;
+ } catch (IOException e) {
+ throw new IOException("could not decode the dictionary for " + descriptor, e);
+ }
+ } else {
+ this.dictionary = null;
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+ /*
+ * Total number of values in this column (in this row group).
+ */
+ long totalValueCount = pageReader.getTotalValueCount();
+ if (totalValueCount == 0) {
+ throw new IOException("totalValueCount == 0");
+ }
+ }
+
+ protected void checkTypeName(PrimitiveType.PrimitiveTypeName expectedName) {
+ PrimitiveType.PrimitiveTypeName actualName = descriptor.getPrimitiveType().getPrimitiveTypeName();
+ Preconditions.checkArgument(
+ actualName == expectedName,
+ "Expected type name: %s, actual type name: %s",
+ expectedName,
+ actualName);
+ }
+
+ /**
+ * Reads {@code readNumber} values from this column reader into the given vector.
+ */
+ @Override
+ public final void readToVector(int readNumber, V vector) throws IOException {
+ int rowId = 0;
+ WritableIntVector dictionaryIds = null;
+ if (dictionary != null) {
+ dictionaryIds = vector.reserveDictionaryIds(readNumber);
+ }
+ while (readNumber > 0) {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ DataPage page = pageReader.readPage();
+ if (page instanceof DataPageV1) {
+ readPageV1((DataPageV1) page);
+ } else if (page instanceof DataPageV2) {
+ readPageV2((DataPageV2) page);
+ } else {
+ throw new RuntimeException("Unsupported page type: " + page.getClass());
+ }
+ leftInPage = (int) (endOfPageValueCount - valuesRead);
+ }
+ int num = Math.min(readNumber, leftInPage);
+ if (isCurrentPageDictionaryEncoded) {
+ // Read and decode dictionary ids.
+ runLenDecoder.readDictionaryIds(
+ num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
+
+ if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
+ // Column vector supports lazy decoding of dictionary values so just set the dictionary.
+ // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
+ // non-dictionary encoded values have already been added).
+ vector.setDictionary(new ParquetDictionary(dictionary));
+ } else {
+ readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
+ }
+ } else {
+ if (vector.hasDictionary() && rowId != 0) {
+ // This batch already has dictionary-encoded values but this new page is not. The batch
+ // does not support mixing dictionary and plain values, so we decode the dictionary ids read so far.
+ readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
+ }
+ vector.setDictionary(null);
+ readBatch(rowId, num, vector);
+ }
+
+ valuesRead += num;
+ rowId += num;
+ readNumber -= num;
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+
+ // Initialize the decoders.
+ if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
+ }
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ this.runLenDecoder = new RunLengthDecoder(bitWidth);
+ try {
+ BytesInput bytes = page.getBytes();
+ ByteBufferInputStream in = bytes.toInputStream();
+ rlReader.initFromPage(pageValueCount, in);
+ this.runLenDecoder.initFromStream(pageValueCount, in);
+ prepareNewPage(page.getValueEncoding(), in);
+ } catch (IOException e) {
+ throw new IOException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
+
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ // do not read the length from the stream. v2 pages handle dividing the page bytes.
+ this.runLenDecoder = new RunLengthDecoder(bitWidth, false);
+ this.runLenDecoder.initFromStream(
+ this.pageValueCount, page.getDefinitionLevels().toInputStream());
+ try {
+ prepareNewPage(page.getDataEncoding(), page.getData().toInputStream());
+ } catch (IOException e) {
+ throw new IOException("could not read page " + page + " in col " + descriptor, e);
+ }
+ }
+
+ private void prepareNewPage(
+ Encoding dataEncoding,
+ ByteBufferInputStream in) throws IOException {
+ this.endOfPageValueCount = valuesRead + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ if (dictionary == null) {
+ throw new IOException("Could not read page in col "
+ + descriptor
+ + " as the dictionary was missing for encoding "
+ + dataEncoding);
+ }
+ @SuppressWarnings("deprecation")
+ Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
+ if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+ }
+ this.dataInputStream = null;
+ this.dictionaryIdsDecoder = new RunLengthDecoder();
+ try {
+ this.dictionaryIdsDecoder.initFromStream(pageValueCount, in);
+ } catch (IOException e) {
+ throw new IOException("could not read dictionary in col " + descriptor, e);
+ }
+ this.isCurrentPageDictionaryEncoded = true;
+ } else {
+ if (dataEncoding != Encoding.PLAIN) {
+ throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+ }
+ this.dictionaryIdsDecoder = null;
+ LOG.debug("init from page at offset {} for length {}", in.position(), in.available());
+ this.dataInputStream = in.remainingStream();
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+
+ afterReadPage();
+ }
+
+ final ByteBuffer readDataBuffer(int length) {
+ try {
+ return dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+ }
+ }
+
+ /**
+ * After reading a page, we may need some initialization.
+ */
+ protected void afterReadPage() {
+ }
+
+ /**
+ * Whether lazy decoding of dictionary ids is supported. See more in {@link ParquetDictionary}.
+ * If this returns false, we decode all the data first.
+ */
+ protected boolean supportLazyDecode() {
+ return true;
+ }
+
+ /**
+ * Read batch from {@link #runLenDecoder} and {@link #dataInputStream}.
+ */
+ protected abstract void readBatch(int rowId, int num, V column);
+
+ /**
+ * Decode dictionary ids to data.
+ * From {@link #runLenDecoder} and {@link #dictionaryIdsDecoder}.
+ */
+ protected abstract void readBatchFromDictionaryIds(
+ int rowId,
+ int num,
+ V column,
+ WritableIntVector dictionaryIds);
+}
+
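
To make the template concrete, here is a hypothetical minimal INT32 reader built on the class above (not part of the patch), assuming it lives in the same package as AbstractColumnReader. It only uses members the abstract class exposes (`checkTypeName`, `readDataBuffer`, `dictionary`) plus standard Flink/Parquet APIs, and it skips definition-level (null) handling, which real readers perform through `runLenDecoder`:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.flink.table.data.vector.writable.WritableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.schema.PrimitiveType;

/**
 * Hypothetical sketch: a plain INT32 column reader; nulls and definition levels are omitted for brevity.
 */
public class SimpleIntColumnReader extends AbstractColumnReader<WritableIntVector> {

  public SimpleIntColumnReader(ColumnDescriptor descriptor, PageReader pageReader) throws IOException {
    super(descriptor, pageReader);
    checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
  }

  @Override
  protected void readBatch(int rowId, int num, WritableIntVector column) {
    // PLAIN encoding: INT32 values are laid out back to back in the page's data stream.
    ByteBuffer buffer = readDataBuffer(num * 4);
    for (int i = 0; i < num; i++) {
      column.setInt(rowId + i, buffer.getInt());
    }
  }

  @Override
  protected void readBatchFromDictionaryIds(
      int rowId, int num, WritableIntVector column, WritableIntVector dictionaryIds) {
    // Dictionary encoding: resolve every id read into dictionaryIds against the page dictionary.
    for (int i = rowId; i < rowId + num; i++) {
      if (!column.isNullAt(i)) {
        column.setInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
      }
    }
  }
}
```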
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
new file mode 100644
index 0000000000000..d94c1e1da4bb6
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Array {@link ColumnReader}.
+ */
+public class ArrayColumnReader extends BaseVectorizedColumnReader {
+
+ // The value read last time.
+ private Object lastValue;
+
+ // flag to indicate if there is no data in parquet data page
+ private boolean eof = false;
+
+ // flag to indicate whether this instance is reading its first parquet data page
+ boolean isFirstRow = true;
+
+ public ArrayColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader,
+ boolean isUtcTimestamp,
+ Type type,
+ LogicalType logicalType)
+ throws IOException {
+ super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
+ }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ HeapArrayVector lcv = (HeapArrayVector) vector;
+ // Before readBatch, initialize the sizes of offsets & lengths to the default value;
+ // the actual sizes will be assigned in setChildrenInfo() after reading completes.
+ lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+ lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+ // Because the length of ListColumnVector.child can't be known yet,
+ // the valueList temporarily holds all data for the ListColumnVector.
+ List<Object> valueList = new ArrayList<>();
org.apache.flink
- flink-hadoop-compatibility_${scala.binary.version}
+ ${flink.hadoop.compatibility.artifactId}
${flink.version}
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 59f382ded125f..3d0553312e603 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -25,7 +25,7 @@
../../pom.xml
4.0.0
- hudi-flink${flink.bundle.version}-bundle_${scala.binary.version}
+ hudi-flink${flink.bundle.version}-bundle
jar
@@ -133,14 +133,14 @@
com.esotericsoftware:kryo-shaded
- org.apache.flink:flink-hadoop-compatibility_${scala.binary.version}
+ org.apache.flink:${flink.hadoop.compatibility.artifactId}
org.apache.flink:flink-json
- org.apache.flink:flink-parquet_${scala.binary.version}
+ org.apache.flink:${flink.parquet.artifactId}
org.apache.hive:hive-common
org.apache.hive:hive-service
org.apache.hive:hive-service-rpc
- org.apache.hive:hive-exec
+ org.apache.hive:hive-exec
org.apache.hive:hive-standalone-metastore
org.apache.hive:hive-metastore
org.apache.hive:hive-jdbc
@@ -416,13 +416,13 @@
org.apache.flink
- flink-hadoop-compatibility_${scala.binary.version}
+ ${flink.hadoop.compatibility.artifactId}
${flink.version}
compile
org.apache.flink
- flink-parquet_${scala.binary.version}
+ ${flink.parquet.artifactId}
${flink.version}
compile
diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml
index 29eccea0f3f0f..a57b8d88311bb 100644
--- a/packaging/hudi-kafka-connect-bundle/pom.xml
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -90,7 +90,7 @@
org.apache.hudi:flink-core
org.apache.hudi:hudi-flink-client
org.apache.flink:flink-core
- org.apache.flink:flink-hadoop-compatibility_${scala.binary.version}
+ ${flink.hadoop.compatibility.artifactId}
com.github.davidmoten:guava-mini
com.github.davidmoten:hilbert-curve
diff --git a/pom.xml b/pom.xml
index f5b3413160bd1..7ff6add7b189c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,15 +123,23 @@
2.4.4
3.2.1
+ 1.15.0
1.14.4
1.13.6
- ${flink1.14.version}
- hudi-flink1.14.x
- 1.14
+ ${flink1.13.version}
+ hudi-flink1.13.x
+ 1.13
+ 1.12.2
flink-runtime
flink-table-runtime_${scala.binary.version}
flink-table-planner_${scala.binary.version}
- 1.12.2
+ flink-parquet
+ flink-statebackend-rocksdb
+ flink-test-utils
+ flink-streaming-java
+ flink-clients
+ flink-connector-kafka
+ flink-hadoop-compatibility_${scala.binary.version}
3.1.3
3.2.1
hudi-spark2
@@ -644,19 +652,19 @@
org.apache.flink
- flink-streaming-java_${scala.binary.version}
+ ${flink.streaming.java.artifactId}
${flink.version}
provided
org.apache.flink
- flink-clients_${scala.binary.version}
+ ${flink.clients.artifactId}
${flink.version}
provided
org.apache.flink
- flink-connector-kafka_${scala.binary.version}
+ ${flink.connector.kafka.artifactId}
${flink.version}
provided
@@ -1759,8 +1767,43 @@
+
+ flink1.15
+
+ ${flink1.15.version}
+ flink-table-runtime
+ flink-parquet
+ flink-statebackend-rocksdb
+ flink-test-utils
+ flink-streaming-java
+ flink-clients
+ flink-connector-kafka
+
+ flink-hadoop-compatibility_2.12
+ hudi-flink1.15.x
+ 1.15
+
+
+
+ flink1.15
+
+
+
flink1.14
+
+ ${flink1.14.version}
+ flink-table-runtime_${scala.binary.version}
+ flink-parquet_${scala.binary.version}
+ flink-statebackend-rocksdb_${scala.binary.version}
+ flink-test-utils_${scala.binary.version}
+ flink-streaming-java_${scala.binary.version}
+ flink-clients_${scala.binary.version}
+ flink-connector-kafka_${scala.binary.version}
+ flink-hadoop-compatibility_${scala.binary.version}
+ hudi-flink1.14.x
+ 1.14
+
true
@@ -1773,10 +1816,18 @@
flink1.13
+ 2.11
${flink1.13.version}
flink-runtime_${scala.binary.version}
flink-table-runtime-blink_${scala.binary.version}
flink-table-planner-blink_${scala.binary.version}
+ flink-parquet_${scala.binary.version}
+ flink-statebackend-rocksdb_${scala.binary.version}
+ flink-test-utils_${scala.binary.version}
+ flink-streaming-java_${scala.binary.version}
+ flink-clients_${scala.binary.version}
+ flink-connector-kafka_${scala.binary.version}
+ flink-hadoop-compatibility_${scala.binary.version}
hudi-flink1.13.x
1.13
true