Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,15 @@ private void init(HoodieInstant instant) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata();
org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata();
if (instant.isRequested()) {
if (requestedReplaceMetadata != null) {
// for insert_overwrite/insert_overwrite_table clusteringPlan will be empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a comment here; describing it on the JIRA ticket is enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add comments here as clustering and insert_overwrite are involved here

if (requestedReplaceMetadata != null && requestedReplaceMetadata.getClusteringPlan() != null) {
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
this.operationType = WriteOperationType.CLUSTER;
}
} else {
if (inflightCommitMetadata != null) {
this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering whether this change has been tested.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
The intent is right but it is a critical change. Please add a test to cover this scenario.

} else if (requestedReplaceMetadata != null) {
// inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,41 @@ public void testConcurrentWritesWithInterleavingSuccesssfulCommit() throws Excep
}
}

/**
 * Checks that writer 1's inflight commit is reported as conflicting with a
 * replacecommit that writer 2 has left inflight, and that resolving the
 * conflict raises {@link HoodieWriteConflictException}.
 */
@Test
public void testConcurrentWritesWithReplaceInflightCommit() throws Exception {
  // an inflight replacecommit created before writer 1 picks its instant time
  createReplaceInflight(HoodieActiveTimeline.createNewInstantTime());
  HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
  Option<HoodieInstant> lastSuccessfulInstant = Option.empty();

  // writer 1 starts
  String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
  createInflightCommit(currentWriterInstant);
  Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));

  // writer 2 starts a replacecommit that is still inflight (it does not complete)
  String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
  createReplaceInflight(newInstantTime);

  SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy();
  HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
  // reload so the timeline reflects the instants created above
  timeline = timeline.reload();

  List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
      Collectors.toList());

  // writer 1 conflicts with writer 2
  // assertEquals reports the actual size on failure, unlike assertTrue(size == 1)
  Assertions.assertEquals(1, candidateInstants.size());
  ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
  ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
  Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
  Assertions.assertThrows(HoodieWriteConflictException.class,
      () -> strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation),
      "Cannot reach here, writer 1 and writer 2 should have thrown a conflict");
}

@Test
public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception {
createCommit(HoodieActiveTimeline.createNewInstantTime());
Expand Down Expand Up @@ -394,6 +429,20 @@ private void createReplaceRequested(String instantTime) throws Exception {
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
}

/**
 * Adds an inflight replace instant at {@code instantTime} to the test table.
 * The inflight metadata is marked {@code INSERT_OVERWRITE} and carries a single
 * write stat for "file-1" in the default first partition; base files for
 * "file-1" and "file-2" are then added in that same partition.
 *
 * @param instantTime instant time to use for the inflight replace
 * @throws Exception if the test table helpers fail
 */
private void createReplaceInflight(String instantTime) throws Exception {
  final String firstFileId = "file-1";
  final String secondFileId = "file-2";

  // single write stat pointing at the first file id
  HoodieWriteStat stat = new HoodieWriteStat();
  stat.setFileId(firstFileId);

  HoodieCommitMetadata metadata = new HoodieCommitMetadata();
  metadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
  metadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, stat);

  Option<HoodieCommitMetadata> inflightMetadata = Option.of(metadata);
  HoodieTestTable.of(metaClient)
      .addInflightReplace(instantTime, inflightMetadata)
      .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, firstFileId, secondFileId);
}

private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
String fileId1 = "file-1";
String fileId2 = "file-2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ public HoodieTestTable addRequestedReplace(String instantTime, Option<HoodieRequ
return this;
}

/**
 * Creates an inflight replace commit for {@code instantTime} under the table's
 * base path and records {@code instantTime} as the current instant time.
 *
 * @param instantTime             instant time of the inflight replace commit
 * @param inflightReplaceMetadata optional metadata passed through to the inflight replace commit
 * @return this test table, for call chaining
 * @throws Exception if creating the inflight replace commit fails
 */
public HoodieTestTable addInflightReplace(String instantTime, Option<HoodieCommitMetadata> inflightReplaceMetadata) throws Exception {
  // create the inflight replace commit first; the current instant is only advanced if that succeeds
  createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata);
  this.currentInstantTime = instantTime;
  return this;
}

public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
createRequestedCleanFile(basePath, instantTime, cleanerPlan);
createInflightCleanFile(basePath, instantTime, cleanerPlan);
Expand Down