Skip to content

Commit 802cd0d

Browse files
committed
Support hoodie catalog
1 parent 489d71f commit 802cd0d

7 files changed

Lines changed: 449 additions & 27 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.apache.hudi.util.StreamerUtil;
3333

3434
import org.apache.avro.Schema;
35+
36+
import org.apache.flink.annotation.VisibleForTesting;
3537
import org.apache.flink.configuration.Configuration;
3638
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
3739
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
@@ -47,8 +49,6 @@
4749
import org.apache.flink.table.catalog.CatalogTable;
4850
import org.apache.flink.table.catalog.CatalogView;
4951
import org.apache.flink.table.catalog.ObjectPath;
50-
import org.apache.flink.table.catalog.ResolvedCatalogTable;
51-
import org.apache.flink.table.catalog.ResolvedSchema;
5252
import org.apache.flink.table.catalog.exceptions.CatalogException;
5353
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
5454
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -352,7 +352,8 @@ private org.apache.avro.Schema getLatestTableSchema(String path) {
352352
return null;
353353
}
354354

355-
private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
355+
@VisibleForTesting
356+
public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
356357
try {
357358
Table hiveTable = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
358359
return checkHoodieTable(hiveTable);
@@ -426,7 +427,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
426427
}
427428

428429
if (table instanceof CatalogView) {
429-
throw new HoodieCatalogException("view not supported.");
430+
throw new HoodieCatalogException("Hoodie catalog does not support to CREATE VIEW.");
430431
}
431432

432433
try {
@@ -436,7 +437,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
436437
//create hive table
437438
client.createTable(hiveTable);
438439
//init hoodie metaClient
439-
initTableIfNotExists(tablePath, (ResolvedCatalogTable) table);
440+
initTableIfNotExists(tablePath, (CatalogTable)table);
440441
} catch (AlreadyExistsException e) {
441442
if (!ignoreIfExists) {
442443
throw new TableAlreadyExistException(getName(), tablePath, e);
@@ -447,12 +448,10 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
447448
}
448449
}
449450

450-
private void initTableIfNotExists(ObjectPath tablePath, ResolvedCatalogTable catalogTable) {
451-
Map<String, String> options = catalogTable.getOptions();
452-
Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
451+
private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
452+
Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions()));
453453
flinkConf.addAllToProperties(hiveConf.getAllProperties());
454-
ResolvedSchema resolvedSchema = catalogTable.getResolvedSchema();
455-
final String avroSchema = AvroSchemaConverter.convertToSchema(resolvedSchema.toPhysicalRowDataType().getLogicalType()).toString();
454+
final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString();
456455
flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
457456

458457
// stores two copies of options:
@@ -461,16 +460,23 @@ private void initTableIfNotExists(ObjectPath tablePath, ResolvedCatalogTable cat
461460
// because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
462461
// when calling #getTable.
463462

464-
final String pkColumns = String.join(",", resolvedSchema.getPrimaryKey().get().getColumns());
465-
flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
466-
options.put(TableOptionProperties.PK_CONSTRAINT_NAME, resolvedSchema.getPrimaryKey().get().getName());
467-
options.put(TableOptionProperties.PK_COLUMNS, pkColumns);
463+
if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) {
464+
final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
465+
String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD);
466+
if (!Objects.equals(pkColumns, recordKey)) {
467+
throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey));
468+
}
469+
}
468470

469471
if (catalogTable.isPartitioned()) {
470472
final String partitions = String.join(",", catalogTable.getPartitionKeys());
471473
flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
472-
options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
473474
}
475+
476+
if (!flinkConf.getOptional(PATH).isPresent()) {
477+
flinkConf.setString(PATH, inferTablePath(tablePath, catalogTable));
478+
}
479+
474480
flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
475481
try {
476482
StreamerUtil.initTableIfNotExists(flinkConf);
@@ -485,14 +491,27 @@ private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
485491
try {
486492
Path dbLocation = new Path(client.getDatabase(tablePath.getDatabaseName()).getLocationUri());
487493
location = new Path(dbLocation, tablePath.getObjectName()).toString();
488-
table.getOptions().put(PATH.key(), location);
489494
} catch (TException e) {
490495
throw new HoodieCatalogException(String.format("Failed to infer hoodie table path for table %s", tablePath), e);
491496
}
492497
}
493498
return location;
494499
}
495500

