Merged
@@ -63,7 +63,7 @@ public Option<HoodieCompactionPlan> execute() {
+ ", Compaction scheduled at " + instantTime));
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
-        .getWriteTimeline().getInstants()
+        .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
Contributor:
@swuferhong @danny0405 If you take a look at the previous version of this file, the method that was called before was getCommitsAndCompactionTimeline() ->

public HoodieDefaultTimeline getCommitsAndCompactionTimeline() {

That follows the same behavior as getWriteTimeline().getInstants(). Can you please explain what the possible bug is here?

Contributor:
No, getWriteTimeline() does not really follow the behavior of filterCompletedAndCompactionInstants(): getWriteTimeline() may include any INFLIGHT instants, while filterCompletedAndCompactionInstants() only includes COMPACTION inflight instants.

We should allow scheduling compaction even when there are inflight commits or inflight delta commits.
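The distinction can be illustrated with a small, self-contained sketch. The `State` enum, `Instant` record, and filter method below are simplified stand-ins for Hudi's actual instant model, not its real classes: a COMPLETED instant of any action survives the filter, but a pending (non-completed) instant survives only if it is a compaction.

```java
import java.util.List;
import java.util.stream.Collectors;

public class TimelineFilterSketch {
    // Hypothetical, simplified stand-ins for Hudi's instant model.
    enum State { REQUESTED, INFLIGHT, COMPLETED }
    record Instant(State state, String action, String timestamp) {}

    // Rough model of filterCompletedAndCompactionInstants(): keep COMPLETED
    // instants of any action, plus pending instants only when they are compactions.
    static List<Instant> filterCompletedAndCompaction(List<Instant> timeline) {
        return timeline.stream()
                .filter(i -> i.state() == State.COMPLETED || i.action().equals("compaction"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Instant> writeTimeline = List.of(
                new Instant(State.COMPLETED, "deltacommit", "100"),
                new Instant(State.INFLIGHT, "deltacommit", "102"),  // inflight delta commit
                new Instant(State.REQUESTED, "compaction", "099")); // pending compaction

        // getWriteTimeline().getInstants() would see all three instants, so the
        // inflight delta commit at "102" would block scheduling a compaction at "101".
        // After the filter, only the completed commit and the pending compaction remain.
        System.out.println(filterCompletedAndCompaction(writeTimeline).size()); // 2
    }
}
```

Under this model, the inflight delta commit at "102" is dropped from the timeline consulted by the scheduler, which is exactly why a compaction at "101" no longer conflicts.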

n3nash (Contributor), Jun 15, 2021:

@danny0405 This class was NOT using filterCompletedAndCompactionInstants before. It was using commitsAndCompactionTimeline(). See this -> https://github.com/apache/hudi/blob/release-0.7.0/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/BaseScheduleCompactionActionExecutor.java#L65
Let me know if there is any confusion.

Contributor:
Take a look at the comment:

// Committed and pending compaction instants should have strictly lower timestamps

I think the earlier code that used commitsAndCompactionTimeline() was already wrong: it adds the restriction that we cannot generate a compaction plan while there are inflight commits, which in fact we can.
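To make the "strictly lower timestamps" rule concrete, here is a hedged sketch of the conflict check. The class and method names are hypothetical; Hudi's real check goes through HoodieTimeline.compareTimestamps, but its instant timestamps are fixed-width numeric strings, so lexicographic comparison models it well enough for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ConflictCheckSketch {
    // Sketch of the conflict check: any timestamp in the consulted timeline
    // that is >= the proposed compaction instant time counts as a conflict.
    static List<String> conflictingTimestamps(List<String> timelineTimestamps, String instantTime) {
        return timelineTimestamps.stream()
                .filter(ts -> ts.compareTo(instantTime) >= 0) // models GREATER_THAN_OR_EQUALS
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // With the inflight delta commit "102" still in the timeline, scheduling
        // compaction at "101" conflicts; once "102" is filtered out, it does not.
        System.out.println(conflictingTimestamps(List.of("100", "102"), "101")); // [102]
        System.out.println(conflictingTimestamps(List.of("100"), "101"));        // []
    }
}
```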

Contributor:
Okay, got it. So you're saying this bug was always there, even in the 0.7 release? Can we please add a test case for this before landing?

Contributor:
Sure, let us add a test case.

.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
.collect(Collectors.toList());
@@ -132,6 +132,30 @@ public void testCompactionEmpty() throws Exception {
}
}

@Test
public void testScheduleCompactionWithInflightInstant() {
HoodieWriteConfig config = getConfig();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
// insert 100 records.
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
writeClient.insert(recordsRDD, newCommitTime).collect();

      // Create one inflight delta commit instant.
newCommitTime = "102";
writeClient.startCommitWithTime(newCommitTime);
metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());

      // Schedule a compaction instant whose timestamp is lower than the existing inflight instant's.
String compactionTime = "101";
writeClient.scheduleCompactionAtInstant(compactionTime, Option.empty());
}
}

@Test
public void testWriteStatusContentsAfterCompaction() throws Exception {
// insert 100 records