[HUDI-4832] Fix drop partition meta sync #6662
Changes from 3 commits
@@ -21,15 +21,16 @@
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sync.common.model.Partition;
 import org.apache.hudi.sync.common.model.PartitionEvent;
 import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -41,8 +42,10 @@

 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -83,18 +86,17 @@ public boolean isBootstrap() {
     return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
   }

-  public boolean isDropPartition() {
-    try {
-      Option<HoodieCommitMetadata> hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient);
-
-      if (hoodieCommitMetadata.isPresent()
-          && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
-        return true;
-      }
-    } catch (Exception e) {
-      throw new HoodieSyncException("Failed to get commit metadata", e);
-    }
-    return false;
-  }
+  /**
+   * Get the set of dropped partitions since the last synced commit.
+   * If last sync time is not known then consider only active timeline.
+   * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given.
+   */
+  public Set<String> getDroppedPartitions(Option<String> lastCommitTimeSynced) {
+    HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
+        .mergeTimeline(metaClient.getActiveTimeline())
+        .getCommitsTimeline()
+        .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline();
+    return new HashSet<>(TimelineUtils.getPartitionsDropped(timeline));
+  }

   @Override
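
A minimal usage sketch of the two branches described in the new javadoc (illustrative only; the syncClient variable and the instant time below are assumptions, not part of this diff):

    // Sketch: `syncClient` stands in for an instance of the sync client class changed above.
    // With a known last-synced instant, the archived timeline is merged with the active one
    // and only instants after that time are considered.
    Set<String> droppedSinceLastSync = syncClient.getDroppedPartitions(Option.of("20220911120000000"));
    // With no last-synced instant, only the cheaper active timeline is scanned.
    Set<String> droppedInActiveTimeline = syncClient.getDroppedPartitions(Option.empty());
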
@@ -118,16 +120,19 @@ public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSyn
           config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
-      return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
-          .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
+      return TimelineUtils.getPartitionsWritten(
+          metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
+              .mergeTimeline(metaClient.getActiveTimeline())
+              .getCommitsTimeline()
+              .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
     }
   }

   /**
    * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
    * Generate a list of PartitionEvent based on the changes required.
    */
-  public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
+  public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, Set<String> droppedPartitions) {
     Map<String, String> paths = new HashMap<>();
     for (Partition tablePartition : tablePartitions) {
       List<String> hivePartitionValues = tablePartition.getValues();
@@ -143,7 +148,7 @@ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions,
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);

-      if (isDropPartition) {
+      if (droppedPartitions.contains(storagePartition)) {
         events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
       } else {
         if (!storagePartitionValues.isEmpty()) {
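
For context, a rough sketch of how a sync tool could wire the new Set-based API together. Only getPartitionsWrittenToSince, getDroppedPartitions and getPartitionEvents come from this diff; the helper name, the enclosing class name and the metastorePartitions argument are assumptions for illustration:

    // Hypothetical helper, not part of this PR: derive partition events for one sync round.
    static List<PartitionEvent> computePartitionEvents(HoodieSyncClient syncClient,          // class name assumed
                                                       List<Partition> metastorePartitions,  // partitions already registered in the metastore
                                                       Option<String> lastCommitTimeSynced) {
      // Storage partitions touched by commits since the last sync (or all partitions if never synced).
      List<String> writtenPartitions = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
      // Storage partitions dropped since the last sync; each dropped partition now yields its own
      // drop event, instead of one isDropPartition flag turning every written partition into a drop.
      Set<String> droppedPartitions = syncClient.getDroppedPartitions(lastCommitTimeSynced);
      return syncClient.getPartitionEvents(metastorePartitions, writtenPartitions, droppedPartitions);
    }
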
@@ -158,4 +163,23 @@ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions,
       }
     }
     return events;
   }
+
+  /**
+   * Get Last commit's Metadata.
+   */
+  private static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      if (timeline.lastInstant().isPresent()) {
+        HoodieInstant instant = timeline.lastInstant().get();
+        byte[] data = timeline.getInstantDetails(instant).get();
+        return HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction()) ? Option.of(HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class)) :
+            Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+      } else {
+        return Option.empty();
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get commit metadata", e);
+    }
+  }
 }
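
Note that this helper keeps the replace-commit case typed as HoodieReplaceCommitMetadata. Purely as an illustration (a sketch, not code in this PR), the check performed by the removed isDropPartition() could still be expressed on top of it from inside the same class:

    // Sketch only: reproduce the removed isDropPartition() check using the new private helper.
    Option<HoodieCommitMetadata> latest = getLatestCommitMetadata(metaClient);
    boolean latestCommitDroppedPartitions = latest.isPresent()
        && WriteOperationType.DELETE_PARTITION.equals(latest.get().getOperationType());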