Skip to content

Commit 064ddc6

Browse files
author
jian.feng
committed
fix 'Not a valid schema field: ts' error in HoodieFlinkCompactor, if precombine field is not ts
1 parent 6b02877 commit 064ddc6

2 files changed

Lines changed: 16 additions & 0 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, Str
173173
// set table schema
174174
CompactionUtil.setAvroSchema(conf, metaClient);
175175

176+
CompactionUtil.setPreCombineField(conf, metaClient);
177+
176178
// infer changelog mode
177179
CompactionUtil.inferChangelogMode(conf, metaClient);
178180

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,20 @@ public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaC
119119
writeConfig.setSchema(tableAvroSchema.toString());
120120
}
121121

122+
/**
123+
* Sets up the preCombine field into the given configuration {@code conf}
124+
* through reading from the hoodie table metadata.
125+
*
126+
* This value is non-null as compaction can only be performed on MOR tables.
127+
* Of which, MOR tables will have non-null precombine fields.
128+
*
129+
* @param conf The configuration
130+
*/
131+
public static void setPreCombineField(Configuration conf, HoodieTableMetaClient metaClient) {
132+
String preCombineField = metaClient.getTableConfig().getPreCombineField();
133+
conf.setString(FlinkOptions.PRECOMBINE_FIELD, preCombineField);
134+
}
135+
122136
/**
123137
* Infers the changelog mode based on the data file schema(including metadata fields).
124138
*

0 commit comments

Comments
 (0)