Skip to content

Commit 34ff529

Browse files
codope authored and yuzhaojing committed
[HUDI-4832] Fix drop partition meta sync (#6662)
1 parent 5a8fab6 commit 34ff529

8 files changed

Lines changed: 84 additions & 61 deletions

File tree

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java

Lines changed: 25 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hudi.common.table.HoodieTableMetaClient;
2727
import org.apache.hudi.common.util.Option;
2828
import org.apache.hudi.exception.HoodieIOException;
29+
2930
import org.apache.log4j.LogManager;
3031
import org.apache.log4j.Logger;
3132

@@ -52,11 +53,34 @@ public class TimelineUtils {
5253
* Returns partitions that have new data written by the write actions in the given timeline.
5354
* Does not include internal operations such as clean in the timeline.
5455
*/
55-
public static List<String> getPartitionsWritten(HoodieTimeline timeline) {
56+
public static List<String> getWrittenPartitions(HoodieTimeline timeline) {
5657
HoodieTimeline timelineToSync = timeline.getWriteTimeline();
5758
return getAffectedPartitions(timelineToSync);
5859
}
5960

61+
/**
62+
* Returns partitions that have been deleted or marked for deletion in the given timeline.
63+
* Does not include internal operations such as clean in the timeline.
64+
*/
65+
public static List<String> getDroppedPartitions(HoodieTimeline timeline) {
66+
HoodieTimeline replaceCommitTimeline = timeline.getWriteTimeline().filterCompletedInstants().getCompletedReplaceTimeline();
67+
68+
return replaceCommitTimeline.getInstants().flatMap(instant -> {
69+
try {
70+
HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
71+
replaceCommitTimeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
72+
if (WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
73+
Map<String, List<String>> partitionToReplaceFileIds = commitMetadata.getPartitionToReplaceFileIds();
74+
return partitionToReplaceFileIds.keySet().stream();
75+
} else {
76+
return Stream.empty();
77+
}
78+
} catch (IOException e) {
79+
throw new HoodieIOException("Failed to get partitions modified at " + instant, e);
80+
}
81+
}).distinct().filter(partition -> !partition.isEmpty()).collect(Collectors.toList());
82+
}
83+
6084
/**
6185
* Returns partitions that have been modified including internal operations such as clean in the passed timeline.
6286
*/

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java

Lines changed: 0 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1357,22 +1357,4 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC
13571357
inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions());
13581358
return inflightAndCompletedPartitions;
13591359
}
1360-
1361-
/**
1362-
* Get Last commit's Metadata.
1363-
*/
1364-
public static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
1365-
try {
1366-
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
1367-
if (timeline.lastInstant().isPresent()) {
1368-
HoodieInstant instant = timeline.lastInstant().get();
1369-
byte[] data = timeline.getInstantDetails(instant).get();
1370-
return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
1371-
} else {
1372-
return Option.empty();
1373-
}
1374-
} catch (Exception e) {
1375-
throw new HoodieException("Failed to get commit metadata", e);
1376-
}
1377-
}
13781360
}

hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -130,11 +130,11 @@ public void testGetPartitions() throws IOException {
130130
assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"}));
131131

132132
// verify only commit actions
133-
partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
133+
partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
134134
assertEquals(4, partitions.size());
135135
assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));
136136

137-
partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
137+
partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
138138
assertEquals(3, partitions.size());
139139
assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
140140
}

hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -194,7 +194,7 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, b
194194
if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
195195
writtenPartitionsSince = new ArrayList<>();
196196
} else {
197-
writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
197+
writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
198198
}
199199
LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size());
200200

hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@
4343
import java.util.List;
4444
import java.util.Map;
4545
import java.util.Properties;
46+
import java.util.Set;
4647
import java.util.stream.Collectors;
4748

4849
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
@@ -199,9 +200,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
199200
// Check if the necessary table exists
200201
boolean tableExists = syncClient.tableExists(tableName);
201202

