Skip to content

Commit 2de2d75

Browse files
[HUDI-4152] Add IT for InstantCompactionPlanSelectStrategy
1 parent 8d0647a commit 2de2d75

4 files changed

Lines changed: 15 additions & 12 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@
2121
import java.util.Arrays;
2222
import java.util.Collections;
2323
import java.util.List;
24-
import java.util.stream.Stream;
24+
import java.util.stream.Collectors;
2525
import org.apache.hudi.common.table.timeline.HoodieInstant;
2626
import org.apache.hudi.common.table.timeline.HoodieTimeline;
2727
import org.apache.hudi.common.util.StringUtils;
28-
import org.apache.hudi.exception.HoodieException;
2928
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
3029
import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
3130
import org.slf4j.Logger;
@@ -43,11 +42,9 @@ public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, Flin
4342
LOG.warn("None instant is selected");
4443
return Collections.emptyList();
4544
}
46-
Stream<String> instants = Arrays.stream(config.compactionPlanInstant.split(","));
47-
HoodieInstant specifiedInstant = pendingCompactionTimeline.getInstants()
48-
.filter(instant -> instants.anyMatch(i -> i.equals(instant.getTimestamp())))
49-
.findFirst()
50-
.orElseThrow(() -> new HoodieException("The instant " + config.compactionPlanInstant + " is not found in timeline"));
51-
return Collections.singletonList(specifiedInstant);
45+
List<String> instants = Arrays.asList(config.compactionPlanInstant.split(","));
46+
return pendingCompactionTimeline.getInstants()
47+
.filter(instant -> instants.contains(instant.getTimestamp()))
48+
.collect(Collectors.toList());
5249
}
5350
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectSt
3333
@Override
3434
public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
3535
List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList());
36-
if (!CompactionUtil.isLIFO(config.compactionSeq)) {
36+
if (CompactionUtil.isLIFO(config.compactionSeq)) {
3737
Collections.reverse(pendingCompactionPlanInstants);
3838
}
3939
int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size());

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectS
3333
@Override
3434
public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
3535
Option<HoodieInstant> compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq)
36-
? pendingCompactionTimeline.firstInstant()
37-
: pendingCompactionTimeline.lastInstant();
36+
? pendingCompactionTimeline.lastInstant()
37+
: pendingCompactionTimeline.firstInstant();
3838
if (compactionPlanInstant.isPresent()) {
3939
return Collections.singletonList(compactionPlanInstant.get());
4040
}

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,14 @@ void testInstantCompactionPlanSelectStrategy() {
119119
InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy();
120120
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig));
121121

122+
compactionConfig.compactionPlanInstant = "002,003";
123+
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig));
124+
125+
compactionConfig.compactionPlanInstant = "002,005";
126+
assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig));
127+
122128
compactionConfig.compactionPlanInstant = "005";
123-
assertThrows(HoodieException.class, () -> strategy.select(pendingCompactionTimeline, compactionConfig));
129+
assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline, compactionConfig));
124130
}
125131

126132
private void assertHoodieInstantsEquals(HoodieInstant[] expected, List<HoodieInstant> actual) {

0 commit comments

Comments
 (0)