Skip to content

Commit 28e3db1

Browse files
voonhousTengHuodanny0405
authored
[HUDI-4907] Prevent single commit multi instant issue (#6766)
Co-authored-by: TengHuo <teng_huo@outlook.com> Co-authored-by: yuzhao.cyz <yuzhao.cyz@gmail.com>
1 parent db03e1f commit 28e3db1

3 files changed

Lines changed: 6 additions & 5 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ private static void initMetadataTable(HoodieFlinkWriteClient<?> writeClient) {
341341

342342
private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
343343
CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
344-
ckpMetadata.bootstrap(metaClient);
344+
ckpMetadata.bootstrap();
345345
return ckpMetadata;
346346
}
347347

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void close() {
9494
*
9595
* <p>This expects to be called by the driver.
9696
*/
97-
public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
97+
public void bootstrap() throws IOException {
9898
fs.delete(path, true);
9999
fs.mkdirs(path);
100100
}
@@ -173,8 +173,8 @@ private void load() {
173173
@Nullable
174174
public String lastPendingInstant() {
175175
load();
176-
for (int i = this.messages.size() - 1; i >= 0; i--) {
177-
CkpMessage ckpMsg = this.messages.get(i);
176+
if (this.messages.size() > 0) {
177+
CkpMessage ckpMsg = this.messages.get(this.messages.size() - 1);
178178
// consider 'aborted' as pending too to reuse the instant
179179
if (!ckpMsg.isComplete()) {
180180
return ckpMsg.getInstant();

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.File;
3333
import java.util.stream.IntStream;
3434

35+
import static org.hamcrest.CoreMatchers.equalTo;
3536
import static org.hamcrest.CoreMatchers.is;
3637
import static org.hamcrest.MatcherAssert.assertThat;
3738

@@ -63,7 +64,7 @@ void testWriteAndReadMessage() {
6364

6465
assertThat(metadata.lastPendingInstant(), is("2"));
6566
metadata.commitInstant("2");
66-
assertThat(metadata.lastPendingInstant(), is("1"));
67+
assertThat(metadata.lastPendingInstant(), equalTo(null));
6768

6869
// test cleaning
6970
IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));

0 commit comments

Comments
 (0)