
Commit 72b46a4

[HUDI-5049] HoodieCatalog supports the implementation of dropPartition
1 parent 4e9bf13

7 files changed: 228 additions & 8 deletions
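The change threads catalog-level DROP PARTITION through the whole Flink write stack: HoodieCatalog#dropPartition resolves the partition path, opens a HoodieFlinkWriteClient, and calls the new HoodieFlinkWriteClient#deletePartitions, which delegates through HoodieFlinkCopyOnWriteTable#deletePartitions to the new FlinkDeletePartitionCommitActionExecutor. The executor publishes a DELETE_PARTITION replace commit that marks the partition's latest file slices as replaced; the catalog then removes the partition directory from the file system.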


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 1 addition & 1 deletion
@@ -210,7 +210,7 @@ public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, S
    * @param partitions {@link List} of partition to be deleted
    * @return HoodieWriteMetadata
    */
-  public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
+  public abstract HoodieWriteMetadata<O> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
 
   /**
    * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
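Tightening the abstract signature from the raw type HoodieWriteMetadata to HoodieWriteMetadata<O> lets each engine's table bind its own write-status collection (List<WriteStatus> for the Flink client below) without unchecked raw-type usage.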

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java

Lines changed: 8 additions & 0 deletions
@@ -254,6 +254,14 @@ public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
     return postWrite(result, instantTime, table);
   }
 
+  public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
+    preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
+    HoodieWriteMetadata<List<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
+    return postWrite(result, instantTime, table);
+  }
+
   @Override
   public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
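The new client method mirrors the existing delete(keys, instantTime) flow: initialize the table for a DELETE_PARTITION operation, run the usual pre-write checks, execute, and post-write. A minimal sketch of driving it directly, assuming an existing Hudi table whose location and name are illustrative (not part of this commit):

// Sketch only: "conf" must point at an existing Hudi table with its schema
// options set, e.g. from the catalog's stored table properties.
Configuration conf = new Configuration();            // org.apache.flink.configuration.Configuration
conf.setString(FlinkOptions.PATH, "/tmp/hudi/tb1");  // assumed table location
conf.setString(FlinkOptions.TABLE_NAME, "tb1");

try (HoodieFlinkWriteClient<?> writeClient = StreamerUtil.createWriteClient(conf)) {
  String instantTime = HoodieActiveTimeline.createNewInstantTime();
  // One replace commit covering every latest file slice in the listed partitions.
  writeClient.deletePartitions(Collections.singletonList("log_date=20220930/log_hour=10"), instantTime);
}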

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java

Lines changed: 3 additions & 2 deletions
@@ -57,6 +57,7 @@
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkDeletePartitionCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
@@ -243,8 +244,8 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
   }
 
   @Override
-  public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
-    throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
+  public HoodieWriteMetadata<List<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+    return new FlinkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
   }
 
   @Override

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java

Lines changed: 101 additions & 0 deletions (new file)

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends FlinkInsertOverwriteCommitActionExecutor<T> {
+
+  private final List<String> partitions;
+
+  public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
+                                                  HoodieWriteConfig config,
+                                                  HoodieTable<?, ?, ?, ?> table,
+                                                  String instantTime,
+                                                  List<String> partitions) {
+    super(context, null, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
+    this.partitions = partitions;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    try {
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
+      Map<String, List<String>> partitionToReplaceFileIds =
+          context.parallelize(partitions).distinct().collectAsList()
+              .stream().collect(Collectors.toMap(partitionPath -> partitionPath, this::getAllExistingFileIds));
+      HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
+      result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+      result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+      result.setWriteStatuses(Collections.emptyList());
+
+      // create the requested replace-commit instant if it is not on the timeline yet
+      HoodieInstant dropPartitionsInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime);
+      if (!table.getMetaClient().getFs().exists(new Path(table.getMetaClient().getMetaPath(),
+          dropPartitionsInstant.getFileName()))) {
+        HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+            .setOperationType(WriteOperationType.DELETE_PARTITION.name())
+            .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
+            .build();
+        table.getMetaClient().getActiveTimeline().saveToPendingReplaceCommit(dropPartitionsInstant,
+            TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+      }
+
+      this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())),
+          instantTime);
+      this.commitOnAutoCommit(result);
+      return result;
+    } catch (Exception e) {
+      throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + instantTime, e);
+    }
+  }
+
+  private List<String> getAllExistingFileIds(String partitionPath) {
+    // because the new commit is not complete yet, it is safe to mark all existing file ids as replaced
+    return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
+  }
+}
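Design note: rather than eagerly deleting data files, the executor reuses the insert-overwrite machinery and records every latest file slice of the target partitions in partitionToReplaceFileIds, committing the result as a replace commit on the timeline (the same action used by insert-overwrite and clustering). Readers stop resolving the replaced file groups once the commit completes, and the files themselves are reclaimed later by cleaning; that is also why the write statuses and workload profile are empty here, since no new data is written.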

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java

Lines changed: 41 additions & 3 deletions
@@ -18,15 +18,19 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -382,7 +386,7 @@ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec
 
   @Override
   public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec) throws CatalogException {
-    return false;
+    return StreamerUtil.partitionExists(inferTablePath(catalogPathStr, tablePath), inferPartitionPath(catalogPartitionSpec), hadoopConf);
   }
 
   @Override
@@ -394,7 +398,27 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPa
   @Override
   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
       throws PartitionNotExistException, CatalogException {
-    throw new UnsupportedOperationException("dropPartition is not implemented.");
+    if (!partitionExists(tablePath, catalogPartitionSpec)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+    String partitionPathStr = inferPartitionPath(catalogPartitionSpec);
+    try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePathStr, tablePath)) {
+      writeClient.deletePartitions(Collections.singletonList(partitionPathStr), HoodieActiveTimeline.createNewInstantTime())
+          .forEach(writeStatus -> {
+            if (writeStatus.hasErrors()) {
+              throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+            }
+          });
+      fs.delete(new Path(tablePathStr, partitionPathStr), true);
+    } catch (Exception e) {
+      throw new CatalogException(String.format("Dropping partition %s of table %s exception.", partitionPathStr, tablePath), e);
+    }
   }
 
   @Override
@@ -505,7 +529,21 @@ private Map<String, String> applyOptionsHook(String tablePath, Map<String, Strin
     return newOptions;
   }
 
-  private String inferTablePath(String catalogPath, ObjectPath tablePath) {
+  private HoodieFlinkWriteClient<?> createWriteClient(String tablePathStr, ObjectPath tablePath) throws IOException {
+    return StreamerUtil.createWriteClient(
+        Configuration.fromMap(TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf))
+            .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
+            .set(FlinkOptions.SOURCE_AVRO_SCHEMA,
+                StreamerUtil.createMetaClient(tablePathStr, hadoopConf).getTableConfig().getTableCreateSchema().get().toString()));
+  }
+
+  @VisibleForTesting
+  protected String inferTablePath(String catalogPath, ObjectPath tablePath) {
     return String.format("%s/%s/%s", catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName());
   }
+
+  private String inferPartitionPath(CatalogPartitionSpec catalogPartitionSpec) {
+    return String.join("/", catalogPartitionSpec.getPartitionSpec().entrySet()
+        .stream().map(entry -> String.format("%s=%s", entry.getKey(), entry.getValue())).collect(Collectors.toList()));
+  }
 }
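End to end, dropping a partition through the catalog API looks like the following minimal sketch (catalog path, database, and table names are assumed to exist; the spec keys must match the table's partition columns, as in the test below):

// Hypothetical usage; assumes a Hudi table "tb1" in database "default" under the catalog path.
Map<String, String> options = new HashMap<>();
options.put(CatalogOptions.CATALOG_PATH.key(), "/tmp/hudi_catalog");
options.put(CatalogOptions.DEFAULT_DATABASE.key(), "default");
HoodieCatalog catalog = new HoodieCatalog("hudi", Configuration.fromMap(options));
catalog.open();

CatalogPartitionSpec spec = new CatalogPartitionSpec(new HashMap<String, String>() {
  {
    put("log_date", "20220930");
    put("log_hour", "10");
  }
});
// Issues a DELETE_PARTITION replace commit, then removes the partition directory.
catalog.dropPartition(new ObjectPath("default", "tb1"), spec, false);
catalog.close();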

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 17 additions & 0 deletions
@@ -345,6 +345,23 @@ public static boolean tableExists(String basePath, org.apache.hadoop.conf.Config
     }
   }
 
+  /**
+   * Returns whether the hoodie partition exists under the given table path {@code tablePath} and partition path {@code partitionPath}.
+   *
+   * @param tablePath     Base path of the table.
+   * @param partitionPath The path of the partition.
+   * @param hadoopConf    The hadoop configuration.
+   */
+  public static boolean partitionExists(String tablePath, String partitionPath, org.apache.hadoop.conf.Configuration hadoopConf) {
+    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
+    try {
+      return fs.exists(new Path(tablePath, partitionPath));
+    } catch (IOException e) {
+      throw new HoodieException(String.format("Error while checking whether partition exists under table path [%s] and partition path [%s]", tablePath, partitionPath), e);
+    }
+  }
+
   /**
    * Generates the bucket ID using format {partition path}_{fileID}.
    */
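Since HoodieCatalog#inferPartitionPath joins the spec as key=value segments, the existence check reduces to a plain path probe. For illustration (paths assumed):

// For spec {log_date=20220930, log_hour=10} on a table at /data/warehouse/default/tb1,
// this checks whether /data/warehouse/default/tb1/log_date=20220930/log_hour=10 exists.
boolean exists = StreamerUtil.partitionExists(
    "/data/warehouse/default/tb1",
    "log_date=20220930/log_hour=10",
    new org.apache.hadoop.conf.Configuration());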

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java

Lines changed: 57 additions & 2 deletions
@@ -18,7 +18,20 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
@@ -53,6 +66,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
@@ -119,6 +133,7 @@ public class TestHoodieCatalog {
   );
 
   private TableEnvironment streamTableEnv;
+  private String catalogPathStr;
   private HoodieCatalog catalog;
 
   @TempDir
@@ -133,7 +148,8 @@ void beforeEach() {
     File testDb = new File(tempFile, TEST_DEFAULT_DATABASE);
     testDb.mkdir();
     Map<String, String> catalogOptions = new HashMap<>();
-    catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
+    catalogPathStr = tempFile.getAbsolutePath();
+    catalogOptions.put(CATALOG_PATH.key(), catalogPathStr);
     catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
     catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
     catalog.open();
@@ -256,7 +272,7 @@ public void testGetTable() throws Exception {
   }
 
   @Test
-  public void dropTable() throws Exception {
+  public void testDropTable() throws Exception {
     ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
     // create table
     catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
@@ -269,4 +285,43 @@ public void dropTable() throws Exception {
     assertThrows(TableNotExistException.class,
         () -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE, "non_exist"), false));
   }
+
+  @Test
+  public void testDropPartition() throws Exception {
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+    // create table
+    catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+    CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(new HashMap<String, String>() {
+      {
+        put("log_date", "20220930");
+        put("log_hour", "10");
+      }
+    });
+    // dropping a partition that does not exist yet should throw
+    assertThrows(PartitionNotExistException.class,
+        () -> catalog.dropPartition(tablePath, partitionSpec, false));
+
+    String tablePathStr = catalog.inferTablePath(catalogPathStr, tablePath);
+    try (HoodieFlinkWriteClient<?> writeClient = StreamerUtil.createWriteClient(
+        TestConfigurations.getDefaultConf(tablePathStr))) {
+      HoodieKey hoodieKey = new HoodieKey(UUID.randomUUID().toString(), "log_date=20220930/log_hour=10");
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
+      writeClient.insert(Collections.singletonList(
+          new HoodieAvroRecord<>(hoodieKey, new RawTripTestPayload(Option.empty(), hoodieKey.getRecordKey(), hoodieKey.getPartitionPath(), null, true, 0L))
+              .setCurrentLocation(new HoodieRecordLocation(instantTime, "0"))), instantTime);
+      assertTrue(catalog.partitionExists(tablePath, partitionSpec));
+      assertFalse(metadata(writeClient).getAllPartitionPaths().isEmpty());
+
+      catalog.dropPartition(tablePath, partitionSpec, false);
+      assertFalse(catalog.partitionExists(tablePath, partitionSpec));
+      assertTrue(metadata(writeClient).getAllPartitionPaths().isEmpty());
+    }
+  }
+
+  private HoodieTableMetadata metadata(HoodieFlinkWriteClient<?> writeClient) {
+    HoodieWriteConfig clientConfig = writeClient.getConfig();
+    return HoodieTableMetadata.create(writeClient.getEngineContext(), clientConfig.getMetadataConfig(), clientConfig.getBasePath(),
+        clientConfig.getSpillableMapBasePath());
+  }
 }
