diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index 48d4f48989b0a..338352d4b0c93 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.FlinkTables; @@ -134,6 +135,9 @@ private void scheduleCompaction(HoodieFlinkTable table, long checkpointId) th List operations = compactionPlan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size()); + WriteMarkersFactory + .get(table.getConfig().getMarkersType(), table, compactionInstantTime) + .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism()); for (CompactionOperation operation : operations) { output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation))); }