@@ -108,9 +108,14 @@ public void open(FileInputSplit fileSplit) throws IOException {
LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(
fileSplit.getPath());
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
partDefaultName.equals(v) ? null : v,
fullFieldTypes[fieldNameList.indexOf(k)])));
partSpec.forEach((k, v) -> {
DataType fieldType = fullFieldTypes[fieldNameList.indexOf(k)];
if (!DataTypeUtils.isDatetimeType(fieldType)) {
// datetime-typed partition fields are specially formatted in the partition path,
// so read them directly from the data file to avoid format mismatch or precision loss
partObjects.put(k, DataTypeUtils.resolvePartition(partDefaultName.equals(v) ? null : v, fieldType));
}
});

this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
utcTimestamp,
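The hunk above gates partition-value materialization on `DataTypeUtils.isDatetimeType`, whose implementation is not part of this excerpt. A minimal sketch of what such a helper could look like, assuming it only checks the Flink logical type root for timestamp and date types (the method name comes from the diff; the body is an assumption):

```java
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

public final class DatetimeTypeCheckSketch {

  // Hypothetical stand-in for DataTypeUtils.isDatetimeType(DataType):
  // returns true for type roots whose partition-path string form
  // (e.g. a day/hour formatted timestamp) can lose precision.
  public static boolean isDatetimeType(DataType type) {
    LogicalTypeRoot root = type.getLogicalType().getTypeRoot();
    switch (root) {
      case TIMESTAMP_WITHOUT_TIME_ZONE:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
      case DATE:
        return true;
      default:
        return false;
    }
  }
}
```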
@@ -299,9 +299,14 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
FilePathUtils.extractPartitionKeys(this.conf));
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
partSpec.forEach((k, v) -> partObjects.put(k, DataTypeUtils.resolvePartition(
defaultPartName.equals(v) ? null : v,
fieldTypes.get(fieldNames.indexOf(k)))));
partSpec.forEach((k, v) -> {
DataType fieldType = fieldTypes.get(fieldNames.indexOf(k));
if (!DataTypeUtils.isDatetimeType(fieldType)) {
// datetime-typed partition fields are specially formatted in the partition path,
// so read them directly from the data file to avoid format mismatch or precision loss
partObjects.put(k, DataTypeUtils.resolvePartition(defaultPartName.equals(v) ? null : v, fieldType));
}
});

return ParquetSplitReaderUtil.genPartColumnarRowReader(
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
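The same guard is applied in this second input format. A small self-contained sketch of the resulting behavior, using hypothetical partition values and plain stand-ins for the Hudi/Flink helpers, to show that datetime partition fields are simply left out of `partObjects` and therefore come back from the parquet file rather than from the path:

```java
import java.util.LinkedHashMap;

public class PartitionPathFilterExample {

  public static void main(String[] args) {
    // Hypothetical partition spec extracted from a path such as
    // ".../age=23/ts=1970-01-01T00" (hive-style partitioning assumed).
    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
    partSpec.put("age", "23");
    partSpec.put("ts", "1970-01-01T00");

    LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
    partSpec.forEach((k, v) -> {
      // stand-in for DataTypeUtils.isDatetimeType(fieldType)
      boolean isDatetimeField = "ts".equals(k);
      if (!isDatetimeField) {
        // stand-in for DataTypeUtils.resolvePartition(...)
        partObjects.put(k, Integer.valueOf(v));
      }
    });

    // Only the non-datetime field is materialized from the path; the "ts"
    // column is expected to be read from the data file by the row reader.
    System.out.println(partObjects); // prints {age=23}
  }
}
```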
@@ -36,6 +36,7 @@
* Utilities for {@link org.apache.flink.table.types.DataType}.
*/
public class DataTypeUtils {

/**
* Returns whether the given type is TIMESTAMP type.
*/
@@ -85,11 +86,15 @@ public static boolean isFamily(LogicalType logicalType, LogicalTypeFamily family
* Resolves the partition path string into value obj with given data type.
*/
public static Object resolvePartition(String partition, DataType type) {
return resolvePartition(partition, type.getLogicalType());
}

Contributor (review comment): The changes to this class are unnecessary.
public static Object resolvePartition(String partition, LogicalType type) {
if (partition == null) {
return null;
}

LogicalTypeRoot typeRoot = type.getLogicalType().getTypeRoot();
LogicalTypeRoot typeRoot = type.getTypeRoot();
switch (typeRoot) {
case CHAR:
case VARCHAR:
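For reference, a hedged usage sketch of the two `resolvePartition` overloads shown above: the `DataType` variant now just unwraps to the `LogicalType` variant. The package of the Hudi `DataTypeUtils` class is not shown in this excerpt, so its import is omitted, and the exact resolved value is whatever the switch produces for the given type root:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// Import of the Hudi DataTypeUtils class omitted: its package is not shown in this diff.
public class ResolvePartitionUsageSketch {

  public static void main(String[] args) {
    DataType partitionType = DataTypes.VARCHAR(10);

    // Both calls go through the same switch on the logical type root,
    // since the DataType overload delegates to the LogicalType overload.
    Object viaDataType = DataTypeUtils.resolvePartition("par1", partitionType);
    Object viaLogicalType = DataTypeUtils.resolvePartition("par1", partitionType.getLogicalType());

    // A null partition string (e.g. the default partition name was matched
    // upstream) resolves to null.
    Object nullValue = DataTypeUtils.resolvePartition(null, partitionType.getLogicalType());

    System.out.println(viaDataType);
    System.out.println(viaLogicalType);
    System.out.println(nullValue);
  }
}
```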
@@ -1017,15 +1017,12 @@ void testAppendWrite(boolean clustering) {
}

@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
// can not read the hive style and timestamp based partitioning table
// in batch mode, the code path in CopyOnWriteInputFormat relies on
// the value on the partition path to recover the partition value,
// but the date format has changed(milliseconds switch to hours).
@MethodSource("executionModeAndPartitioningParams")
void testWriteAndReadWithTimestampPartitioning(ExecMode execMode, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.partitionField("ts") // use timestamp as partition path field
.end();
tableEnv.executeSql(hoodieTableDDL);
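The `@MethodSource` annotation above references an `executionModeAndPartitioningParams` provider that is not part of this diff. A plausible shape for it as a member of the test class, assuming it crosses both execution modes with both hive-style-partitioning flags (the `ExecMode.STREAM` constant name is inferred from the `streamTableEnv` branch in the test body):

```java
import java.util.stream.Stream;
import org.junit.jupiter.params.provider.Arguments;

// Hypothetical provider for @MethodSource("executionModeAndPartitioningParams");
// the real method is not shown in this diff, so the cartesian product below
// is an assumption about what the test intends to cover.
private static Stream<Arguments> executionModeAndPartitioningParams() {
  return Stream.of(
      Arguments.of(ExecMode.BATCH, false),
      Arguments.of(ExecMode.BATCH, true),
      Arguments.of(ExecMode.STREAM, false),
      Arguments.of(ExecMode.STREAM, true));
}
```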
@@ -1044,6 +1041,26 @@ void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}

@Test
void testMergeOnReadCompactionWithTimestampPartitioning() {
TableEnvironment tableEnv = batchTableEnv;

String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
.option(FlinkOptions.COMPACTION_TASKS, 1)
.partitionField("ts")
.end();
tableEnv.executeSql(hoodieTableDDL);
execInsertSql(tableEnv, TestSQL.INSERT_T1);

List<Row> rows = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());

assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}

@ParameterizedTest
@ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY, FlinkOptions.PARTITION_FORMAT_DASHED_DAY})
void testWriteAndReadWithDatePartitioning(String partitionFormat) {