Skip to content

Commit f808ddd

Browse files
fengjian428jian.feng
authored andcommitted
[HUDI-4730] Fix batch job cannot clean old commits files (#6515)
* [HUDI-4370] Fix batch job cannot clean old commits files Co-authored-by: jian.feng <jian.feng@shopee.com>
1 parent 6c38663 commit f808ddd

1 file changed

Lines changed: 5 additions & 11 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.hudi.client.HoodieFlinkWriteClient;
2222
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
2323
import org.apache.hudi.configuration.FlinkOptions;
24-
import org.apache.hudi.configuration.OptionsResolver;
2524
import org.apache.hudi.sink.utils.NonThrownExecutor;
2625
import org.apache.hudi.util.StreamerUtil;
2726

@@ -61,16 +60,11 @@ public CleanFunction(Configuration conf) {
6160
@Override
6261
public void open(Configuration parameters) throws Exception {
6362
super.open(parameters);
64-
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
65-
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
66-
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
67-
68-
if (OptionsResolver.isInsertOverwrite(conf)) {
69-
String instantTime = HoodieActiveTimeline.createNewInstantTime();
70-
LOG.info(String.format("exec sync clean with instant time %s...", instantTime));
71-
executor.execute(() -> writeClient.clean(instantTime), "wait for sync cleaning finish");
72-
}
73-
}
63+
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
64+
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
65+
String instantTime = HoodieActiveTimeline.createNewInstantTime();
66+
LOG.info(String.format("exec clean with instant time %s...", instantTime));
67+
executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
7468
}
7569

7670
@Override

0 commit comments

Comments
 (0)