Skip to content

Commit 1c804a8

Browse files
linfey90feiyang2022
authored andcommitted
[HUDI-4611] Fix the duplicate creation of config in HoodieFlinkStreamer (#6369)
Co-authored-by: linfey <[email protected]>
1 parent 4b6696f commit 1c804a8

File tree

2 files changed

+2
-6
lines changed

2 files changed

+2
-6
lines changed

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,12 @@ public static void main(String[] args) throws Exception {
6969
TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
7070
kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));
7171

72+
Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
7273
// Read from kafka source
7374
RowType rowType =
74-
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
75+
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
7576
.getLogicalType();
7677

77-
Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
7878
long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
7979
int parallelism = env.getParallelism();
8080
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,6 @@ public static TypedProperties getProps(FlinkStreamerConfig cfg) {
116116
new Path(cfg.propsFilePath), cfg.configs).getProps();
117117
}
118118

119-
public static Schema getSourceSchema(FlinkStreamerConfig cfg) {
120-
return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema();
121-
}
122-
123119
public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
124120
if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
125121
return new FilebasedSchemaProvider(conf).getSourceSchema();

0 commit comments

Comments
 (0)