44
55package com .linkedin .kafka .cruisecontrol .metricsreporter ;
66
7- import com .linkedin .kafka .cruisecontrol .metricsreporter .exception .KafkaTopicDescriptionException ;
87import com .linkedin .kafka .cruisecontrol .metricsreporter .metric .CruiseControlMetric ;
98import com .linkedin .kafka .cruisecontrol .metricsreporter .metric .MetricSerde ;
109import com .linkedin .kafka .cruisecontrol .metricsreporter .utils .CCContainerizedKraftCluster ;
2019import java .util .Set ;
2120import java .util .concurrent .TimeoutException ;
2221import java .util .concurrent .atomic .AtomicInteger ;
23- import java .util .function .Predicate ;
2422import java .util .regex .Pattern ;
2523import org .apache .kafka .clients .CommonClientConfigs ;
26- import org .apache .kafka .clients .admin .Admin ;
27- import org .apache .kafka .clients .admin .AdminClient ;
2824import org .apache .kafka .clients .admin .TopicDescription ;
2925import org .apache .kafka .clients .consumer .Consumer ;
3026import org .apache .kafka .clients .consumer .ConsumerConfig ;
3632import org .apache .kafka .clients .producer .ProducerConfig ;
3733import org .apache .kafka .clients .producer .ProducerRecord ;
3834import org .apache .kafka .clients .producer .RecordMetadata ;
39- import org .apache .kafka .common .errors .UnknownTopicOrPartitionException ;
4035import org .apache .kafka .common .serialization .StringDeserializer ;
4136import org .junit .After ;
4237import org .junit .Before ;
4540
4641import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporter .DEFAULT_BOOTSTRAP_SERVERS_HOST ;
4742import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporter .DEFAULT_BOOTSTRAP_SERVERS_PORT ;
48- import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporter .getTopicDescription ;
4943import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG ;
5044import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG ;
5145import static com .linkedin .kafka .cruisecontrol .metricsreporter .CruiseControlMetricsReporterConfig .CRUISE_CONTROL_METRICS_TOPIC_CONFIG ;
@@ -206,41 +200,9 @@ public void testReportingMetrics() {
206200 assertEquals ("Expected " + expectedMetricTypes + ", but saw " + metricTypes , expectedMetricTypes , metricTypes );
207201 }
208202
209- private TopicDescription waitForTopicMetadata (Admin adminClient ,
210- Duration timeout ,
211- Predicate <TopicDescription > condition )
212- throws InterruptedException , TimeoutException {
213-
214- long deadline = System .currentTimeMillis () + timeout .toMillis ();
215-
216- while (System .currentTimeMillis () < deadline ) {
217- try {
218- TopicDescription topicDescription = getTopicDescription ((AdminClient ) adminClient , TOPIC );
219-
220- if (condition .test (topicDescription )) {
221- return topicDescription ;
222- }
223- } catch (KafkaTopicDescriptionException e ) {
224- if (!(e .getCause () instanceof UnknownTopicOrPartitionException )) {
225- throw new RuntimeException ("Failed to describe topic: " + TOPIC , e );
226- }
227- // else ignore and retry
228- }
229-
230- Thread .sleep (500 );
231- }
232-
233- throw new TimeoutException ("Timeout waiting for topic metadata condition to be met: " + TOPIC );
234- }
235-
236203 @ Test
237204 public void testUpdatingMetricsTopicConfig () throws InterruptedException , TimeoutException {
238- Properties props = new Properties ();
239- setSecurityConfigs (props , "admin" );
240- props .setProperty (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers ());
241- Admin adminClient = Admin .create (props );
242-
243- TopicDescription topicDescription = waitForTopicMetadata (adminClient , Duration .ofSeconds (30 ), td -> true );
205+ TopicDescription topicDescription = _cluster .waitForTopicMetadata (TOPIC , Duration .ofSeconds (30 ), td -> true );
244206 assertEquals (1 , topicDescription .partitions ().size ());
245207
246208 KafkaContainer broker = _cluster .getBrokers ().get (0 );
@@ -261,7 +223,7 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException, Timeou
261223
262224 // Wait for topic metadata configuration change to propagate
263225 int oldPartitionCount = topicDescription .partitions ().size ();
264- TopicDescription newTopicDescription = waitForTopicMetadata (adminClient , Duration .ofSeconds (30 ),
226+ TopicDescription newTopicDescription = _cluster . waitForTopicMetadata (TOPIC , Duration .ofSeconds (30 ),
265227 td -> td .partitions ().size () != oldPartitionCount );
266228
267229 assertEquals (2 , newTopicDescription .partitions ().size ());
0 commit comments