@@ -108,9 +108,14 @@ public void open(FileInputSplit fileSplit) throws IOException {
     LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(
         fileSplit.getPath());
     LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
-    partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
-        partDefaultName.equals(v) ? null : v,
-        fullFieldTypes[fieldNameList.indexOf(k)])));
+    partSpec.forEach((k, v) -> {
+      DataType fieldType = fullFieldTypes[fieldNameList.indexOf(k)];
+      if (!DataTypeUtils.isDatetimeType(fieldType)) {
+        // date-time partition fields are written to the partition path with a specific format,
+        // so read them from the data file directly to avoid format mismatch or precision loss
+        partObjects.put(k, DataTypeUtils.resolvePartition(partDefaultName.equals(v) ? null : v, fieldType));
+      }
+    });

     this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
         utcTimestamp,
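For context: DataTypeUtils.isDatetimeType is only called here, and its body is not part of this hunk. A minimal sketch of what such a check could look like, assuming it simply keys off Flink's LogicalTypeRoot; the wrapper class and the exact list of constants below are illustrative, not the PR's actual implementation:

// Hypothetical sketch -- the real helper lives in DataTypeUtils and is not shown in this diff.
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

public final class DataTypeUtilsSketch {
  // Treat DATE, TIME and the TIMESTAMP family as "date-time" types (assumed scope).
  public static boolean isDatetimeType(DataType type) {
    LogicalTypeRoot root = type.getLogicalType().getTypeRoot();
    switch (root) {
      case DATE:
      case TIME_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        return true;
      default:
        return false;
    }
  }
}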
@@ -299,9 +299,14 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
         this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
         FilePathUtils.extractPartitionKeys(this.conf));
     LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
-    partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
-        defaultPartName.equals(v) ? null : v,
-        fieldTypes.get(fieldNames.indexOf(k)))));
+    partSpec.forEach((k, v) -> {
+      DataType fieldType = fieldTypes.get(fieldNames.indexOf(k));
+      if (!DataTypeUtils.isDatetimeType(fieldType)) {
+        // date-time partition fields are written to the partition path with a specific format,
+        // so read them from the data file directly to avoid format mismatch or precision loss
+        partObjects.put(k, DataTypeUtils.resolvePartition(defaultPartName.equals(v) ? null : v, fieldType));
+      }
+    });

     return ParquetSplitReaderUtil.genPartColumnarRowReader(
         this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
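To make the "format mismatch or precision loss" concrete (this example is illustrative only, not code from the PR): once a millisecond timestamp has been rendered into the partition path with a coarse pattern, the original value can no longer be recovered from the path alone, which is why both readers above now take date-time partition values from the data files instead. A standalone sketch, assuming an hour-granularity pattern such as yyyyMMddHH:

// Standalone illustration: formatting a millisecond timestamp into an
// hour-granularity partition path value is lossy, so the partition value
// cannot be reconstructed from the path.
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionPrecisionLossDemo {
  public static void main(String[] args) {
    // Pattern assumed for the sake of the example; the write-side format is configurable.
    DateTimeFormatter pathFormat = DateTimeFormatter.ofPattern("yyyyMMddHH");

    LocalDateTime original = LocalDateTime.of(1970, 1, 1, 0, 0, 1, 123_000_000); // 1970-01-01T00:00:01.123
    String partitionPathValue = original.format(pathFormat);                     // "1970010100"

    // The path value only carries hour precision; seconds and millis are gone.
    System.out.println("path value: " + partitionPathValue);
    System.out.println("original  : " + original);
  }
}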
@@ -1017,15 +1017,12 @@ void testAppendWrite(boolean clustering) {
   }

   @ParameterizedTest
-  @EnumSource(value = ExecMode.class)
-  void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
-    // can not read the hive style and timestamp based partitioning table
-    // in batch mode, the code path in CopyOnWriteInputFormat relies on
-    // the value on the partition path to recover the partition value,
-    // but the date format has changed(milliseconds switch to hours).
+  @MethodSource("executionModeAndPartitioningParams")
+  void testWriteAndReadWithTimestampPartitioning(ExecMode execMode, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
         .partitionField("ts") // use timestamp as partition path field
         .end();
     tableEnv.executeSql(hoodieTableDDL);
@@ -1044,6 +1041,26 @@ void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }

+  @Test
+  void testMergeOnReadCompactionWithTimestampPartitioning() {
+    TableEnvironment tableEnv = batchTableEnv;
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
+        .option(FlinkOptions.COMPACTION_TASKS, 1)
+        .partitionField("ts")
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    List<Row> rows = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY, FlinkOptions.PARTITION_FORMAT_DASHED_DAY})
   void testWriteAndReadWithDatePartitioning(String partitionFormat) {
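The @MethodSource provider named executionModeAndPartitioningParams is referenced above but its body is not included in this diff. A plausible JUnit 5 sketch of such a provider, crossing every execution mode with the hive-style-partitioning flag; the shape is an assumption, not the PR's actual code, and it would sit inside the same test class:

// Hypothetical fragment for the test class -- the actual provider is not shown in this diff.
// Requires at the top of the class:
//   import java.util.stream.Stream;
//   import org.junit.jupiter.params.provider.Arguments;
private static Stream<Arguments> executionModeAndPartitioningParams() {
  // Every ExecMode crossed with hiveStylePartitioning = false / true.
  return Stream.of(ExecMode.values())
      .flatMap(mode -> Stream.of(Arguments.of(mode, false), Arguments.of(mode, true)));
}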