Commit 4d799a4 (1 parent: eb8629f)

[HUDI-5049] HoodieHiveCatalog supports the implementation of dropPartition

4 files changed: 180 additions & 13 deletions

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java

Lines changed: 2 additions & 11 deletions
@@ -395,7 +395,7 @@ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec catalo
     boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
     return StreamerUtil.partitionExists(
         inferTablePath(catalogPathStr, tablePath),
-        inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec),
+        HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec),
         hadoopConf);
   }

@@ -419,7 +419,7 @@ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPart
     String tablePathStr = inferTablePath(catalogPathStr, tablePath);
     Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
     boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
-    String partitionPathStr = inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);
+    String partitionPathStr = HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);

     if (!StreamerUtil.partitionExists(tablePathStr, partitionPathStr, hadoopConf)) {
       if (ignoreIfNotExists) {
@@ -568,13 +568,4 @@ private HoodieFlinkWriteClient<?> createWriteClient(
   protected String inferTablePath(String catalogPath, ObjectPath tablePath) {
     return String.format("%s/%s/%s", catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName());
   }
-
-  private String inferPartitionPath(boolean hiveStylePartitioning, CatalogPartitionSpec catalogPartitionSpec) {
-    return catalogPartitionSpec.getPartitionSpec().entrySet()
-        .stream().map(entry ->
-            hiveStylePartitioning
-                ? String.format("%s=%s", entry.getKey(), entry.getValue())
-                : entry.getValue())
-        .collect(Collectors.joining("/"));
-  }
 }

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java

Lines changed: 13 additions & 0 deletions
@@ -21,6 +21,7 @@
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;

+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.hadoop.conf.Configuration;
@@ -113,4 +114,16 @@ public static List<String> getPartitionKeys(CatalogTable table) {
     }
     return Collections.emptyList();
   }
+
+  /**
+   * Returns the partition path with given {@link CatalogPartitionSpec}.
+   */
+  public static String inferPartitionPath(boolean hiveStylePartitioning, CatalogPartitionSpec catalogPartitionSpec) {
+    return catalogPartitionSpec.getPartitionSpec().entrySet()
+        .stream().map(entry ->
+            hiveStylePartitioning
+                ? String.format("%s=%s", entry.getKey(), entry.getValue())
+                : entry.getValue())
+        .collect(Collectors.joining("/"));
+  }
 }
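
For context, the relocated helper joins the spec's entries into a slash-separated partition path, with or without key= prefixes. A minimal usage sketch (the partition columns below are invented for illustration; the expected output follows directly from the String.format/Collectors.joining logic above, and entry order is that of the map backing the CatalogPartitionSpec):

import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.hudi.table.catalog.HoodieCatalogUtil;

import java.util.LinkedHashMap;
import java.util.Map;

public class InferPartitionPathSketch {
  public static void main(String[] args) {
    // LinkedHashMap keeps the two hypothetical partition columns in declaration order.
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("dt", "20221020");
    spec.put("hh", "11");
    CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(spec);

    // Hive-style partitioning renders key=value segments: dt=20221020/hh=11
    System.out.println(HoodieCatalogUtil.inferPartitionPath(true, partitionSpec));

    // Otherwise only the values are joined: 20221020/11
    System.out.println(HoodieCatalogUtil.inferPartitionPath(false, partitionSpec));
  }
}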

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java

Lines changed: 104 additions & 2 deletions
@@ -18,14 +18,18 @@

 package org.apache.hudi.table.catalog;

+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.sync.common.util.ConfigUtils;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -74,6 +78,7 @@
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -87,6 +92,7 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -488,7 +494,8 @@ private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTabl
     }
   }

-  private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
+  @VisibleForTesting
+  public String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
     String location = table.getOptions().getOrDefault(PATH.key(), "");
     if (StringUtils.isNullOrEmpty(location)) {
       try {
@@ -777,7 +784,44 @@ public void createPartition(
   public void dropPartition(
       ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
       throws PartitionNotExistException, CatalogException {
-    throw new HoodieCatalogException("Not supported.");
+    checkNotNull(tablePath, "Table path cannot be null");
+    checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+    HoodieFlinkWriteClient<?> writeClient = null;
+    try {
+      CatalogBaseTable table = getTable(tablePath);
+      writeClient = createWriteClient(tablePath, table);
+      boolean hiveStylePartitioning = Boolean.parseBoolean(table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
+      writeClient.deletePartitions(
+              Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, partitionSpec)),
+              HoodieActiveTimeline.createNewInstantTime())
+          .forEach(writeStatus -> {
+            if (writeStatus.hasErrors()) {
+              throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+            }
+          });
+
+      client.dropPartition(
+          tablePath.getDatabaseName(),
+          tablePath.getObjectName(),
+          getOrderedFullPartitionValues(
+              partitionSpec, HiveSchemaUtils.getFieldNames(getHiveTable(tablePath).getPartitionKeys()), tablePath),
+          true);
+    } catch (NoSuchObjectException e) {
+      if (!ignoreIfNotExists) {
+        throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+      }
+    } catch (MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+      throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
+    } catch (Exception e) {
+      throw new CatalogException(
+          String.format(
+              "Failed to drop partition %s of table %s", partitionSpec, tablePath));
+    } finally {
+      if (writeClient != null) {
+        writeClient.close();
+      }
+    }
   }

   @Override
@@ -790,6 +834,45 @@ public void alterPartition(
     throw new HoodieCatalogException("Not supported.");
   }

+  /**
+   * Get a list of ordered partition values by re-arranging them based on the given list of
+   * partition keys. If the partition value is null, it'll be converted into default partition
+   * name.
+   *
+   * @param partitionSpec a partition spec.
+   * @param partitionKeys a list of partition keys.
+   * @param tablePath path of the table to which the partition belongs.
+   * @return A list of partition values ordered according to partitionKeys.
+   * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have
+   *     different sizes, or any key in partitionKeys doesn't exist in partitionSpec.
+   */
+  @VisibleForTesting
+  public List<String> getOrderedFullPartitionValues(
+      CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+      throws PartitionSpecInvalidException {
+    Map<String, String> spec = partitionSpec.getPartitionSpec();
+    if (spec.size() != partitionKeys.size()) {
+      throw new PartitionSpecInvalidException(
+          getName(), partitionKeys, tablePath, partitionSpec);
+    }
+
+    List<String> values = new ArrayList<>(spec.size());
+    for (String key : partitionKeys) {
+      if (!spec.containsKey(key)) {
+        throw new PartitionSpecInvalidException(
+            getName(), partitionKeys, tablePath, partitionSpec);
+      } else {
+        String value = spec.get(key);
+        if (value == null) {
+          value = getHiveConf().getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
+        }
+        values.add(value);
+      }
+    }
+
+    return values;
+  }
+
   @Override
   public List<String> listFunctions(String databaseName)
       throws DatabaseNotExistException, CatalogException {
@@ -906,4 +989,23 @@ private Map<String, String> supplementOptions(
       return newOptions;
     }
   }
+
+  private HoodieFlinkWriteClient<?> createWriteClient(
+      ObjectPath tablePath,
+      CatalogBaseTable table) throws Exception {
+    Map<String, String> options = table.getOptions();
+    // enable auto-commit though ~
+    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+    return StreamerUtil.createWriteClient(
+        Configuration.fromMap(options)
+            .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
+            .set(FlinkOptions.SOURCE_AVRO_SCHEMA,
+                HoodieTableMetaClient.builder().setBasePath(inferTablePath(tablePath, table)).setConf(hiveConf).build()
+                    .getTableConfig().getTableCreateSchema().get().toString()));
+  }
+
+  @VisibleForTesting
+  public IMetaStoreClient getClient() {
+    return client;
+  }
 }
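
The ordering contract of getOrderedFullPartitionValues can be seen without a running metastore. The sketch below mirrors the method body under the assumption that the default partition name is Hive's stock __HIVE_DEFAULT_PARTITION__ (the real code reads HiveConf.ConfVars.DEFAULTPARTITIONNAME); the names here are illustrative, not part of the commit:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderedPartitionValuesSketch {

  // Mirror of HoodieHiveCatalog#getOrderedFullPartitionValues: emit values in
  // partition-key order; a null value falls back to the default partition name.
  static List<String> orderedValues(Map<String, String> spec, List<String> partitionKeys) {
    if (spec.size() != partitionKeys.size()) {
      throw new IllegalArgumentException("spec size does not match partition keys");
    }
    List<String> values = new ArrayList<>(spec.size());
    for (String key : partitionKeys) {
      if (!spec.containsKey(key)) {
        throw new IllegalArgumentException("spec is missing partition key: " + key);
      }
      String value = spec.get(key);
      values.add(value == null ? "__HIVE_DEFAULT_PARTITION__" : value);
    }
    return values;
  }

  public static void main(String[] args) {
    Map<String, String> spec = new HashMap<>();
    spec.put("hh", "11");
    spec.put("dt", "20221020");
    // The spec's keys are re-arranged to the table's partition-key order:
    // prints [20221020, 11]
    System.out.println(orderedValues(spec, Arrays.asList("dt", "hh")));
  }
}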

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java

Lines changed: 61 additions & 0 deletions
@@ -19,24 +19,34 @@
 package org.apache.hudi.table.catalog;

 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.util.StreamerUtil;

 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -53,8 +63,13 @@
 import java.util.stream.Collectors;

 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

 /**
@@ -210,4 +225,50 @@ public void testRenameTable() throws Exception {

     hoodieCatalog.renameTable(new ObjectPath("default", "test1"), "test", false);
   }
+
+  @Test
+  public void testDropPartition() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FactoryUtil.CONNECTOR.key(), "hudi");
+    CatalogTable table =
+        new CatalogTableImpl(schema, partitions, options, "hudi table");
+    hoodieCatalog.createTable(tablePath, table, false);
+
+    CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(new HashMap<String, String>() {
+      {
+        put("par1", "20221020");
+      }
+    });
+    // drop non-exist partition
+    assertThrows(PartitionNotExistException.class,
+        () -> hoodieCatalog.dropPartition(tablePath, partitionSpec, false));
+
+    Table hiveTable = hoodieCatalog.getHiveTable(tablePath);
+    StorageDescriptor partitionSd = new StorageDescriptor(hiveTable.getSd());
+    partitionSd.setLocation(new Path(partitionSd.getLocation(), HoodieCatalogUtil.inferPartitionPath(true, partitionSpec)).toString());
+    hoodieCatalog.getClient().add_partition(new Partition(Collections.singletonList("20221020"),
+        tablePath.getDatabaseName(), tablePath.getObjectName(), 0, 0, partitionSd, null));
+    assertNotNull(getHivePartition(partitionSpec, hiveTable));
+
+    // drop partition 'par1'
+    hoodieCatalog.dropPartition(tablePath, partitionSpec, false);
+
+    String tablePathStr = hoodieCatalog.inferTablePath(tablePath, hoodieCatalog.getTable(tablePath));
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(tablePathStr, hoodieCatalog.getHiveConf());
+    HoodieInstant latestInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().orElse(null);
+    assertNotNull(latestInstant, "Delete partition commit should be completed");
+    HoodieCommitMetadata commitMetadata = WriteProfiles.getCommitMetadata("tb1", new org.apache.flink.core.fs.Path(tablePathStr),
+        latestInstant, metaClient.getActiveTimeline());
+    assertThat(commitMetadata, instanceOf(HoodieReplaceCommitMetadata.class));
+    HoodieReplaceCommitMetadata replaceCommitMetadata = (HoodieReplaceCommitMetadata) commitMetadata;
+    assertThat(replaceCommitMetadata.getPartitionToReplaceFileIds().size(), is(1));
+    assertThrows(NoSuchObjectException.class, () -> getHivePartition(partitionSpec, hiveTable));
+  }
+
+  private Partition getHivePartition(CatalogPartitionSpec partitionSpec, Table hiveTable) throws Exception {
+    return hoodieCatalog.getClient().getPartition(tablePath.getDatabaseName(),
+        tablePath.getObjectName(),
+        hoodieCatalog.getOrderedFullPartitionValues(
+            partitionSpec, HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), tablePath));
+  }
 }
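
End to end, the test exercises the same path a client would take. Below is a hedged sketch of invoking the newly implemented operation through Flink's generic Catalog interface; the table and partition names are placeholders, and only Catalog#dropPartition, which this commit implements for HoodieHiveCatalog, is assumed:

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.Collections;

public class DropPartitionClientSketch {

  // Drops partition par1=20221020 from default.tb1; with ignoreIfNotExists set
  // to true, a missing partition is silently skipped instead of raising
  // PartitionNotExistException.
  static void dropPartition(Catalog catalog) throws Exception {
    ObjectPath tablePath = new ObjectPath("default", "tb1");
    CatalogPartitionSpec spec =
        new CatalogPartitionSpec(Collections.singletonMap("par1", "20221020"));
    // Per this commit, HoodieHiveCatalog first deletes the partition from the
    // Hudi table via a Flink write client (a replace commit), then drops it
    // from the Hive metastore.
    catalog.dropPartition(tablePath, spec, true);
  }
}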
