Skip to content

Commit f0f2dc3

Browse files
authored
GH-4155: Add deleteTopics() method to KafkaAdmin (#4156)
Fixes #4155 * Add `deleteTopics()` method to `KafkaAdmin` Implement `deleteTopics()` in `KafkaAdmin` and `KafkaAdminOperations` to complete the topic lifecycle management API. The method follows existing patterns with try-with-resources for Admin client management, respects `operationTimeout` configuration, and includes proper exception handling with thread interrupt state restoration. * Update documentation for deleteTopics() method Add `deleteTopics` to the list of `KafkaAdmin` runtime methods in the reference documentation. Signed-off-by: gobeomjun <[email protected]>
1 parent 63613f7 commit f0f2dc3

File tree

4 files changed

+99
-1
lines changed

4 files changed

+99
-1
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/configuring-topics.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,13 @@ The context then fails to initialize.
7373
NOTE: If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the `NewTopic.numPartitions`.
7474

7575
Starting with version 2.7, the `KafkaAdmin` provides methods to create and examine topics at runtime.
76+
Starting with version 4.0, it also provides a method to delete topics.
7677

7778
* `createOrModifyTopics`
7879
* `describeTopics`
80+
* `deleteTopics` (since 4.0)
7981

80-
For more advanced features, you can use the `AdminClient` directly.
82+
For more advanced administrative features, you can use the `AdminClient` directly.
8183
The following example shows how to do so:
8284

8385
[source, java]

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
* @author Anders Swanson
8282
* @author Omer Celik
8383
* @author Choi Wang Gyu
84+
* @author Go Beom Jun
8485
*
8586
* @since 1.3
8687
*/
@@ -391,6 +392,35 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
391392
}
392393
}
393394

395+
/**
396+
* Delete topics from the Kafka cluster.
397+
* @param topicNames the topic names to delete.
398+
* @throws KafkaException if the operation fails.
399+
* @since 4.0
400+
*/
401+
@Override
402+
public void deleteTopics(String... topicNames) {
403+
if (topicNames.length == 0) {
404+
return;
405+
}
406+
try (Admin admin = createAdmin()) {
407+
admin.deleteTopics(Arrays.asList(topicNames))
408+
.all()
409+
.get(this.operationTimeout, TimeUnit.SECONDS);
410+
LOGGER.debug(() -> "Deleted topics: " + Arrays.toString(topicNames));
411+
}
412+
catch (InterruptedException ex) {
413+
Thread.currentThread().interrupt();
414+
throw new KafkaException("Interrupted while deleting topics", ex);
415+
}
416+
catch (TimeoutException ex) {
417+
throw new KafkaException("Timed out waiting to delete topics", ex);
418+
}
419+
catch (ExecutionException ex) {
420+
throw new KafkaException("Failed to delete topics", ex.getCause());
421+
}
422+
}
423+
394424
/**
395425
* Creates a new {@link Admin} client instance using the {@link AdminClient} class.
396426
* @return the new {@link Admin} client instance.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ public interface KafkaAdminOperations {
5050
*/
5151
Map<String, TopicDescription> describeTopics(String... topicNames);
5252

53+
/**
54+
* Delete topics from the Kafka cluster.
55+
* @param topicNames the topic names to delete.
56+
* @since 4.0
57+
*/
58+
void deleteTopics(String... topicNames);
59+
5360
/**
5461
* Return the cluster id, if available.
5562
* @return the describe cluster id.

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,65 @@ void getAdminConfigWithApplicationNameAsClientId() {
339339
assertThat(kafkaAdmin.getAdminConfig()).containsOnly(Map.entry(AdminClientConfig.CLIENT_ID_CONFIG, "appname-admin-0"));
340340
}
341341

342+
@Test
343+
void testDeleteTopics() {
344+
NewTopic testTopic1 = TopicBuilder.name("test-delete-1")
345+
.partitions(1)
346+
.replicas(1)
347+
.build();
348+
NewTopic testTopic2 = TopicBuilder.name("test-delete-2")
349+
.partitions(1)
350+
.replicas(1)
351+
.build();
352+
353+
this.admin.createOrModifyTopics(testTopic1, testTopic2);
354+
355+
await().atMost(10, TimeUnit.SECONDS).until(() -> {
356+
try {
357+
Map<String, TopicDescription> topics =
358+
this.admin.describeTopics("test-delete-1", "test-delete-2");
359+
return topics.size() == 2;
360+
}
361+
catch (Exception e) {
362+
return false;
363+
}
364+
});
365+
366+
Map<String, TopicDescription> beforeDelete = this.admin.describeTopics("test-delete-1", "test-delete-2");
367+
assertThat(beforeDelete).hasSize(2);
368+
assertThat(beforeDelete).containsKeys("test-delete-1", "test-delete-2");
369+
370+
this.admin.deleteTopics("test-delete-1", "test-delete-2");
371+
372+
await().atMost(10, TimeUnit.SECONDS).until(() -> {
373+
try (AdminClient adminClient = AdminClient.create(this.admin.getConfigurationProperties())) {
374+
DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("test-delete-1", "test-delete-2"));
375+
try {
376+
result.allTopicNames().get(5, TimeUnit.SECONDS);
377+
return false;
378+
}
379+
catch (ExecutionException ex) {
380+
return ex.getCause() instanceof UnknownTopicOrPartitionException;
381+
}
382+
}
383+
catch (InterruptedException | TimeoutException e) {
384+
return false;
385+
}
386+
});
387+
}
388+
389+
@Test
390+
void testDeleteNonExistentTopic() {
391+
assertThat(org.assertj.core.api.Assertions.catchThrowable(() ->
392+
this.admin.deleteTopics("non-existent-topic-12345")
393+
)).isInstanceOf(org.springframework.kafka.KafkaException.class);
394+
}
395+
396+
@Test
397+
void testDeleteTopicsWithEmptyArray() {
398+
this.admin.deleteTopics();
399+
}
400+
342401
@Configuration
343402
public static class Config {
344403

0 commit comments

Comments
 (0)