Skip to content

Commit 237fa48

Browse files
committed
[HUDI-4408] Reuse old rollover file as base file for flink merge handle
1 parent 0faa562 commit 237fa48

File tree

2 files changed

+8
-12
lines changed

2 files changed

+8
-12
lines changed

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,13 @@ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName,
143143
break;
144144
}
145145

146-
rolloverPaths.add(newFilePath);
146+
// Override the old file name,
147+
// In rare cases, when a checkpoint was aborted and the instant time
148+
// is reused, the merge handle generates a new file name
149+
// with the reused instant time of last checkpoint, which is duplicate,
150+
// use the same name file as new base file in case data loss.
151+
oldFilePath = newFilePath;
152+
rolloverPaths.add(oldFilePath);
147153
newFileName = newFileNameWithRollover(rollNumber++);
148154
newFilePath = makeNewFilePath(partitionPath, newFileName);
149155
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath + ", rolls over to new path: " + newFilePath);

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -408,16 +408,6 @@ private boolean hasData() {
408408
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
409409
}
410410

411-
private void cleanWriteHandles() {
412-
if (freshInstant(currentInstant)) {
413-
// In rare cases, when a checkpoint was aborted and the instant time
414-
// is reused, the merge handle generates a new file name
415-
// with the reused instant time of last checkpoint, the write handles
416-
// should be kept and reused in case data loss.
417-
this.writeClient.cleanHandles();
418-
}
419-
}
420-
421411
@SuppressWarnings("unchecked, rawtypes")
422412
private boolean flushBucket(DataBucket bucket) {
423413
String instant = instantToWrite(true);
@@ -489,7 +479,7 @@ private void flushRemaining(boolean endInput) {
489479
this.eventGateway.sendEventToCoordinator(event);
490480
this.buckets.clear();
491481
this.tracer.reset();
492-
cleanWriteHandles();
482+
this.writeClient.cleanHandles();
493483
this.writeStatuses.addAll(writeStatus);
494484
// blocks flushing until the coordinator starts a new instant
495485
this.confirming = true;

0 commit comments

Comments
 (0)