77import com .github .dockerjava .api .command .InspectContainerResponse ;
88import com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporter ;
99import com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig ;
10+ import com .linkedin .kafka .cruisecontrol .metricsreporter .exception .KafkaTopicDescriptionException ;
1011import org .apache .kafka .clients .CommonClientConfigs ;
1112import org .apache .kafka .clients .admin .Admin ;
12- import org .apache .kafka .clients .admin .DescribeClusterResult ;
13+ import org .apache .kafka .clients .admin .AdminClient ;
14+ import org .apache .kafka .clients .admin .TopicDescription ;
1315import org .apache .kafka .common .Uuid ;
1416import org .apache .kafka .common .config .types .Password ;
17+ import org .apache .kafka .common .errors .UnknownTopicOrPartitionException ;
1518import org .testcontainers .containers .GenericContainer ;
1619import org .testcontainers .containers .Network ;
1720import org .testcontainers .containers .wait .strategy .AbstractWaitStrategy ;
3336import java .util .concurrent .ExecutionException ;
3437import java .util .concurrent .TimeUnit ;
3538import java .util .concurrent .TimeoutException ;
39+ import java .util .function .Predicate ;
40+ import java .util .function .Supplier ;
3641import java .util .stream .Collectors ;
3742import java .util .stream .IntStream ;
3843
44+ import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporter .getTopicDescription ;
3945import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG ;
4046import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_TOPIC_CONFIG ;
4147import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .DEFAULT_CRUISE_CONTROL_METRICS_TOPIC ;
@@ -79,6 +85,7 @@ public class CCContainerizedKraftCluster implements Startable {
7985 private final List <KafkaContainer > _brokers ;
8086 private final String _bootstrapServers ;
8187 private final List <String > _brokerAddressList ;
88+ private final Admin _adminClient ;
8289
8390 public CCContainerizedKraftCluster (int numOfBrokers , List <Map <Object , Object >> brokerConfigs , Properties adminClientProps ) {
8491 if (numOfBrokers <= 0 ) {
@@ -105,6 +112,7 @@ public CCContainerizedKraftCluster(int numOfBrokers, List<Map<Object, Object>> b
105112 Properties adminClientPropsWithBootstrapAddress = new Properties ();
106113 adminClientPropsWithBootstrapAddress .putAll (adminClientProps );
107114 adminClientPropsWithBootstrapAddress .setProperty (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG , _bootstrapServers );
115+ this ._adminClient = Admin .create (adminClientPropsWithBootstrapAddress );
108116
109117 this ._brokers =
110118 IntStream
@@ -154,7 +162,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
154162 //.withLogConsumer(outputFrame -> System.out.print(networkAlias + " | " + outputFrame.getUtf8String()))
155163 // Uncomment the following line when debugging SSL connection problems.
156164 //.withEnv("KAFKA_OPTS", "-Djavax.net.debug=ssl,handshake")
157- .waitingFor (new BrokerWaitStrategy (brokerNum , metricsTopic , adminClientPropsWithBootstrapAddress )
165+ .waitingFor (new BrokerWaitStrategy (brokerNum , metricsTopic , _adminClient )
158166 .withStartupTimeout (Duration .ofMinutes (1 ))
159167 );
160168 kafkaContainer .setPortBindings (List .of (containerHostPort + ":" + CONTAINER_EXTERNAL_LISTENER_PORT ));
@@ -300,49 +308,133 @@ public void start() {
300308
301309 @ Override
302310 public void stop () {
311+ this ._adminClient .close ();
303312 this ._brokers .stream ().parallel ().forEach (GenericContainer ::close );
304313 }
305314
315+ private static <T > T waitUntil (Supplier <T > supplier , Predicate <T > condition , Duration timeout , String timeoutMessage ) {
316+ long deadline = System .currentTimeMillis () + timeout .toMillis ();
317+
318+ while (System .currentTimeMillis () < deadline ) {
319+ T value ;
320+ try {
321+ value = supplier .get ();
322+ } catch (Exception e ) {
323+ throw new RuntimeException (e );
324+ }
325+
326+ if (condition .test (value )) {
327+ return value ;
328+ }
329+
330+ try {
331+ Thread .sleep (500 );
332+ } catch (InterruptedException e ) {
333+ Thread .currentThread ().interrupt ();
334+ throw new RuntimeException (e );
335+ }
336+ }
337+
338+ throw new RuntimeException (timeoutMessage );
339+ }
340+
341+ /**
342+ * Waits until the metadata for a Kafka topic meets the specified condition,
343+ * or until the given timeout period elapses.
344+ *
345+ * @param topicName the name of the topic whose metadata should be monitored
346+ * @param timeout the maximum duration to wait for the condition to be satisfied
347+ * @param condition a {@link Predicate} that tests the {@link TopicDescription} for readiness
348+ * @return the {@link TopicDescription} once the condition evaluates to {@code true}
349+ */
350+ public TopicDescription waitForTopicMetadata (String topicName , Duration timeout , Predicate <TopicDescription > condition ) {
351+ return waitUntil (
352+ () -> {
353+ try {
354+ return getTopicDescription ((AdminClient ) _adminClient , topicName );
355+ } catch (KafkaTopicDescriptionException e ) {
356+ if (e .getCause () instanceof UnknownTopicOrPartitionException ) {
357+ return null ;
358+ }
359+ throw new RuntimeException ("Failed to describe topic: " + topicName , e );
360+ }
361+ },
362+ td -> td != null && condition .test (td ),
363+ timeout ,
364+ String .format ("Timeout waiting for topic %s metadata to be ready." , topicName )
365+ );
366+ }
367+
368+ /**
369+ * Shuts down the Kafka broker with the given broker ID and waits for it
370+ * to be removed from the cluster metadata.
371+ *
372+ * @param brokerId the ID of the broker to wait for shutdown.
373+ */
374+ public void shutDownBroker (int brokerId ) {
375+ Duration timeout = Duration .ofSeconds (30 );
376+ _brokers .get (brokerId ).stop ();
377+
378+ // Wait until brokerId is no longer in the cluster metadata.
379+ waitUntil (
380+ () -> {
381+ try {
382+ return _adminClient .describeCluster ().nodes ().get ().stream ()
383+ .map (node -> String .valueOf (node .id ()))
384+ .collect (Collectors .toSet ());
385+ } catch (InterruptedException | ExecutionException e ) {
386+ throw new RuntimeException (e );
387+ }
388+ },
389+ brokerIds -> !brokerIds .contains (String .valueOf (brokerId )),
390+ timeout ,
391+ String .format ("Broker %s was not removed from cluster metadata after shutdown." , brokerId )
392+ );
393+ }
394+
306395 public static class BrokerWaitStrategy extends AbstractWaitStrategy {
307396 private final int _brokerId ;
308397 private final String _metricsTopic ;
309- private final Properties _adminClientProps ;
398+ private final Admin _adminClient ;
310399
311- public BrokerWaitStrategy (int brokerId , String metricsTopic , Properties adminClientProps ) {
400+ public BrokerWaitStrategy (int brokerId , String metricsTopic , Admin adminClient ) {
312401 this ._brokerId = brokerId ;
313402 this ._metricsTopic = metricsTopic ;
314- this ._adminClientProps = adminClientProps ;
403+ this ._adminClient = adminClient ;
315404 }
316405
317406 @ Override
318407 protected void waitUntilReady () {
319- long deadline = System .currentTimeMillis () + startupTimeout .toMillis ();
320-
321- try (Admin adminClient = Admin .create (_adminClientProps )) {
322- while (System .currentTimeMillis () < deadline ) {
408+ waitUntil (
409+ () -> {
410+ // Returns true if broker is online and metrics topic exists
323411 try {
324- DescribeClusterResult cluster = adminClient .describeCluster ();
325- boolean brokerOnline = cluster .nodes ().get ().stream ().anyMatch (node -> node .id () == _brokerId );
412+ boolean brokerOnline = _adminClient .describeCluster ().nodes ().get ()
413+ .stream ()
414+ .anyMatch (node -> node .id () == _brokerId );
326415
327416 if (!brokerOnline ) {
328- Thread .sleep (500 );
329- continue ;
417+ return false ;
330418 }
331419
332- adminClient .describeTopics (Collections .singleton (_metricsTopic )).allTopicNames ().get (5 , TimeUnit .SECONDS );
333- return ;
420+ return _adminClient .describeTopics (Collections .singleton (_metricsTopic ))
421+ .allTopicNames ()
422+ .get (5 , TimeUnit .SECONDS )
423+ .containsKey (_metricsTopic );
334424
335425 } catch (InterruptedException e ) {
336426 // Restore interrupt status.
337427 Thread .currentThread ().interrupt ();
338- throw new RuntimeException ("Interrupted while waiting for broker to become ready" , e );
428+ throw new RuntimeException (e );
339429 } catch (ExecutionException | TimeoutException e ) {
340430 // Kafka broker is not ready yet, ignore and retry.
431+ return false ;
341432 }
342- }
343-
344- throw new RuntimeException (String .format ("Broker %d did not become ready within timeout of %d ms" , _brokerId , startupTimeout .toMillis ()));
345- }
433+ },
434+ ready -> ready ,
435+ startupTimeout ,
436+ String .format ("Broker %d did not become ready within timeout of %d ms" , _brokerId , startupTimeout .toMillis ())
437+ );
346438 }
347439 }
348440}
0 commit comments