Skip to content

Commit 0deddbf

Browse files
danny0405 authored and XuQianJin-Stars committed
[HUDI-4531] Wrong partition path for flink hive catalog when the partition fields are not in the last (apache#6292)
(cherry picked from commit e22b6c1)
1 parent d0636da commit 0deddbf

5 files changed

Lines changed: 83 additions & 36 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java

Lines changed: 26 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
package org.apache.hudi.table.catalog;
2020

2121
import org.apache.hudi.common.util.StringUtils;
22+
import org.apache.hudi.common.util.collection.Pair;
2223
import org.apache.hudi.configuration.FlinkOptions;
2324

2425
import org.apache.flink.table.api.DataTypes;
@@ -40,6 +41,7 @@
4041

4142
import java.util.ArrayList;
4243
import java.util.List;
44+
import java.util.stream.Collectors;
4345

4446
import static org.apache.flink.util.Preconditions.checkNotNull;
4547

@@ -49,11 +51,7 @@
4951
public class HiveSchemaUtils {
5052
/** Get field names from field schemas. */
5153
public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
52-
List<String> names = new ArrayList<>(fieldSchemas.size());
53-
for (FieldSchema fs : fieldSchemas) {
54-
names.add(fs.getName());
55-
}
56-
return names;
54+
return fieldSchemas.stream().map(FieldSchema::getName).collect(Collectors.toList());
5755
}
5856

5957
public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
@@ -204,4 +202,27 @@ public static TypeInfo toHiveTypeInfo(DataType dataType) {
204202
LogicalType logicalType = dataType.getLogicalType();
205203
return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
206204
}
205+
206+
/**
207+
* Split the field schemas by given partition keys.
208+
*
209+
* @param fieldSchemas The Hive field schemas.
210+
* @param partitionKeys The partition keys.
211+
*
212+
* @return The pair of (regular columns, partition columns) schema fields
213+
*/
214+
public static Pair<List<FieldSchema>, List<FieldSchema>> splitSchemaByPartitionKeys(
215+
List<FieldSchema> fieldSchemas,
216+
List<String> partitionKeys) {
217+
List<FieldSchema> regularColumns = new ArrayList<>();
218+
List<FieldSchema> partitionColumns = new ArrayList<>();
219+
for (FieldSchema fieldSchema : fieldSchemas) {
220+
if (partitionKeys.contains(fieldSchema.getName())) {
221+
partitionColumns.add(fieldSchema);
222+
} else {
223+
regularColumns.add(fieldSchema);
224+
}
225+
}
226+
return Pair.of(regularColumns, partitionColumns);
227+
}
207228
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,10 @@
1818

1919
package org.apache.hudi.table.catalog;
2020

21+
import org.apache.hudi.configuration.FlinkOptions;
2122
import org.apache.hudi.configuration.HadoopConfigurations;
2223

24+
import org.apache.flink.table.catalog.CatalogTable;
2325
import org.apache.flink.table.catalog.exceptions.CatalogException;
2426
import org.apache.hadoop.conf.Configuration;
2527
import org.apache.hadoop.fs.Path;
@@ -33,6 +35,10 @@
3335
import java.io.IOException;
3436
import java.io.InputStream;
3537
import java.net.URL;
38+
import java.util.Arrays;
39+
import java.util.Collections;
40+
import java.util.List;
41+
import java.util.stream.Collectors;
3642

3743
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
3844
import static org.apache.hudi.table.catalog.CatalogOptions.HIVE_SITE_FILE;
@@ -93,4 +99,18 @@ public static HiveConf createHiveConf(@Nullable String hiveConfDir) {
9399
public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
94100
return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
95101
}
102+
103+
/**
104+
* Returns the partition key list with given table.
105+
*/
106+
public static List<String> getPartitionKeys(CatalogTable table) {
107+
// the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD
108+
if (table.isPartitioned()) {
109+
return table.getPartitionKeys();
110+
} else if (table.getOptions().containsKey(FlinkOptions.PARTITION_PATH_FIELD.key())) {
111+
return Arrays.stream(table.getOptions().get(FlinkOptions.PARTITION_PATH_FIELD.key()).split(","))
112+
.collect(Collectors.toList());
113+
}
114+
return Collections.emptyList();
115+
}
96116
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java

Lines changed: 13 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import org.apache.hudi.common.model.HoodieFileFormat;
2323
import org.apache.hudi.common.table.HoodieTableMetaClient;
2424
import org.apache.hudi.common.util.StringUtils;
25+
import org.apache.hudi.common.util.collection.Pair;
2526
import org.apache.hudi.configuration.FlinkOptions;
2627
import org.apache.hudi.configuration.OptionsResolver;
2728
import org.apache.hudi.exception.HoodieCatalogException;
@@ -86,7 +87,6 @@
8687
import org.slf4j.LoggerFactory;
8788

8889
import java.io.IOException;
89-
import java.util.ArrayList;
9090
import java.util.Arrays;
9191
import java.util.Collections;
9292
import java.util.HashMap;
@@ -539,25 +539,19 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
539539
List<FieldSchema> allColumns = HiveSchemaUtils.createHiveColumns(table.getSchema());
540540

541541
// Table columns and partition keys
542-
if (table instanceof CatalogTable) {
543-
CatalogTable catalogTable = (CatalogTable) table;
544-
545-
if (catalogTable.isPartitioned()) {
546-
int partitionKeySize = catalogTable.getPartitionKeys().size();
547-
List<FieldSchema> regularColumns =
548-
allColumns.subList(0, allColumns.size() - partitionKeySize);
549-
List<FieldSchema> partitionColumns =
550-
allColumns.subList(
551-
allColumns.size() - partitionKeySize, allColumns.size());
552-
553-
sd.setCols(regularColumns);
554-
hiveTable.setPartitionKeys(partitionColumns);
555-
} else {
556-
sd.setCols(allColumns);
557-
hiveTable.setPartitionKeys(new ArrayList<>());
558-
}
542+
CatalogTable catalogTable = (CatalogTable) table;
543+
544+
final List<String> partitionKeys = HoodieCatalogUtil.getPartitionKeys(catalogTable);
545+
if (partitionKeys.size() > 0) {
546+
Pair<List<FieldSchema>, List<FieldSchema>> splitSchemas = HiveSchemaUtils.splitSchemaByPartitionKeys(allColumns, partitionKeys);
547+
List<FieldSchema> regularColumns = splitSchemas.getLeft();
548+
List<FieldSchema> partitionColumns = splitSchemas.getRight();
549+
550+
sd.setCols(regularColumns);
551+
hiveTable.setPartitionKeys(partitionColumns);
559552
} else {
560553
sd.setCols(allColumns);
554+
hiveTable.setPartitionKeys(Collections.emptyList());
561555
}
562556

563557
HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
@@ -572,7 +566,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
572566
serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
573567
serdeProperties.put("serialization.format", "1");
574568

575-
serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark((CatalogTable)table, hiveConf, properties));
569+
serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));
576570

