From aab4622b75c7c5adb5a9d225dacc73b49ac6336f Mon Sep 17 00:00:00 2001
From: waywtdcc
Date: Tue, 25 Oct 2022 14:29:46 +0800
Subject: [PATCH 1/3] Fix hive sync error

---
 .../hudi/table/catalog/HiveSchemaUtils.java   | 16 ++++++++++++++--
 .../hudi/table/catalog/HoodieHiveCatalog.java |  4 ++--
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index a057c02f2cca3..8559c1e7401bf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -24,8 +24,10 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -42,6 +44,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -177,10 +180,19 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
 
   /**
    * Create Hive field schemas from Flink table schema including the hoodie metadata fields.
+   *
+   * @param table
    */
-  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) {
+  public static List<FieldSchema> toHiveFieldSchema(CatalogBaseTable table) {
+    TableSchema schema = table.getSchema();
+    Configuration configuration = Configuration.fromMap(table.getOptions());
+    Boolean changelogEnable = configuration.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+    Collection<String> hoodieMetaColumns = HoodieRecord.HOODIE_META_COLUMNS;
+    if (changelogEnable) {
+      hoodieMetaColumns = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
+    }
     List<FieldSchema> columns = new ArrayList<>();
-    for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
+    for (String metaField : hoodieMetaColumns) {
       columns.add(new FieldSchema(metaField, "string", null));
     }
     columns.addAll(createHiveColumns(schema));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c0cd386793b77..bbfbeaefbee97 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -546,7 +546,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
     // because since Hive 3.x, there is validation when altering table,
     // when the metadata fields are synced through the hive sync tool,
     // a compatibility issue would be reported.
-    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema());
+    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table);
 
     // Table columns and partition keys
     CatalogTable catalogTable = (CatalogTable) table;
@@ -799,7 +799,7 @@ public void dropPartition(
     try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, table)) {
       boolean hiveStylePartitioning = Boolean.parseBoolean(table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
       writeClient.deletePartitions(
-          Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
+          Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
          HoodieActiveTimeline.createNewInstantTime())
          .forEach(writeStatus -> {
            if (writeStatus.hasErrors()) {
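
Why PATCH 1/3 is needed: with 'changelog.enabled' set to true the Flink writer keeps an extra '_hoodie_operation' metadata column for every record, so the schema synced to Hive has to include that column as well; syncing only the five standard meta columns leaves the Hive table out of step with the data files and the hive sync fails. Below is a minimal, self-contained Java sketch of the selection logic the patch introduces. The two collections are local stand-ins for HoodieRecord.HOODIE_META_COLUMNS and HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION (in Hudi they are library constants); the column names themselves are the actual Hudi meta field names.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal sketch of the meta-column selection introduced in PATCH 1/3.
// These lists are local stand-ins for HoodieRecord.HOODIE_META_COLUMNS and
// HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION; the column names are the
// well-known Hudi meta fields, but the collections here are illustrative.
public class MetaColumnSelection {

  static final List<String> HOODIE_META_COLUMNS = Arrays.asList(
      "_hoodie_commit_time", "_hoodie_commit_seqno",
      "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name");

  static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION;
  static {
    List<String> withOp = new ArrayList<>(HOODIE_META_COLUMNS);
    withOp.add("_hoodie_operation");
    HOODIE_META_COLUMNS_WITH_OPERATION = withOp;
  }

  // The patch prepends these to the user columns when building the Hive schema.
  static List<String> metaColumns(boolean changelogEnabled) {
    return changelogEnabled ? HOODIE_META_COLUMNS_WITH_OPERATION : HOODIE_META_COLUMNS;
  }

  public static void main(String[] args) {
    System.out.println(metaColumns(false)); // five standard meta columns
    System.out.println(metaColumns(true));  // plus _hoodie_operation
  }
}
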
From 04cc5970603743e3c0eb7d25767195dc2f267152 Mon Sep 17 00:00:00 2001
From: waywtdcc
Date: Mon, 31 Oct 2022 14:32:56 +0800
Subject: [PATCH 2/3] Hive sync error patch

---
 .../hudi/table/catalog/HiveSchemaUtils.java   | 18 +++++-------------
 .../hudi/table/catalog/HoodieHiveCatalog.java |  3 ++-
 2 files changed, 7 insertions(+), 14 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 8559c1e7401bf..4383b42e9f8d9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -24,10 +24,8 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -180,19 +178,13 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
 
   /**
    * Create Hive field schemas from Flink table schema including the hoodie metadata fields.
-   *
-   * @param table
    */
-  public static List<FieldSchema> toHiveFieldSchema(CatalogBaseTable table) {
-    TableSchema schema = table.getSchema();
-    Configuration configuration = Configuration.fromMap(table.getOptions());
-    Boolean changelogEnable = configuration.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
-    Collection<String> hoodieMetaColumns = HoodieRecord.HOODIE_META_COLUMNS;
-    if (changelogEnable) {
-      hoodieMetaColumns = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
-    }
+  public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
     List<FieldSchema> columns = new ArrayList<>();
-    for (String metaField : hoodieMetaColumns) {
+    Collection<String> metaFields = withOperationField
+        ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
+        : HoodieRecord.HOODIE_META_COLUMNS;
+    for (String metaField : metaFields) {
       columns.add(new FieldSchema(metaField, "string", null));
     }
     columns.addAll(createHiveColumns(schema));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index bbfbeaefbee97..a73db42b955bb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -546,7 +546,8 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
     // because since Hive 3.x, there is validation when altering table,
     // when the metadata fields are synced through the hive sync tool,
     // a compatibility issue would be reported.
-    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table);
+    boolean withOperationField = Configuration.fromMap(table.getOptions()).getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+    List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);
 
     // Table columns and partition keys
     CatalogTable catalogTable = (CatalogTable) table;
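
PATCH 2/3 reworks the first attempt after review: toHiveFieldSchema stays a pure TableSchema-to-FieldSchema transformation, and the caller derives the flag from the table options and passes it in, so HiveSchemaUtils no longer depends on Flink's Configuration or on CatalogBaseTable. The sketch below shows that caller-side derivation using Flink's public ConfigOptions API; CHANGELOG_ENABLED is declared locally to keep the example compilable with flink-core alone, on the assumption that it mirrors FlinkOptions.CHANGELOG_ENABLED (key 'changelog.enabled', default false).

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

// Sketch of the caller-side flag derivation used by PATCH 2/3.
// CHANGELOG_ENABLED is declared locally so the example only needs flink-core;
// in Hudi the equivalent option is FlinkOptions.CHANGELOG_ENABLED.
public class WithOperationFieldDemo {

  static final ConfigOption<Boolean> CHANGELOG_ENABLED = ConfigOptions
      .key("changelog.enabled")
      .booleanType()
      .defaultValue(false);

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("changelog.enabled", "true");

    // Configuration.fromMap wraps the raw catalog options; getBoolean falls
    // back to the option's declared default when the key is absent.
    boolean withOperationField = Configuration.fromMap(options).getBoolean(CHANGELOG_ENABLED);
    System.out.println(withOperationField); // true

    // The flag, not the whole CatalogBaseTable, is then handed to the util:
    // List<FieldSchema> allColumns =
    //     HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);
  }
}
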
From 7f52a3126d7112279ba3f5643b8e525617c8fbe7 Mon Sep 17 00:00:00 2001
From: waywtdcc
Date: Mon, 31 Oct 2022 15:35:48 +0800
Subject: [PATCH 3/3] Hive sync error patch 2

---
 .../java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index a73db42b955bb..00846c8143e04 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -546,7 +546,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
     // because since Hive 3.x, there is validation when altering table,
     // when the metadata fields are synced through the hive sync tool,
     // a compatibility issue would be reported.
-    boolean withOperationField = Configuration.fromMap(table.getOptions()).getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+    boolean withOperationField = Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(), "false"));
     List<FieldSchema> allColumns = HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);
 
     // Table columns and partition keys
@@ -800,7 +800,7 @@ public void dropPartition(
     try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, table)) {
       boolean hiveStylePartitioning = Boolean.parseBoolean(table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
       writeClient.deletePartitions(
-          Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
+          Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
          HoodieActiveTimeline.createNewInstantTime())
          .forEach(writeStatus -> {
            if (writeStatus.hasErrors()) {
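
PATCH 3/3 then replaces the Configuration round trip with a plain map lookup, matching the style already used for HIVE_STYLE_PARTITIONING in dropPartition and avoiding building a Configuration object just to read one flag. For well-formed 'true'/'false' values the two spellings agree; the practical difference is that Boolean.parseBoolean maps anything other than a case-insensitive 'true' to false, while Configuration.getBoolean may fail on a value it cannot convert to a boolean. A small JDK-only sketch of the final behavior; the key string mirrors FlinkOptions.CHANGELOG_ENABLED, and the Flink variant is kept as a comment:

import java.util.Collections;
import java.util.Map;

// Contrast of the two option-reading styles touched by PATCH 2/3 and 3/3.
// Only the JDK is required; the key mirrors FlinkOptions.CHANGELOG_ENABLED.
public class OptionParsingDemo {

  static boolean changelogEnabled(Map<String, String> options) {
    // PATCH 3/3 style: plain map lookup with an explicit "false" default.
    // Boolean.parseBoolean is true only for a case-insensitive "true", so a
    // malformed value silently degrades to false instead of failing fast.
    return Boolean.parseBoolean(options.getOrDefault("changelog.enabled", "false"));

    // PATCH 2/3 style (needs flink-core and hudi-flink on the classpath):
    // return Configuration.fromMap(options).getBoolean(FlinkOptions.CHANGELOG_ENABLED);
  }

  public static void main(String[] args) {
    System.out.println(changelogEnabled(Collections.emptyMap()));                                 // false
    System.out.println(changelogEnabled(Collections.singletonMap("changelog.enabled", "true")));  // true
    System.out.println(changelogEnabled(Collections.singletonMap("changelog.enabled", "TRUE")));  // true
    System.out.println(changelogEnabled(Collections.singletonMap("changelog.enabled", "yes")));   // false
  }
}
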