Skip to content

Commit 96330b1

Browse files
wxploveccdanny0405
authored andcommitted
[HUDI-4311] Fix Flink lose data on some rollback scene (apache#5950)
(cherry picked from commit 3a1fd22)
1 parent 451a14e commit 96330b1

File tree

2 files changed

+1
-3
lines changed

2 files changed

+1
-3
lines changed

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public void notifyCheckpointComplete(long checkpointId) {
265265

266266
@Override
267267
public void notifyCheckpointAborted(long checkpointId) {
268-
if (checkpointId == this.checkpointId) {
268+
if (checkpointId == this.checkpointId && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
269269
executor.execute(() -> {
270270
this.ckpMetadata.abortInstant(this.instant);
271271
}, "abort instant %s", this.instant);

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ public void close() {
9797
public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
9898
fs.delete(path, true);
9999
fs.mkdirs(path);
100-
metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction()
101-
.lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp()));
102100
}
103101

104102
public void startInstant(String instant) {

0 commit comments

Comments
 (0)