202-
// check if isDropPartition
203-
boolean isDropPartition = syncClient.isDropPartition();
204-
205203
// Get the parquet schema for this table looking at the latest commit
206204
MessageType schema = syncClient.getStorageSchema();
207205

@@ -225,11 +223,13 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
225223
lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName);
226224
}
227225
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
228-
List<String> writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
226+
List<String> writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
229227
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
230228

231229
// Sync the partitions if needed
232-
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
230+
// find dropped partitions, if any, since the last synced commit
231+
Set<String> droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
232+
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions);
233233
boolean meetSyncConditions = schemaChanged || partitionsChanged;
234234
if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
235235
syncClient.updateLastCommitTimeSynced(tableName);
@@ -310,12 +310,12 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
310310
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
311311
* partition path does not match, it updates the partition path).
312312
*/
313-
private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
313+
private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, Set<String> droppedPartitions) {
314314
boolean partitionsChanged;
315315
try {
316316
List<Partition> hivePartitions = syncClient.getAllPartitions(tableName);
317317
List<PartitionEvent> partitionEvents =
318-
syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
318+
syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, droppedPartitions);
319319

320320
List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
321321
if (!newPartitions.isEmpty()) {

hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java

Lines changed: 27 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -197,8 +197,8 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)
197197
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
198198

199199
hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
200-
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
201-
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
200+
List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.empty());
201+
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
202202
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
203203
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
204204
"The one partition event must of type UPDATE");
@@ -475,10 +475,10 @@ public void testSyncIncremental(String syncMode) throws Exception {
475475

476476
// Lets do the sync
477477
reSyncHiveTable();
478-
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
478+
List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(commitTime1));
479479
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
480480
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
481-
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
481+
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
482482
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
483483
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
484484

@@ -754,10 +754,10 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
754754
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
755755

756756
reinitHiveSyncClient();
757-
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
757+
List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(instantTime));
758758
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
759759
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
760-
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
760+
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
761761
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
762762
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
763763

@@ -784,7 +784,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
784784
"Table partitions should match the number of partitions we wrote");
785785
assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
786786
"The last commit that was synced should be updated in the TBLPROPERTIES");
787-
assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
787+
assertEquals(1, hiveClient.getWrittenPartitionsSince(Option.of(commitTime2)).size());
788788
}
789789

790790
@ParameterizedTest
@@ -854,17 +854,33 @@ public void testDropPartition(String syncMode) throws Exception {
854854
"Table partitions should match the number of partitions we wrote");
855855
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
856856
"The last commit that was synced should be updated in the TBLPROPERTIES");
857+
// add a partition but do not sync
858+
String instantTime2 = "101";
859+
String newPartition = "2010/02/01";
860+
HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
861+
HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME);
862+
partitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
863+
assertEquals(1, partitions.size(),
864+
"Table partitions should match the number of partitions we wrote");
865+
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
866+
"The last commit that was synced should be updated in the TBLPROPERTIES");
867+
868+
// create two replace commits to delete current partitions, but do not sync in between
857869
String partitiontoDelete = partitions.get(0).getValues().get(0).replace("-", "/");
858-
// create a replace commit to delete current partitions+
859-
HiveTestUtil.createReplaceCommit("101", partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true);
870+
String instantTime3 = "102";
871+
HiveTestUtil.createReplaceCommit(instantTime3, partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true);
872+
String instantTime4 = "103";
873+
HiveTestUtil.createReplaceCommit(instantTime4, newPartition, WriteOperationType.DELETE_PARTITION, true, true);
860874

861-
// sync drop partitions
875+
// now run hive sync
862876
reinitHiveSyncClient();
863877
reSyncHiveTable();
864878

865879
List<Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
866880
assertEquals(0, hivePartitions.size(),
867-
"Table should have 0 partition because of the drop the only one partition");
881+
"Table should have no partitions");
882+
assertEquals(instantTime4, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
883+
"The last commit that was synced should be updated in the TBLPROPERTIES");
868884
}
869885