577571
sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
578572

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -164,12 +164,16 @@ public static Map<String, String> getTableOptions(Map<String, String> options) {
164164
return copied;
165165
}
166166

167-
public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf, Map<String, String> properties) {
167+
public static Map<String, String> translateFlinkTableProperties2Spark(
168+
CatalogTable catalogTable,
169+
Configuration hadoopConf,
170+
Map<String, String> properties,
171+
List<String> partitionKeys) {
168172
Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
169173
MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
170174
String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
171175
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
172-
catalogTable.getPartitionKeys(),
176+
partitionKeys,
173177
sparkVersion,
174178
4000,
175179
messageType);

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java

Lines changed: 18 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -66,8 +66,8 @@ public class TestHoodieHiveCatalog {
6666
.field("uuid", DataTypes.INT().notNull())
6767
.field("name", DataTypes.STRING())
6868
.field("age", DataTypes.INT())
69-
.field("ts", DataTypes.BIGINT())
7069
.field("par1", DataTypes.STRING())
70+
.field("ts", DataTypes.BIGINT())
7171
.primaryKey("uuid")
7272
.build();
7373
List<String> partitions = Collections.singletonList("par1");
@@ -95,21 +95,29 @@ public static void closeCatalog() {
9595
@ParameterizedTest
9696
@EnumSource(value = HoodieTableType.class)
9797
public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Exception {
98-
Map<String, String> originOptions = new HashMap<>();
99-
originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
100-
originOptions.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
98+
Map<String, String> options = new HashMap<>();
99+
options.put(FactoryUtil.CONNECTOR.key(), "hudi");
100+
options.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
101101

102102
CatalogTable table =
103-
new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
103+
new CatalogTableImpl(schema, partitions, options, "hudi table");
104104
hoodieCatalog.createTable(tablePath, table, false);
105105

106106
CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
107-
assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
108-
assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString());
109-
assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid");
107+
assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));
108+
assertEquals(tableType.toString(), table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()));
109+
assertEquals("uuid", table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
110110
assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
111-
assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid"));
112-
assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1"));
111+
assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
112+
assertEquals(Collections.singletonList("par1"), ((CatalogTable)table1).getPartitionKeys());
113+
114+
// test explicit primary key
115+
options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "id");
116+
table = new CatalogTableImpl(schema, partitions, options, "hudi table");
117+
hoodieCatalog.alterTable(tablePath, table, true);
118+
119+
CatalogBaseTable table2 = hoodieCatalog.getTable(tablePath);
120+
assertEquals("id", table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
113121
}
114122

115123
@ParameterizedTest

0 commit comments

Comments
 (0)