Skip to content

Commit c0b1606

Browse files
committed
[FLINK-22471][flink-table-runtime-blink] Use proper Description for connector options
1 parent ed2a46a commit c0b1606

1 file changed

Lines changed: 78 additions & 49 deletions

File tree

flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java

Lines changed: 78 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import org.apache.flink.configuration.ConfigOption;
2222
import org.apache.flink.configuration.MemorySize;
23+
import org.apache.flink.configuration.description.Description;
2324
import org.apache.flink.table.factories.FactoryUtil;
2425

2526
import java.time.Duration;
2627

2728
import static org.apache.flink.configuration.ConfigOptions.key;
29+
import static org.apache.flink.configuration.description.TextElement.text;
2830

2931
/** This class holds configuration constants used by the filesystem (including Hive) connector. */
3032
public class FileSystemOptions {
@@ -77,21 +79,27 @@ public class FileSystemOptions {
7779
.booleanType()
7880
.defaultValue(false)
7981
.withDescription(
80-
"Enable streaming source or not.\n"
81-
+ " NOTES: Please make sure that each partition/file should be written"
82-
+ " atomically, otherwise the reader may get incomplete data.");
82+
Description.builder()
83+
.text("Enable streaming source or not.")
84+
.linebreak()
85+
.text(
86+
" NOTES: Please make sure that each partition/file should be written"
87+
+ " atomically, otherwise the reader may get incomplete data.")
88+
.build());
8389

8490
public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE =
8591
key("streaming-source.partition.include")
8692
.stringType()
8793
.defaultValue("all")
8894
.withDescription(
89-
"Option to set the partitions to read, the supported values "
90-
+ "are \"all\" and \"latest\","
91-
+ " the \"all\" means read all partitions; the \"latest\" means read latest "
92-
+ "partition in order of streaming-source.partition.order, the \"latest\" only works"
93-
+ " when the streaming hive source table used as temporal table. "
94-
+ "By default the option is \"all\".\n.");
95+
Description.builder()
96+
.text(
97+
"Option to set the partitions to read, supported values are")
98+
.list(
99+
text("all (read all partitions)"),
100+
text(
101+
"latest (read latest partition in order of 'streaming-source.partition.order', this only works when a streaming Hive source table is used as a temporal table)"))
102+
.build());
95103

96104
public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL =
97105
key("streaming-source.monitor-interval")
@@ -105,39 +113,47 @@ public class FileSystemOptions {
105113
.defaultValue("partition-name")
106114
.withDeprecatedKeys("streaming-source.consume-order")
107115
.withDescription(
108-
"The partition order of streaming source,"
109-
+ " support \"create-time\", \"partition-time\" and \"partition-name\"."
110-
+ " \"create-time\" compares partition/file creation time, this is not the"
111-
+ " partition create time in Hive metaStore, but the folder/file modification"
112-
+ " time in filesystem, if the partition folder somehow gets updated,"
113-
+ " e.g. add new file into folder, it can affect how the data is consumed."
114-
+ " \"partition-time\" compares the time extracted from partition name."
115-
+ " \"partition-name\" compares partition name's alphabetical order."
116-
+ " This option is equality with deprecated option \"streaming-source.consume-order\".");
116+
Description.builder()
117+
.text(
118+
"The partition order of the streaming source, supported values are")
119+
.list(
120+
text(
121+
"create-time (compares partition/file creation time, which is not the partition creation time in the Hive metastore, "
122+
+ "but the folder/file modification time in the filesystem; e.g., adding a new file into "
123+
+ "the folder may affect how the data is consumed)"),
124+
text(
125+
"partition-time (compares the time extracted from the partition name)"),
126+
text(
127+
"partition-name (compares partition names lexicographically)"))
128+
.text(
129+
"This is a synonym for the deprecated 'streaming-source.consume-order' option.")
130+
.build());
117131

118132
public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET =
119133
key("streaming-source.consume-start-offset")
120134
.stringType()
121135
.noDefaultValue()
122136
.withDescription(
123-
"Start offset for streaming consuming."
124-
+ " How to parse and compare offsets depends on your order."
125-
+ " For create-time and partition-time, should be a timestamp"
126-
+ " string (yyyy-[m]m-[d]d [hh:mm:ss])."
127-
+ " For partition-time, will use partition time extractor to"
128-
+ " extract time from partition."
129-
+ " For partition-name, is the partition name string, e.g.:"
130-
+ " pt_year=2020/pt_mon=10/pt_day=01");
137+
Description.builder()
138+
.text(
139+
"Start offset for streaming consuming. How to parse and compare offsets depends on 'streaming-source.partition-order'.")
140+
.list(
141+
text(
142+
"For 'create-time' and 'partition-time' it should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss])."),
143+
text(
144+
"For 'partition-time' it will use a partition time extractor to extract the time from the partition."),
145+
text(
146+
"For 'partition-name' it is the name of the partition, e.g. 'pt_year=2020/pt_mon=10/pt_day=01'."))
147+
.build());
131148