870886
@ParameterizedTest

hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -493,7 +493,7 @@ public static void createCommitFile(HoodieCommitMetadata commitMetadata, String
493493
fsout.close();
494494
}
495495

496-
public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
496+
public static void createReplaceCommitFile(HoodieReplaceCommitMetadata commitMetadata, String instantTime) throws IOException {
497497
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
498498
Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
499499
+ HoodieTimeline.makeReplaceFileName(instantTime));

hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java

Lines changed: 21 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -20,16 +20,13 @@
2020

2121
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
2222
import org.apache.hudi.common.fs.FSUtils;
23-
import org.apache.hudi.common.model.HoodieCommitMetadata;
2423
import org.apache.hudi.common.model.HoodieTableType;
25-
import org.apache.hudi.common.model.WriteOperationType;
2624
import org.apache.hudi.common.table.HoodieTableMetaClient;
2725
import org.apache.hudi.common.table.TableSchemaResolver;
2826
import org.apache.hudi.common.table.timeline.HoodieTimeline;
2927
import org.apache.hudi.common.table.timeline.TimelineUtils;
3028
import org.apache.hudi.common.util.Option;
3129
import org.apache.hudi.common.util.ReflectionUtils;
32-
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
3330
import org.apache.hudi.sync.common.model.Partition;
3431
import org.apache.hudi.sync.common.model.PartitionEvent;
3532
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -41,8 +38,10 @@
4138

4239
import java.util.ArrayList;
4340
import java.util.HashMap;
41+
import java.util.HashSet;
4442
import java.util.List;
4543
import java.util.Map;
44+
import java.util.Set;
4645

4746
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
4847
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -83,18 +82,17 @@ public boolean isBootstrap() {
8382
return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
8483
}
8584

86-
public boolean isDropPartition() {
87-
try {
88-
Option<HoodieCommitMetadata> hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient);
89-
90-
if (hoodieCommitMetadata.isPresent()
91-
&& WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
92-
return true;
93-
}
94-
} catch (Exception e) {
95-
throw new HoodieSyncException("Failed to get commit metadata", e);
96-
}
97-
return false;
85+
/**
86+
* Get the set of dropped partitions since the last synced commit.
87+
* If last sync time is not known then consider only active timeline.
88+
* Going through archive timeline is a costly operation, and it should be avoided unless some start time is given.
89+
*/
90+
public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced) {
91+
HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
92+
.mergeTimeline(metaClient.getActiveTimeline())
93+
.getCommitsTimeline()
94+
.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline();
95+
return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
9896
}
9997

10098
@Override
@@ -106,7 +104,7 @@ public MessageType getStorageSchema() {
106104
}
107105
}
108106

109-
public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
107+
public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynced) {
110108
if (!lastCommitTimeSynced.isPresent()) {
111109
LOG.info("Last commit time synced is not known, listing all partitions in "
112110
+ config.getString(META_SYNC_BASE_PATH)
@@ -118,16 +116,19 @@ public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSyn
118116
config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
119117
} else {
120118
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
121-
return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
122-
.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
119+
return TimelineUtils.getWrittenPartitions(
120+
metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
121+
.mergeTimeline(metaClient.getActiveTimeline())
122+
.getCommitsTimeline()
123+
.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
123124
}
124125
}
125126

126127
/**
127128
* Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
128129
* Generate a list of PartitionEvent based on the changes required.
129130
*/
130-
public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
131+
public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, Set<String> droppedPartitions) {
131132
Map<String, String> paths = new HashMap<>();
132133
for (Partition tablePartition : tablePartitions) {
133134
List<String> hivePartitionValues = tablePartition.getValues();
@@ -143,7 +144,7 @@ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions,
143144
// Check if the partition values or if hdfs path is the same
144145
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
145146

146-
if (isDropPartition) {
147+
if (droppedPartitions.contains(storagePartition)) {
147148
events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
148149
} else {
149150
if (!storagePartitionValues.isEmpty()) {

0 commit comments

Comments
 (0)