diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java index 717c414a29bcf..bc157ec51c074 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java @@ -32,6 +32,8 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -41,7 +43,7 @@ * to check S3 files activity over time. The hudi table created by this source is consumed by * {@link S3EventsHoodieIncrSource} to apply changes to the hudi table corresponding to user data. */ -public class S3EventsSource extends RowSource { +public class S3EventsSource extends RowSource implements Closeable { private final S3EventsMetaSelector pathSelector; private final List processedMessages = new ArrayList<>(); @@ -79,6 +81,12 @@ public Pair>, String> fetchNextBatch(Option lastCkpt } } + @Override + public void close() throws IOException { + // close resource + this.sqs.shutdown(); + } + @Override public void onCommit(String lastCkptStr) { pathSelector.deleteProcessedMessages(sqs, pathSelector.queueUrl, processedMessages);