501+
private Map<String, String> applyOptionsHook(Map<String, String> options) {
502+
Map<String, String> properties = new HashMap<>(options);
503+
if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD)) {
504+
properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
505+
}
506+
if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD)) {
507+
properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue());
508+
}
509+
if (!options.containsKey(FlinkOptions.TABLE_TYPE)) {
510+
properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
511+
}
512+
return properties;
513+
}
514+
496515
private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
497516
// let Hive set default parameters for us, e.g. serialization.format
498517
Table hiveTable =
@@ -501,7 +520,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
501520
hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
502521
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
503522

504-
Map<String, String> properties = new HashMap<>(table.getOptions());
523+
Map<String, String> properties = applyOptionsHook(table.getOptions());
505524
properties.put("EXTERNAL", "TRUE");
506525
// Table comment
507526
if (table.getComment() != null) {
@@ -551,20 +570,23 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
551570

552571
hiveTable.setSd(sd);
553572

554-
properties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark((ResolvedCatalogTable) table, hiveConf));
573+
properties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark((CatalogTable)table, hiveConf));
555574

556575
//set pk
557-
ResolvedSchema resolvedSchema = ((ResolvedCatalogTable) table).getResolvedSchema();
558-
if (resolvedSchema.getPrimaryKey().isPresent()) {
559-
String pkColumns = String.join(",", resolvedSchema.getPrimaryKey().get().getColumns());
576+
if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) {
577+
String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
560578
String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
561579
if (!Objects.equals(pkColumns, recordKey)) {
562580
throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey));
563581
}
564-
properties.put(PK_CONSTRAINT_NAME, resolvedSchema.getPrimaryKey().get().getName());
582+
properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName());
565583
properties.put(PK_COLUMNS, pkColumns);
566584
}
567585

586+
if (!properties.containsKey(FlinkOptions.PATH.key())) {
587+
properties.put(FlinkOptions.PATH.key(), location);
588+
}
589+
568590
hiveTable.setParameters(properties);
569591
return hiveTable;
570592
}
@@ -588,7 +610,7 @@ public List<String> listTables(String databaseName)
588610
@Override
589611
public List<String> listViews(String databaseName)
590612
throws DatabaseNotExistException, CatalogException {
591-
throw new HoodieCatalogException("view not supported.");
613+
throw new HoodieCatalogException("Hoodie catalog does not support to listViews");
592614
}
593615

594616
@Override
@@ -684,7 +706,7 @@ public void alterTable(
684706
throw new HoodieCatalogException(String.format("The %s is not hoodie table", tablePath.getObjectName()));
685707
}
686708
if (newCatalogTable instanceof CatalogView) {
687-
throw new HoodieCatalogException("view not supported.");
709+
throw new HoodieCatalogException("Hoodie catalog does not support to ALTER VIEW");
688710
}
689711

