Skip to content

Commit 4f4d70a

Browse files
committed
[HUDI-3336][HUDI-FLINK]Support custom hadoop config for flink
1 parent 5dd23dc commit 4f4d70a

5 files changed

Lines changed: 16 additions & 34 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -738,16 +738,14 @@ public static org.apache.hadoop.conf.Configuration getParquetConf(
738738
return copy;
739739
}
740740

741-
// Keep the redundant to avoid too many modifications.
741+
/**
742+
* Create a new hadoop configuration that is initialized with the given flink configuration.
743+
*/
742744
public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
743-
if (conf == null) {
744-
return FlinkClientUtil.getHadoopConf();
745-
} else {
746-
org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
747-
Map<String, String> options = getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
748-
options.forEach((k, v) -> hadoopConf.set(k, v));
749-
return hadoopConf;
750-
}
745+
org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
746+
Map<String, String> options = getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
747+
options.forEach((k, v) -> hadoopConf.set(k, v));
748+
return hadoopConf;
751749
}
752750

753751
/**
@@ -771,9 +769,7 @@ private static boolean hasPropertyOptions(Map<String, String> options, String pr
771769

772770
/**
773771
* Creates a new configuration that is initialized with the options of the given map.
774-
* @deprecated Use {@link Configuration#fromMap(Map)} instead.
775772
*/
776-
@Deprecated
777773
public static Configuration fromMap(Map<String, String> map) {
778774
final Configuration configuration = new Configuration();
779775
map.forEach(configuration::setString);

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) {
113113
public void open(Configuration parameters) throws IOException {
114114
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
115115
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
116-
this.ckpMetadata = CkpMetadata.getInstance(config.getString(FlinkOptions.PATH));
116+
this.ckpMetadata = CkpMetadata.getInstance(config);
117117
this.initInstant = lastPendingInstant();
118118
sendBootstrapEvent();
119119
initWriterHelper();

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.hudi.sink.meta;
2020

21+
import org.apache.flink.configuration.Configuration;
2122
import org.apache.hudi.common.fs.FSUtils;
2223
import org.apache.hudi.common.table.HoodieTableMetaClient;
2324
import org.apache.hudi.common.util.ValidationUtils;
25+
import org.apache.hudi.configuration.FlinkOptions;
2426
import org.apache.hudi.exception.HoodieException;
2527
import org.apache.hudi.util.StreamerUtil;
2628

@@ -70,8 +72,8 @@ public class CkpMetadata implements Serializable {
7072
private List<CkpMessage> messages;
7173
private List<String> instantCache;
7274

73-
private CkpMetadata(String basePath) {
74-
this(FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()), basePath);
75+
private CkpMetadata(Configuration config) {
76+
this(FSUtils.getFs(config.getString(FlinkOptions.PATH), FlinkOptions.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
7577
}
7678

7779
private CkpMetadata(FileSystem fs, String basePath) {
@@ -196,8 +198,8 @@ public boolean isAborted(String instant) {
196198
// -------------------------------------------------------------------------
197199
// Utilities
198200
// -------------------------------------------------------------------------
199-
public static CkpMetadata getInstance(String basePath) {
200-
return new CkpMetadata(basePath);
201+
public static CkpMetadata getInstance(Configuration config) {
202+
return new CkpMetadata(config);
201203
}
202204

203205
public static CkpMetadata getInstance(FileSystem fs, String basePath) {

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,6 @@ public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Confi
140140
return conf;
141141
}
142142

143-
/**
144-
* Keep the redundant to avoid too many modifications.
145-
* @deprecated Use {@link FlinkOptions#getHadoopConf(Configuration)} instead.
146-
*/
147-
@Deprecated
148-
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
149-
return FlinkOptions.getHadoopConf(new Configuration());
150-
}
151-
152143
/**
153144
* Mainly used for tests.
154145
*/
@@ -352,14 +343,6 @@ public static HoodieTableMetaClient createMetaClient(String basePath, org.apache
352343
return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build();
353344
}
354345

355-
/**
356-
* Creates the meta client.
357-
*/
358-
@Deprecated
359-
public static HoodieTableMetaClient createMetaClient(String basePath) {
360-
return createMetaClient(basePath, FlinkClientUtil.getHadoopConf());
361-
}
362-
363346
/**
364347
* Creates the meta client.
365348
*/

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hudi.sink.meta;
2020

2121
import org.apache.hudi.common.fs.FSUtils;
22+
import org.apache.hudi.configuration.FlinkOptions;
2223
import org.apache.hudi.util.StreamerUtil;
2324
import org.apache.hudi.utils.TestConfigurations;
2425

@@ -47,7 +48,7 @@ public class TestCkpMetadata {
4748
@BeforeEach
4849
public void beforeEach() throws Exception {
4950
String basePath = tempFile.getAbsolutePath();
50-
FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf());
51+
FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), FlinkOptions.getHadoopConf(new Configuration()));
5152

5253
Configuration conf = TestConfigurations.getDefaultConf(basePath);
5354
StreamerUtil.initTableIfNotExists(conf);

0 commit comments

Comments
 (0)