Skip to content

Commit 22a77d1

Browse files
committed
[HUDI-4098] Support HMS for flink HoodieCatalog
1 parent 4ad08be commit 22a77d1

File tree

3 files changed

+37
-22
lines changed

3 files changed

+37
-22
lines changed

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -499,13 +499,13 @@ private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
499499

500500
private Map<String, String> applyOptionsHook(Map<String, String> options) {
501501
Map<String, String> properties = new HashMap<>(options);
502-
if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD)) {
502+
if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
503503
properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
504504
}
505-
if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD)) {
505+
if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD.key())) {
506506
properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue());
507507
}
508-
if (!options.containsKey(FlinkOptions.TABLE_TYPE)) {
508+
if (!options.containsKey(FlinkOptions.TABLE_TYPE.key())) {
509509
properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
510510
}
511511
return properties;

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogUtils.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818

1919
package org.apache.hudi.table.catalog;
2020

21+
import org.apache.flink.table.catalog.exceptions.CatalogException;
2122
import org.apache.hadoop.hive.conf.HiveConf;
22-
import org.junit.jupiter.api.io.TempDir;
2323

24-
import java.io.File;
25-
26-
import static org.apache.hudi.table.catalog.HoodieCatalogFactoryOptions.HIVE_SITE_FILE;
24+
import java.io.IOException;
2725

2826
/** Test utils for Hoodie catalog. */
2927
public class TestHoodieCatalogUtils {
@@ -32,8 +30,7 @@ public class TestHoodieCatalogUtils {
3230

3331
private static final String TEST_CATALOG_NAME = "test-catalog";
3432

35-
@TempDir
36-
static File tempFile;
33+
private static final org.junit.rules.TemporaryFolder TEMPORARY_FOLDER = new org.junit.rules.TemporaryFolder();
3734

3835
/** Create a HiveCatalog with an embedded Hive Metastore. */
3936
public static HoodieHiveCatalog createHiveCatalog() {
@@ -50,18 +47,20 @@ public static HoodieHiveCatalog createHiveCatalog(String name) {
5047

5148
public static HiveConf createHiveConf() {
5249
ClassLoader classLoader = TestHoodieCatalogUtils.class.getClassLoader();
50+
try {
51+
TEMPORARY_FOLDER.create();
52+
String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
53+
String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);
5354

54-
new File(tempFile,"db").mkdir();
55-
File metastore_db = new File(tempFile,"/db/metastore_db");
56-
String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, metastore_db.getAbsolutePath());
57-
58-
HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_FILE));
59-
60-
File hive_warehouse = new File(tempFile,"/hive_warehouse");
61-
hive_warehouse.mkdir() ;
62-
HiveConf hiveConf = new HiveConf();
63-
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, hive_warehouse.getAbsolutePath());
64-
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
65-
return hiveConf;
55+
HiveConf.setHiveSiteLocation(classLoader.getResource(HoodieCatalogFactoryOptions.HIVE_SITE_FILE));
56+
HiveConf hiveConf = new HiveConf();
57+
hiveConf.setVar(
58+
HiveConf.ConfVars.METASTOREWAREHOUSE,
59+
TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
60+
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
61+
return hiveConf;
62+
} catch (IOException e) {
63+
throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
64+
}
6665
}
6766
}

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package org.apache.hudi.table.catalog;
2020

21+
import org.apache.hudi.configuration.FlinkOptions;
22+
import org.apache.hudi.exception.HoodieCatalogException;
23+
2124
import org.apache.flink.table.api.DataTypes;
2225
import org.apache.flink.table.api.TableSchema;
2326
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -29,13 +32,13 @@
2932
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
3033
import org.apache.flink.table.factories.FactoryUtil;
3134
import org.apache.hadoop.hive.metastore.api.Table;
32-
import org.apache.hudi.exception.HoodieCatalogException;
3335
import org.junit.jupiter.api.AfterAll;
3436
import org.junit.jupiter.api.AfterEach;
3537
import org.junit.jupiter.api.BeforeAll;
3638
import org.junit.jupiter.api.Test;
3739

3840
import java.util.Collections;
41+
import java.util.HashMap;
3942
import java.util.Map;
4043

4144
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -71,6 +74,19 @@ public static void closeCatalog() {
7174
}
7275
}
7376

77+
@Test
78+
public void testCreateAndGetHoodieMORTable() throws Exception {
79+
Map<String, String> originOptions = new HashMap<>();
80+
originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
81+
originOptions.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
82+
CatalogTable table =
83+
new CatalogTableImpl(schema, originOptions, "hudi table");
84+
hoodieCatalog.createTable(tablePath, table, false);
85+
CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
86+
assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
87+
assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
88+
}
89+
7490
@Test
7591
public void testCreateAndGetHoodieTable() throws Exception {
7692
Map<String, String> originOptions =

0 commit comments

Comments (0)