690712
try {

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.avro.Schema;
3131
import org.apache.flink.table.api.DataTypes;
3232
import org.apache.flink.table.api.TableSchema;
33+
import org.apache.flink.table.catalog.CatalogBaseTable;
34+
import org.apache.flink.table.catalog.CatalogTable;
3335
import org.apache.flink.table.catalog.ResolvedCatalogTable;
3436
import org.apache.flink.table.catalog.ResolvedSchema;
3537
import org.apache.flink.table.types.DataType;
@@ -175,9 +177,8 @@ public static Map<String, String> getTableOptions(Map<String, String> options) {
175177
return copied;
176178
}
177179

178-
public static Map<String, String> translateFlinkTableProperties2Spark(ResolvedCatalogTable catalogTable, Configuration hadoopConf) {
179-
ResolvedSchema resolvedSchema = catalogTable.getResolvedSchema();
180-
Schema schema = AvroSchemaConverter.convertToSchema(resolvedSchema.toPhysicalRowDataType().getLogicalType());
180+
public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf) {
181+
Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
181182
MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquetMessageType(schema, hadoopConf);
182183
String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
183184
return Parquet2SparkSchemaUtils.getSparkTableProperties(catalogTable.getPartitionKeys(), sparkVersion, 4000, messageType);
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.table.catalog;
20+
21+
import org.apache.flink.table.catalog.exceptions.CatalogException;
22+
import org.apache.hadoop.hive.conf.HiveConf;
23+
import org.junit.rules.TemporaryFolder;
24+
25+
import java.io.IOException;
26+
27+
import static org.apache.hudi.table.catalog.HoodieCatalogFactoryOptions.HIVE_SITE_FILE;
28+
29+
/** Test utils for Hoodie catalog. */
30+
public class HoodieCatalogTestUtils {
31+
private static final String HIVE_WAREHOUSE_URI_FORMAT =
32+
"jdbc:derby:;databaseName=%s;create=true";
33+
private static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
34+
private static final String TEST_CATALOG_NAME = "test-catalog";
35+
36+
private static final byte[] SEPARATORS =
37+
new byte[] {
38+
(byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5, (byte) 6, (byte) 7, (byte) 8
39+
};
40+
41+
/** Create a HiveCatalog with an embedded Hive Metastore. */
42+
public static HoodieHiveCatalog createHiveCatalog() {
43+
return createHiveCatalog(TEST_CATALOG_NAME);
44+
}
45+
46+
public static HoodieHiveCatalog createHiveCatalog(String name) {
47+
return new HoodieHiveCatalog(
48+
name,
49+
null,
50+
createHiveConf(),
51+
true);
52+
}
53+
54+
public static HiveConf createHiveConf() {
55+
ClassLoader classLoader = HoodieCatalogTestUtils.class.getClassLoader();
56+
57+
try {
58+
TEMPORARY_FOLDER.create();
59+
String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
60+
String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);
61+
62+
HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_FILE));
63+
HiveConf hiveConf = new HiveConf();
64+
hiveConf.setVar(
65+
HiveConf.ConfVars.METASTOREWAREHOUSE,
66+
TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
67+
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
68+
return hiveConf;
69+
} catch (IOException e) {
70+
throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
71+
}
72+
}
73+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.table.catalog;
20+
21+
import org.apache.flink.table.catalog.Catalog;
22+
import org.apache.flink.table.catalog.CommonCatalogOptions;
23+
import org.apache.flink.table.factories.FactoryUtil;
24+
import org.apache.flink.testutils.executor.TestExecutorResource;
25+
26+
import org.apache.hadoop.hive.conf.HiveConf;
27+
import org.junit.ClassRule;
28+
import org.junit.Rule;
29+
import org.junit.Test;
30+
import org.junit.rules.ExpectedException;
31+
import org.junit.rules.TemporaryFolder;
32+
33+
import java.net.URL;
34+
import java.util.HashMap;
35+
import java.util.Map;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Executors;
38+
39+
import static org.junit.jupiter.api.Assertions.assertEquals;
40+
41+
/**
42+
* Test cases for {@link HoodieHiveCatalogFactory}.
43+
*/
44+
public class HoodieHiveCatalogFactoryTest {
45+
private static final URL CONF_DIR =
46+
Thread.currentThread().getContextClassLoader().getResource("test-catalog-factory-conf");
47+
48+
@ClassRule
49+
public static final TestExecutorResource<ExecutorService> EXECUTOR_RESOURCE =
50+
new TestExecutorResource<>(() -> Executors.newFixedThreadPool(2));
51+
52+
@Rule
53+
public final TemporaryFolder tempFolder = new TemporaryFolder();
54+
55+
@Rule
56+
public ExpectedException expectedException = ExpectedException.none();
57+
58+
@Test
59+
public void testCreateHiveCatalog() {
60+
final String catalogName = "mycatalog";
61+
62+
final HoodieHiveCatalog expectedCatalog = HoodieCatalogTestUtils.createHiveCatalog(catalogName);
63+
64+
final Map<String, String> options = new HashMap<>();
65+
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactoryOptions.IDENTIFIER);
66+
options.put(HoodieCatalogFactoryOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath());
67+
68+
final Catalog actualCatalog =
69+
FactoryUtil.createCatalog(
70+
catalogName, options, null, Thread.currentThread().getContextClassLoader());
71+
72+
assertEquals(
73+
((HoodieHiveCatalog) actualCatalog)
74+
.getHiveConf()
75+
.getVar(HiveConf.ConfVars.METASTOREURIS), "dummy-hms");
76+
checkEquals(expectedCatalog, (HoodieHiveCatalog) actualCatalog);
77+
}
78+
79+
private static void checkEquals(HoodieHiveCatalog c1, HoodieHiveCatalog c2) {
80+
// Only assert a few selected properties for now
81+
assertEquals(c2.getName(), c1.getName());
82+
assertEquals(c2.getDefaultDatabase(), c1.getDefaultDatabase());
83+
}
84+
}

0 commit comments

Comments
 (0)