Skip to content

Commit c5a973a

Browse files
danny0405Alexey Kudinkin
authored andcommitted
[HUDI-5046] Support all the hive sync options for flink sql (apache#6985)
1 parent 1aafffb commit c5a973a

3 files changed

Lines changed: 18 additions & 9 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,9 +703,10 @@ private FlinkOptions() {
703703
// ------------------------------------------------------------------------
704704

705705
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
706-
.key("hive_sync.enable")
706+
.key("hive_sync.enabled")
707707
.booleanType()
708708
.defaultValue(false)
709+
.withFallbackKeys("hive_sync.enable")
709710
.withDescription("Asynchronously sync Hive meta to HMS, default false");
710711

711712
public static final ConfigOption<String> HIVE_SYNC_DB = ConfigOptions

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hudi.hive.HiveSyncTool;
2727
import org.apache.hudi.hive.ddl.HiveSyncMode;
2828
import org.apache.hudi.table.format.FilePathUtils;
29+
import org.apache.hudi.util.StreamerUtil;
2930

3031
import org.apache.flink.annotation.VisibleForTesting;
3132
import org.apache.flink.configuration.Configuration;
@@ -93,7 +94,7 @@ public static HiveSyncContext create(Configuration conf, SerializableConfigurati
9394

9495
@VisibleForTesting
9596
public static Properties buildSyncConfig(Configuration conf) {
96-
TypedProperties props = new TypedProperties();
97+
TypedProperties props = StreamerUtil.flinkConf2TypedProperties(conf);
9798
props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), conf.getString(FlinkOptions.PATH));
9899
props.setPropertyIfNonNull(META_SYNC_BASE_FILE_FORMAT.key(), conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT));
99100
props.setPropertyIfNonNull(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,26 @@
1919
package org.apache.hudi.sink.utils;
2020

2121
import org.apache.hudi.configuration.FlinkOptions;
22+
import org.apache.hudi.hive.HiveSyncConfig;
2223

2324
import org.apache.flink.configuration.Configuration;
2425
import org.junit.jupiter.api.Test;
2526

26-
import java.lang.reflect.Method;
2727
import java.util.Properties;
2828

2929
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
3030
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
3132

3233
/**
3334
* Test cases for {@link HiveSyncContext}.
3435
*/
3536
public class TestHiveSyncContext {
3637
/**
37-
* Test that the file ids generated by the task can finally shuffled to itself.
38+
* Test partition path fields sync.
3839
*/
3940
@Test
40-
public void testBuildSyncConfig() throws Exception {
41+
void testSyncedPartitions() {
4142
Configuration configuration1 = new Configuration();
4243
Configuration configuration2 = new Configuration();
4344
String hiveSyncPartitionField = "hiveSyncPartitionField";
@@ -48,15 +49,21 @@ public void testBuildSyncConfig() throws Exception {
4849

4950
configuration2.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPathField);
5051

51-
Class<?> threadClazz = Class.forName("org.apache.hudi.sink.utils.HiveSyncContext");
52-
Method buildSyncConfigMethod = threadClazz.getDeclaredMethod("buildSyncConfig", Configuration.class);
53-
buildSyncConfigMethod.setAccessible(true);
54-
5552
Properties props1 = HiveSyncContext.buildSyncConfig(configuration1);
5653
Properties props2 = HiveSyncContext.buildSyncConfig(configuration2);
5754

5855
assertEquals(hiveSyncPartitionField, props1.getProperty(META_SYNC_PARTITION_FIELDS.key()));
5956
assertEquals(partitionPathField, props2.getProperty(META_SYNC_PARTITION_FIELDS.key()));
57+
}
6058

59+
/**
60+
* Test an option that has no shortcut key.
61+
*/
62+
@Test
63+
void testOptionWithoutShortcutKey() {
64+
Configuration configuration3 = new Configuration();
65+
configuration3.setBoolean(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), true);
66+
Properties props3 = HiveSyncContext.buildSyncConfig(configuration3);
67+
assertTrue(Boolean.parseBoolean(props3.getProperty(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), "false")));
6168
}
6269
}

0 commit comments

Comments
 (0)