132149
public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_KIND =
133150
key("partition.time-extractor.kind")
134151
.stringType()
135152
.defaultValue("default")
136153
.withDescription(
137-
"Time extractor to extract time from partition values."
138-
+ " Support default and custom."
139-
+ " For default, can configure timestamp pattern."
140-
+ " For custom, should configure extractor class.");
154+
"Time extractor to extract time from partition values. "
155+
+ "This can either be 'default' or a custom extractor class. "
156+
+ "For 'default', you can configure a timestamp pattern.");
141157

142158
public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_CLASS =
143159
key("partition.time-extractor.class")
@@ -151,43 +167,56 @@ public class FileSystemOptions {
151167
.stringType()
152168
.noDefaultValue()
153169
.withDescription(
154-
"The 'default' construction way allows users to use partition"
155-
+ " fields to get a legal timestamp pattern."
156-
+ " Default support 'yyyy-mm-dd hh:mm:ss' from first field."
157-
+ " If timestamp in partition is single field 'dt', can configure: '$dt'."
158-
+ " If timestamp in partition is year, month, day, hour,"
159-
+ " can configure: '$year-$month-$day $hour:00:00'."
160-
+ " If timestamp in partition is dt and hour, can configure: '$dt $hour:00:00'.");
170+
Description.builder()
171+
.text(
172+
"When 'partition.time-extractor.kind' is set to 'default', "
173+
+ "you can specify a pattern to get a timestamp from partitions.")
174+
.list(
175+
text(
176+
"By default, a format of 'yyyy-mm-dd hh:mm:ss' is read from the first field."),
177+
text(
178+
"If the timestamp in the partition is a single field called 'dt', you can use '$dt'."),
179+
text(
180+
"If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'."),
181+
text(
182+
"If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'."))
183+
.build());
161184

162185
public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL =
163186
key("lookup.join.cache.ttl")
164187
.durationType()
165188
.defaultValue(Duration.ofMinutes(60))
166189
.withDescription(
167-
"The cache TTL (e.g. 10min) for the build table in lookup join. "
168-
+ "By default the TTL is 60 minutes.");
190+
"The cache TTL (e.g. 10min) for the build table in lookup join.");
169191

170192
public static final ConfigOption<String> SINK_PARTITION_COMMIT_TRIGGER =
171193
key("sink.partition-commit.trigger")
172194
.stringType()
173195
.defaultValue("process-time")
174196
.withDescription(
175-
"Trigger type for partition commit:\n"
176-
+ " 'process-time': based on the time of the machine, it neither requires"
177-
+ " partition time extraction nor watermark generation. Commit partition"
178-
+ " once the 'current system time' passes 'partition creation system time' plus 'delay'.\n"
179-
+ " 'partition-time': based on the time that extracted from partition values,"
180-
+ " it requires watermark generation. Commit partition once the 'watermark'"
181-
+ " passes 'time extracted from partition values' plus 'delay'.");
197+
Description.builder()
198+
.text("Trigger type for partition commit, supported values are")
199+
.list(
200+
text(
201+
"process-time (based on the time of the machine, requires "
202+
+ "neither partition time extraction nor watermark generation; "
203+
+ "commits partition once the current system time passes partition creation system time plus delay)"),
204+
text(
205+
"partition-time (based on the time extracted from partition values, "
206+
+ "requires watermark generation; commits partition once "
207+
+ "the watermark passes the time extracted from partition values plus delay)"))
208+
.build());
182209

183210
public static final ConfigOption<Duration> SINK_PARTITION_COMMIT_DELAY =
184211
key("sink.partition-commit.delay")
185212
.durationType()
186213
.defaultValue(Duration.ofMillis(0))
187214
.withDescription(
188-
"The partition will not commit until the delay time."
189-
+ " if it is a day partition, should be '1 d',"
190-
+ " if it is a hour partition, should be '1 h'");
215+
Description.builder()
216+
.text(
217+
"The partition will not commit until the delay time. "
218+
+ "The value should be '1 d' for day partitions and '1 h' for hour partitions.")
219+
.build());
191220

192221
public static final ConfigOption<String> SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE =
193222
key("sink.partition-commit.watermark-time-zone")

0 commit comments

Comments
 (0)