Skip to content

Commit 49ed03f

Browse files
danny0405yuzhaojing
authored andcommitted
[HUDI-4638] Rename payload clazz and preCombine field options for flink sql (#6434)
1 parent 78b4a99 commit 49ed03f

1 file changed

Lines changed: 45 additions & 30 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ private FlinkOptions() {
6767
// ------------------------------------------------------------------------
6868
// Base Options
6969
// ------------------------------------------------------------------------
70+
7071
public static final ConfigOption<String> PATH = ConfigOptions
7172
.key("path")
7273
.stringType()
@@ -79,6 +80,38 @@ private FlinkOptions() {
7980
// Common Options
8081
// ------------------------------------------------------------------------
8182

83+
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
84+
.key(HoodieWriteConfig.TBL_NAME.key())
85+
.stringType()
86+
.noDefaultValue()
87+
.withDescription("Table name to register to Hive metastore");
88+
89+
public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
90+
public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
91+
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
92+
.key("table.type")
93+
.stringType()
94+
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
95+
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
96+
97+
public static final String NO_PRE_COMBINE = "no_precombine";
98+
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
99+
.key("payload.ordering.field")
100+
.stringType()
101+
.defaultValue("ts")
102+
.withFallbackKeys("write.precombine.field")
103+
.withDescription("Field used in preCombining before actual write. When two records have the same\n"
104+
+ "key value, we will pick the one with the largest value for the precombine field,\n"
105+
+ "determined by Object.compareTo(..)");
106+
107+
public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
108+
.key("payload.class")
109+
.stringType()
110+
.defaultValue(EventTimeAvroPayload.class.getName())
111+
.withFallbackKeys("write.payload.class")
112+
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
113+
+ "This will render any value set for the option in-effective");
114+
82115
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
83116
.key("partition.default_name")
84117
.stringType()
@@ -116,6 +149,7 @@ private FlinkOptions() {
116149
// ------------------------------------------------------------------------
117150
// Index Options
118151
// ------------------------------------------------------------------------
152+
119153
public static final ConfigOption<String> INDEX_TYPE = ConfigOptions
120154
.key("index.type")
121155
.stringType()
@@ -150,6 +184,7 @@ private FlinkOptions() {
150184
// ------------------------------------------------------------------------
151185
// Read Options
152186
// ------------------------------------------------------------------------
187+
153188
public static final ConfigOption<Integer> READ_TASKS = ConfigOptions
154189
.key("read.tasks")
155190
.intType()
@@ -247,19 +282,6 @@ private FlinkOptions() {
247282
// ------------------------------------------------------------------------
248283
// Write Options
249284
// ------------------------------------------------------------------------
250-
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
251-
.key(HoodieWriteConfig.TBL_NAME.key())
252-
.stringType()
253-
.noDefaultValue()
254-
.withDescription("Table name to register to Hive metastore");
255-
256-
public static final String TABLE_TYPE_COPY_ON_WRITE = HoodieTableType.COPY_ON_WRITE.name();
257-
public static final String TABLE_TYPE_MERGE_ON_READ = HoodieTableType.MERGE_ON_READ.name();
258-
public static final ConfigOption<String> TABLE_TYPE = ConfigOptions
259-
.key("table.type")
260-
.stringType()
261-
.defaultValue(TABLE_TYPE_COPY_ON_WRITE)
262-
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
263285

264286
public static final ConfigOption<Boolean> INSERT_CLUSTER = ConfigOptions
265287
.key("write.insert.cluster")
@@ -275,22 +297,6 @@ private FlinkOptions() {
275297
.defaultValue("upsert")
276298
.withDescription("The write operation, that this write should do");
277299

278-
public static final String NO_PRE_COMBINE = "no_precombine";
279-
public static final ConfigOption<String> PRECOMBINE_FIELD = ConfigOptions
280-
.key("write.precombine.field")
281-
.stringType()
282-
.defaultValue("ts")
283-
.withDescription("Field used in preCombining before actual write. When two records have the same\n"
284-
+ "key value, we will pick the one with the largest value for the precombine field,\n"
285-
+ "determined by Object.compareTo(..)");
286-
287-
public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
288-
.key("write.payload.class")
289-
.stringType()
290-
.defaultValue(EventTimeAvroPayload.class.getName())
291-
.withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
292-
+ "This will render any value set for the option in-effective");
293-
294300
/**
295301
* Flag to indicate whether to drop duplicates before insert/upsert.
296302
* By default false to gain extra performance.
@@ -395,7 +401,7 @@ private FlinkOptions() {
395401
.key("write.index_bootstrap.tasks")
396402
.intType()
397403
.noDefaultValue()
398-
.withDescription("Parallelism of tasks that do index bootstrap, default same as the sink parallelism");
404+
.withDescription("Parallelism of tasks that do index bootstrap, default same as the write task parallelism");
399405

400406
public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
401407
.key("write.bucket_assign.tasks")
@@ -579,6 +585,14 @@ private FlinkOptions() {
579585
.withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
580586
+ "This also directly translates into how much you can incrementally pull on this table, default 30");
581587

588+
public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions
589+
.key("clean.retain_hours")
590+
.intType()
591+
.defaultValue(24)// default 24 hours
592+
.withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
593+
+ "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
594+
+ " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
595+
582596
public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
583597
.key("clean.retain_file_versions")
584598
.intType()
@@ -683,6 +697,7 @@ private FlinkOptions() {
683697
// ------------------------------------------------------------------------
684698
// Hive Sync Options
685699
// ------------------------------------------------------------------------
700+
686701
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
687702
.key("hive_sync.enable")
688703
.booleanType()

0 commit comments

Comments
 (0)