diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 228b924fc4917..837b06a89a8b6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -180,7 +180,10 @@ public void processElement(StreamRecord element) throws Exc } @Override - public void close() { + public void close() throws Exception { + if (null != this.executor) { + this.executor.close(); + } if (this.writeClient != null) { this.writeClient.cleanHandlesGracefully(); this.writeClient.close(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index 14c5c1694142f..f14bc8aa61a70 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -133,4 +133,16 @@ private HoodieWriteConfig reloadWriteConfig() throws Exception { public void setExecutor(NonThrownExecutor executor) { this.executor = executor; } + + @Override + public void close() throws Exception { + if (null != this.executor) { + this.executor.close(); + } + if (null != this.writeClient) { + this.writeClient.cleanHandlesGracefully(); + this.writeClient.close(); + this.writeClient = null; + } + } }