diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 1c827517ff144..a59be858bae7b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -21,7 +21,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -61,16 +60,11 @@ public CleanFunction(Configuration conf) { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); - this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); - - if (OptionsResolver.isInsertOverwrite(conf)) { - String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info(String.format("exec sync clean with instant time %s...", instantTime)); - executor.execute(() -> writeClient.clean(instantTime), "wait for sync cleaning finish"); - } - } + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + LOG.info(String.format("exec clean with instant time %s...", instantTime)); + executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish"); } @Override