Commit 185ec54

[HUDI-4098] Support HMS for flink HoodieCatalog
1 parent 8032fca commit 185ec54

File tree: 12 files changed, +111 −170 lines

hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java

Lines changed: 1 addition & 1 deletion

@@ -262,7 +262,7 @@ private MessageType getTableParquetSchemaFromDataFile() {
     }
   }
 
-  public static MessageType convertAvroSchemaToParquetMessageType(Schema schema, Configuration hadoopConf) {
+  public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
     AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
     return avroSchemaConverter.convert(schema);
   }
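
The rename only drops the redundant "MessageType" suffix; behavior is unchanged, delegating to Parquet's AvroSchemaConverter. A minimal usage sketch (the record schema below is hypothetical):

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.parquet.schema.MessageType;

// A hypothetical single-field record schema, just to exercise the helper.
Schema avroSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
MessageType parquetSchema =
    TableSchemaResolver.convertAvroSchemaToParquet(avroSchema, new Configuration());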

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java

Lines changed: 42 additions & 0 deletions

@@ -23,6 +23,9 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.Path;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -54,6 +57,45 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration c
     return hadoopConf;
   }
 
+  /**
+   * Returns a new hadoop configuration that is initialized with the given hadoopConfDir.
+   *
+   * @param hadoopConfDir Hadoop conf directory path.
+   * @return A Hadoop configuration instance.
+   */
+  public static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
+    if (new File(hadoopConfDir).exists()) {
+      List<File> possiableConfFiles = new ArrayList<File>();
+      File coreSite = new File(hadoopConfDir, "core-site.xml");
+      if (coreSite.exists()) {
+        possiableConfFiles.add(coreSite);
+      }
+      File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
+      if (hdfsSite.exists()) {
+        possiableConfFiles.add(hdfsSite);
+      }
+      File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
+      if (yarnSite.exists()) {
+        possiableConfFiles.add(yarnSite);
+      }
+      // Add mapred-site.xml. We need to read configurations like compression codec.
+      File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
+      if (mapredSite.exists()) {
+        possiableConfFiles.add(mapredSite);
+      }
+      if (possiableConfFiles.isEmpty()) {
+        return null;
+      } else {
+        org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
+        for (File confFile : possiableConfFiles) {
+          hadoopConfiguration.addResource(new Path(confFile.getAbsolutePath()));
+        }
+        return hadoopConfiguration;
+      }
+    }
+    return null;
+  }
+
   /**
    * Creates a Hive configuration with configured dir path or empty if no Hive conf dir is set.
    */
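
The new helper returns null when the directory is missing or contains none of the four recognized site files, so callers must supply their own fallback. A minimal sketch (the conf-dir path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.configuration.HadoopConfigurations;

// "/etc/hadoop/conf" is a hypothetical directory; any path works.
Configuration hadoopConf = HadoopConfigurations.getHadoopConfiguration("/etc/hadoop/conf");
if (hadoopConf == null) {
  // None of core-site.xml / hdfs-site.xml / yarn-site.xml / mapred-site.xml was found.
  hadoopConf = new Configuration();
}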

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java

Lines changed: 23 additions & 7 deletions

@@ -18,6 +18,8 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.exception.HoodieCatalogException;
+
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.Catalog;
@@ -26,12 +28,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
-import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
 
 /**
  * A catalog factory impl that creates {@link HoodieCatalog}.
@@ -52,21 +53,36 @@ public Catalog createCatalog(Context context) {
         FactoryUtil.createCatalogFactoryHelper(this, context);
     helper.validate();
 
-    return new HoodieCatalog(
-        context.getName(),
-        (Configuration) helper.getOptions());
+    if (helper.getOptions().get(HoodieCatalogFactoryOptions.TYPE).equalsIgnoreCase("hms")) {
+      return new HoodieHiveCatalog(
+          context.getName(),
+          helper.getOptions().get(HoodieCatalogFactoryOptions.DEFAULT_DATABASE),
+          helper.getOptions().get(HoodieCatalogFactoryOptions.HIVE_CONF_DIR),
+          helper.getOptions().get(HoodieCatalogFactoryOptions.HADOOP_CONF_DIR));
+    } else if (helper.getOptions().get(HoodieCatalogFactoryOptions.TYPE).equalsIgnoreCase("dfs")) {
+      return new HoodieCatalog(
+          context.getName(),
+          (Configuration) helper.getOptions());
+    } else {
+      throw new HoodieCatalogException("hoodie catalog supports only the hms and dfs modes.");
+    }
   }
 
   @Override
   public Set<ConfigOption<?>> requiredOptions() {
     Set<ConfigOption<?>> options = new HashSet<>();
     options.add(CATALOG_PATH);
-    options.add(DEFAULT_DATABASE);
     return options;
   }
 
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
-    return Collections.emptySet();
+    final Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(HoodieCatalogFactoryOptions.DEFAULT_DATABASE);
+    options.add(PROPERTY_VERSION);
+    options.add(HoodieCatalogFactoryOptions.HIVE_CONF_DIR);
+    options.add(HoodieCatalogFactoryOptions.HADOOP_CONF_DIR);
+    options.add(HoodieCatalogFactoryOptions.TYPE);
+    return options;
   }
 }
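
The factory now dispatches on the "type" option: "hms" backs the catalog with the Hive metastore, while "dfs" (the default) keeps catalog metadata on the filesystem. As a hedged sketch, the "hms" branch can be mirrored by constructing the catalog directly; the catalog name, default database, and conf-dir paths below are hypothetical, and the String-argument constructor is inferred from the factory's call site above:

import org.apache.hudi.table.catalog.HoodieHiveCatalog;

// Hypothetical arguments, in the same order as the factory's call site.
HoodieHiveCatalog catalog = new HoodieHiveCatalog(
    "hoodie_catalog",    // catalog name
    "default",           // default database
    "/etc/hive/conf",    // hive-conf-dir
    "/etc/hadoop/conf"); // hadoop-conf-dir
catalog.open(); // Flink catalogs are opened before use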

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactoryOptions.java

Lines changed: 3 additions & 1 deletion

@@ -24,7 +24,6 @@
 
 /** {@link ConfigOption}s for {@link HoodieHiveCatalog}. */
 public class HoodieCatalogFactoryOptions {
-  public static final String IDENTIFIER = "hudi-hive";
   public static final String DEFAULT_DB = "default";
   public static final String HIVE_SITE_FILE = "hive-site.xml";
 
@@ -39,5 +38,8 @@ public class HoodieCatalogFactoryOptions {
   public static final ConfigOption<String> HADOOP_CONF_DIR =
       ConfigOptions.key("hadoop-conf-dir").stringType().noDefaultValue();
 
+  public static final ConfigOption<String> TYPE =
+      ConfigOptions.key("type").stringType().defaultValue("dfs");
+
   private HoodieCatalogFactoryOptions() {}
 }

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java

Lines changed: 5 additions & 55 deletions

@@ -18,7 +18,8 @@
 
 package org.apache.hudi.table.catalog;
 
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.hudi.configuration.HadoopConfigurations;
+
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -33,8 +34,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
 
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 import static org.apache.hudi.table.catalog.HoodieCatalogFactoryOptions.HIVE_SITE_FILE;
@@ -46,7 +45,7 @@ public class HoodieCatalogUtil {
   private static final Logger LOG = LoggerFactory.getLogger(HoodieCatalogUtil.class);
 
   /**
-   * Returns a new hiveConfig.
+   * Returns a new {@code HiveConf}.
    *
    * @param hiveConfDir Hive conf directory path.
    * @param hadoopConfDir Hadoop conf directory path.
@@ -56,16 +55,9 @@ public static HiveConf createHiveConf(@Nullable String hiveConfDir, @Nullable St
     // create HiveConf from hadoop configuration with hadoop conf directory configured.
     Configuration hadoopConf = null;
     if (isNullOrWhitespaceOnly(hadoopConfDir)) {
-      for (String possibleHadoopConfPath :
-          HadoopUtils.possibleHadoopConfPaths(
-              new org.apache.flink.configuration.Configuration())) {
-        hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
-        if (hadoopConf != null) {
-          break;
-        }
-      }
+      hadoopConf = HadoopConfigurations.getHadoopConf(new org.apache.flink.configuration.Configuration());
     } else {
-      hadoopConf = getHadoopConfiguration(hadoopConfDir);
+      hadoopConf = HadoopConfigurations.getHadoopConfiguration(hadoopConfDir);
       if (hadoopConf == null) {
         String possiableUsedConfFiles =
             "core-site.xml | hdfs-site.xml | yarn-site.xml | mapred-site.xml";
@@ -77,9 +69,6 @@ public static HiveConf createHiveConf(@Nullable String hiveConfDir, @Nullable St
             + ") exist in the folder."));
       }
     }
-    if (hadoopConf == null) {
-      hadoopConf = new Configuration();
-    }
     // ignore all the static conf file URLs that HiveConf may have set
     HiveConf.setHiveSiteLocation(null);
     HiveConf.setLoadMetastoreConfig(false);
@@ -120,43 +109,4 @@ public static HiveConf createHiveConf(@Nullable String hiveConfDir, @Nullable St
   public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
     return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
   }
-
-  /**
-   * Returns a new Hadoop Configuration object using the path to the hadoop conf configured.
-   *
-   * @param hadoopConfDir Hadoop conf directory path.
-   * @return A Hadoop configuration instance.
-   */
-  public static Configuration getHadoopConfiguration(String hadoopConfDir) {
-    if (new File(hadoopConfDir).exists()) {
-      List<File> possiableConfFiles = new ArrayList<File>();
-      File coreSite = new File(hadoopConfDir, "core-site.xml");
-      if (coreSite.exists()) {
-        possiableConfFiles.add(coreSite);
-      }
-      File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
-      if (hdfsSite.exists()) {
-        possiableConfFiles.add(hdfsSite);
-      }
-      File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
-      if (yarnSite.exists()) {
-        possiableConfFiles.add(yarnSite);
-      }
-      // Add mapred-site.xml. We need to read configurations like compression codec.
-      File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
-      if (mapredSite.exists()) {
-        possiableConfFiles.add(mapredSite);
-      }
-      if (possiableConfFiles.isEmpty()) {
-        return null;
-      } else {
-        Configuration hadoopConfiguration = new Configuration();
-        for (File confFile : possiableConfFiles) {
-          hadoopConfiguration.addResource(new Path(confFile.getAbsolutePath()));
-        }
-        return hadoopConfiguration;
-      }
-    }
-    return null;
-  }
 }
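
A short sketch of how these two utilities compose (conf-dir paths hypothetical): build a HiveConf from the configured directories, then reject an embedded metastore, which is the same guard HoodieHiveCatalog applies in its constructor.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.table.catalog.HoodieCatalogUtil;

HiveConf hiveConf = HoodieCatalogUtil.createHiveConf("/etc/hive/conf", "/etc/hadoop/conf");
if (HoodieCatalogUtil.isEmbeddedMetastore(hiveConf)) {
  // hive.metastore.uris is unset; hms mode requires a remote metastore.
  throw new IllegalStateException("Embedded metastore is not allowed.");
}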

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java

Lines changed: 3 additions & 3 deletions

@@ -127,7 +127,7 @@ public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hi
           "Embedded metastore is not allowed. Make sure you have set a valid value for "
               + HiveConf.ConfVars.METASTOREURIS.toString());
     }
-    LOG.info("Created HiveCatalog '{}'", catalogName);
+    LOG.info("Created Hoodie Catalog '{}' in hms mode", catalogName);
   }
 
   @Override
@@ -375,7 +375,7 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
       boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
           .map(fileStatus -> fileStatus.getPath().getName())
           .filter(f -> !f.equals(".hoodie") && !f.equals("default"))
-          .anyMatch(FilePathUtils::hiveStylePartitionMath);
+          .anyMatch(FilePathUtils::isHiveStylePartitioning);
       parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), String.valueOf(hiveStyle));
     }
     client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
@@ -404,7 +404,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
       }
       schema = builder.build();
     } else {
-      LOG.warn("{} does not have any hoodie schema, and use hive table to covert the catalogBaseTable", tablePath);
+      LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
       schema = TableOptionProperties.convertTableSchema(hiveTable);
     }
     return CatalogTable.of(schema, parameters.get(COMMENT),

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalogFactory.java

Lines changed: 0 additions & 72 deletions
This file was deleted.

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java

Lines changed: 1 addition & 1 deletion

@@ -176,7 +176,7 @@ public static Map<String, String> getTableOptions(Map<String, String> options) {
 
   public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf) {
     Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
-    MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquetMessageType(schema, hadoopConf);
+    MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
     String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     return SparkDataSourceTableUtils.getSparkTableProperties(catalogTable.getPartitionKeys(), sparkVersion, 4000, messageType);
   }

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java

Lines changed: 1 addition & 1 deletion

@@ -443,7 +443,7 @@ public static String[] extractHivePartitionFields(org.apache.flink.configuration
     return conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS).split(",");
   }
 
-  public static boolean hiveStylePartitionMath(String path) {
+  public static boolean isHiveStylePartitioning(String path) {
     return HIVE_PARTITION_NAME_PATTERN.matcher(path).matches();
   }
 }
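
For reference, hive-style partitioning names each partition directory key=value, which is what the renamed predicate tests. A hedged sketch (expected results assumed from that convention, not asserted by the diff):

import org.apache.hudi.table.format.FilePathUtils;

boolean hiveStyle = FilePathUtils.isHiveStylePartitioning("dt=2022-05-01"); // expected: true
boolean flat = FilePathUtils.isHiveStylePartitioning("2022-05-01");         // expected: false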

hudi-flink-datasource/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

Lines changed: 1 addition & 2 deletions

@@ -15,5 +15,4 @@
 # limitations under the License.
 
 org.apache.hudi.table.HoodieTableFactory
-org.apache.hudi.table.catalog.HoodieCatalogFactory
-org.apache.hudi.table.catalog.HoodieHiveCatalogFactory
+org.apache.hudi.table.catalog.HoodieCatalogFactory
