Skip to content

Commit 1ef04a8

Browse files
committed
Migrate Executor off private Kafka APIs
Signed-off-by: Kyle Liberti <kliberti.us@gmail.com>
1 parent 2c9e746 commit 1ef04a8

File tree

7 files changed

+119
-53
lines changed

7 files changed

+119
-53
lines changed

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -396,25 +396,6 @@ public void shutDownBroker(int brokerId) {
396396
timeout,
397397
String.format("Broker %s did not shutdown properly.", brokerId)
398398
);
399-
400-
// Wait until describeLogDirs fails
401-
waitUntil(
402-
() -> {
403-
try {
404-
_adminClient.describeLogDirs(Collections.singletonList(brokerId))
405-
.allDescriptions()
406-
.get(5, TimeUnit.SECONDS);
407-
return false;
408-
} catch (InterruptedException ie) {
409-
throw new RuntimeException(ie);
410-
} catch (Exception e) {
411-
return true;
412-
}
413-
},
414-
result -> result,
415-
timeout,
416-
String.format("Broker %s did not fully shut down (logDirs RPC still succeeds).", brokerId)
417-
);
418399
}
419400

420401
public static class BrokerWaitStrategy extends AbstractWaitStrategy {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
/*
 * Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
 */

package com.linkedin.kafka.cruisecontrol.common;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client for fetching Kafka cluster metadata using public Kafka Admin APIs.
 *
 * This replaces the use of MetadataClient which relies on internal Kafka APIs.
 */
public class MetadataAdminClient {
  private static final Logger LOG = LoggerFactory.getLogger(MetadataAdminClient.class);

  private final Admin _adminClient;

  /**
   * Creates a new MetadataAdminClient.
   *
   * @param adminClient The AdminClient to use for fetching cluster metadata.
   */
  public MetadataAdminClient(Admin adminClient) {
    _adminClient = adminClient;
  }

  /**
   * Close the underlying adminClient.
   *
   * Any failure during close is logged and suppressed so that shutdown paths are not interrupted.
   */
  public void close() {
    if (_adminClient != null) {
      try {
        _adminClient.close();
      } catch (Exception e) {
        LOG.warn("Failed to close AdminClient", e);
      }
    }
  }

  /**
   * Get a point-in-time snapshot of the cluster metadata: live nodes, the cluster id, and partition
   * info for every topic visible to this client.
   *
   * @return Cluster containing the cluster metadata.
   * @throws RuntimeException if the metadata cannot be fetched (wrapping the underlying Admin
   *         client failure) or if the calling thread is interrupted while waiting.
   */
  public Cluster cluster() {
    try {
      DescribeClusterResult describeResult = _adminClient.describeCluster();
      Collection<Node> nodes = describeResult.nodes().get();
      String clusterId = describeResult.clusterId().get();

      Set<String> topicNames = _adminClient.listTopics()
                                           .names().get();

      Map<String, TopicDescription> topicDescriptions = _adminClient.describeTopics(topicNames)
                                                                    .allTopicNames().get();

      // Flatten per-topic partition descriptions into the PartitionInfo form that Cluster expects.
      List<PartitionInfo> partitionInfos = new ArrayList<>();
      for (TopicDescription desc : topicDescriptions.values()) {
        for (TopicPartitionInfo partInfo : desc.partitions()) {
          Node leader = partInfo.leader();
          Node[] replicas = partInfo.replicas().toArray(Node[]::new);
          Node[] isr = partInfo.isr().toArray(Node[]::new);
          partitionInfos.add(new PartitionInfo(desc.name(), partInfo.partition(), leader, replicas, isr));
        }
      }

      return new Cluster(clusterId, nodes, partitionInfos, Collections.emptySet(), Collections.emptySet());
    } catch (InterruptedException e) {
      // Restore the interrupt status before rethrowing so callers higher up the stack can
      // still observe the interruption; wrapping alone would silently swallow it.
      Thread.currentThread().interrupt();
      LOG.error("Interrupted while fetching cluster metadata", e);
      throw new RuntimeException("Interrupted while fetching cluster metadata", e);
    } catch (ExecutionException e) {
      LOG.error("Failed to fetch cluster metadata", e);
      throw new RuntimeException("Failed to fetch cluster metadata", e);
    }
  }
}

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/common/MetadataClient.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29-
29+
/**
30+
* Legacy client for fetching Kafka cluster metadata.
31+
*
32+
* @deprecated This class is deprecated and will be removed in a future release in favor of {@link MetadataAdminClient}
33+
* which uses public Kafka APIs.
34+
*/
35+
@Deprecated
3036
public class MetadataClient {
3137
private static final Logger LOG = LoggerFactory.getLogger(MetadataClient.class);
3238
private static final LogContext LOG_CONTEXT = new LogContext();

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.TimeoutException;
3030
import javax.annotation.Nullable;
3131
import org.apache.kafka.clients.CommonClientConfigs;
32+
import org.apache.kafka.clients.admin.Admin;
3233
import org.apache.kafka.clients.admin.AdminClient;
3334
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
3435
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
@@ -347,7 +348,7 @@ public static void populateMinIsrState(Cluster cluster,
347348
* @param adminClient The adminClient to ask for ongoing partition reassignments.
348349
* @return The set of {@link TopicPartition partitions} that are being reassigned.
349350
*/
350-
public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminClient)
351+
public static Set<TopicPartition> partitionsBeingReassigned(Admin adminClient)
351352
throws InterruptedException, ExecutionException, TimeoutException {
352353
return ongoingPartitionReassignments(adminClient).keySet();
353354
}
@@ -362,7 +363,7 @@ public static Set<TopicPartition> partitionsBeingReassigned(AdminClient adminCli
362363
* @param adminClient The adminClient to ask for ongoing partition reassignments.
363364
* @return The map of {@link PartitionReassignment reassignment} by {@link TopicPartition partitions}.
364365
*/
365-
public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(AdminClient adminClient)
366+
public static Map<TopicPartition, PartitionReassignment> ongoingPartitionReassignments(Admin adminClient)
366367
throws InterruptedException, ExecutionException, TimeoutException {
367368
Map<TopicPartition, PartitionReassignment> partitionReassignments = null;
368369
int attempts = 0;
@@ -404,7 +405,7 @@ private static Optional<NewPartitionReassignment> reassignmentValue(List<Integer
404405
* @param tasks Preferred leader election tasks to execute.
405406
* @return The {@link ElectLeadersResult result} of preferred leader election request -- cannot be {@code null}.
406407
*/
407-
public static ElectLeadersResult submitPreferredLeaderElection(AdminClient adminClient, List<ExecutionTask> tasks) {
408+
public static ElectLeadersResult submitPreferredLeaderElection(Admin adminClient, List<ExecutionTask> tasks) {
408409
if (validateNotNull(tasks, "Tasks to execute cannot be null.").isEmpty()) {
409410
throw new IllegalArgumentException("Tasks to execute cannot be empty.");
410411
}
@@ -442,7 +443,7 @@ public static ElectLeadersResult submitPreferredLeaderElection(AdminClient admin
442443
* @param tasks Inter-broker replica reassignment tasks to execute.
443444
* @return The {@link AlterPartitionReassignmentsResult result} of reassignment request -- cannot be {@code null}.
444445
*/
445-
public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(AdminClient adminClient, List<ExecutionTask> tasks) {
446+
public static AlterPartitionReassignmentsResult submitReplicaReassignmentTasks(Admin adminClient, List<ExecutionTask> tasks) {
446447
if (validateNotNull(tasks, "Tasks to execute cannot be null.").isEmpty()) {
447448
throw new IllegalArgumentException("Tasks to execute cannot be empty.");
448449
}

cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
import com.codahale.metrics.Timer;
1010
import com.google.common.util.concurrent.AtomicDouble;
1111
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
12+
import com.linkedin.kafka.cruisecontrol.common.MetadataAdminClient;
1213
import com.linkedin.kafka.cruisecontrol.common.TopicMinIsrCache;
1314
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
1415
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
15-
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
1616
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
1717
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorManager;
1818
import com.linkedin.kafka.cruisecontrol.exception.OngoingExecutionException;
@@ -42,7 +42,6 @@
4242
import java.util.concurrent.atomic.AtomicInteger;
4343
import java.util.function.Supplier;
4444
import java.util.stream.Collectors;
45-
import org.apache.kafka.clients.Metadata;
4645
import org.apache.kafka.clients.admin.AdminClient;
4746
import org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
4847
import org.apache.kafka.clients.admin.DescribeConfigsResult;
@@ -52,8 +51,6 @@
5251
import org.apache.kafka.common.Node;
5352
import org.apache.kafka.common.TopicPartition;
5453
import org.apache.kafka.common.config.ConfigResource;
55-
import org.apache.kafka.common.internals.ClusterResourceListeners;
56-
import org.apache.kafka.common.utils.LogContext;
5754
import org.apache.kafka.common.utils.Time;
5855
import org.slf4j.Logger;
5956
import org.slf4j.LoggerFactory;
@@ -87,7 +84,7 @@ public class Executor {
8784
private static final long EXECUTION_PROGRESS_CHECK_INTERVAL_ADJUSTING_MS = 1000;
8885
// The execution progress is controlled by the ExecutionTaskManager.
8986
private final ExecutionTaskManager _executionTaskManager;
90-
private final MetadataClient _metadataClient;
87+
private final MetadataAdminClient _metadataClient;
9188
private volatile long _executionProgressCheckIntervalMs;
9289
private final long _defaultExecutionProgressCheckIntervalMs;
9390
private Long _requestedExecutionProgressCheckIntervalMs;
@@ -168,7 +165,7 @@ public Executor(KafkaCruiseControlConfig config,
168165
Executor(KafkaCruiseControlConfig config,
169166
Time time,
170167
MetricRegistry dropwizardMetricRegistry,
171-
MetadataClient metadataClient,
168+
MetadataAdminClient metadataClient,
172169
ExecutorNotifier executorNotifier,
173170
AnomalyDetectorManager anomalyDetectorManager) {
174171
_numExecutionStopped = new AtomicInteger(0);
@@ -189,15 +186,7 @@ public Executor(KafkaCruiseControlConfig config,
189186
_executionTaskManager = new ExecutionTaskManager(_adminClient, dropwizardMetricRegistry, time, config);
190187
// Register gauge sensors.
191188
registerGaugeSensors(dropwizardMetricRegistry);
192-
_metadataClient = metadataClient != null ? metadataClient
193-
: new MetadataClient(config,
194-
new Metadata(ExecutionUtils.METADATA_REFRESH_BACKOFF,
195-
ExecutionUtils.METADATA_REFRESH_BACKOFF_MAX,
196-
ExecutionUtils.METADATA_EXPIRY_MS,
197-
new LogContext(),
198-
new ClusterResourceListeners()),
199-
-1L,
200-
time);
189+
_metadataClient = metadataClient != null ? metadataClient : new MetadataAdminClient(_adminClient);
201190
_defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
202191
_executionProgressCheckIntervalMs = _defaultExecutionProgressCheckIntervalMs;
203192
_leaderMovementTimeoutMs = config.getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG);
@@ -875,7 +864,7 @@ private synchronized void initProposalExecution(Collection<ExecutionProposal> pr
875864
recentlyRemovedBrokers(), isTriggeredByUserRequest);
876865
_executionTaskManager.setExecutionModeForTaskTracker(_isKafkaAssignerMode);
877866
// Get a snapshot of (1) cluster and (2) minIsr with time by topic name.
878-
StrategyOptions strategyOptions = new StrategyOptions.Builder(_metadataClient.refreshMetadata().cluster())
867+
StrategyOptions strategyOptions = new StrategyOptions.Builder(_metadataClient.cluster())
879868
.minIsrWithTimeByTopic(_topicMinIsrCache.minIsrWithTimeByTopic()).build();
880869
_executionTaskManager.addExecutionProposals(proposals, brokersToSkipConcurrencyCheck, strategyOptions, replicaMovementStrategy);
881870
_concurrencyAdjuster.initAdjustment(loadMonitor,
@@ -1799,7 +1788,7 @@ private Cluster getClusterForExecutionProgressCheck() {
17991788
if (LOG.isDebugEnabled()) {
18001789
LOG.debug("Tasks in execution: {}", inExecutionTasks());
18011790
}
1802-
return _metadataClient.refreshMetadata().cluster();
1791+
return _metadataClient.cluster();
18031792
}
18041793

18051794
/**

cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ExecutorTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import com.codahale.metrics.MetricRegistry;
88
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
9-
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
9+
import com.linkedin.kafka.cruisecontrol.common.MetadataAdminClient;
1010
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
1111
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigFileResolver;
1212
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
@@ -103,6 +103,8 @@ public int clusterSize() {
103103
@Before
104104
public void setUp() {
105105
Properties adminClientProps = new Properties();
106+
adminClientProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
107+
adminClientProps.setProperty(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "0");
106108
setSecurityConfigs(adminClientProps, "admin");
107109

108110
_cluster = new CCContainerizedKraftCluster(clusterSize(), buildBrokerConfigs(), adminClientProps);
@@ -405,7 +407,7 @@ private static boolean verifyFutureError(Future<?> future, Class<? extends Throw
405407
@Test
406408
public void testSetRequestedExecutionProgressCheckIntervalMs() {
407409
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
408-
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataClient.class),
410+
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataAdminClient.class),
409411
null, EasyMock.mock(AnomalyDetectorManager.class));
410412
long minExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
411413

@@ -419,7 +421,7 @@ public void testSetRequestedExecutionProgressCheckIntervalMs() {
419421
@Test
420422
public void testSetExecutionProgressCheckIntervalMsWithRequestedValue() {
421423
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
422-
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataClient.class),
424+
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataAdminClient.class),
423425
null, EasyMock.mock(AnomalyDetectorManager.class));
424426
long defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
425427
long minExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
@@ -440,7 +442,7 @@ public void testSetExecutionProgressCheckIntervalMsWithRequestedValue() {
440442
@Test
441443
public void testSetExecutionProgressCheckIntervalMsWithNoRequestedValue() {
442444
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
443-
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataClient.class),
445+
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataAdminClient.class),
444446
null, EasyMock.mock(AnomalyDetectorManager.class));
445447
long defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
446448
long minExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
@@ -461,7 +463,7 @@ public void testSetExecutionProgressCheckIntervalMsWithNoRequestedValue() {
461463
@Test
462464
public void testResetExecutionProgressCheckIntervalMs() {
463465
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
464-
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataClient.class),
466+
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataAdminClient.class),
465467
null, EasyMock.mock(AnomalyDetectorManager.class));
466468
long defaultExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
467469
executor.resetExecutionProgressCheckIntervalMs();
@@ -477,8 +479,8 @@ public void testResetExecutionProgressCheckIntervalMs() {
477479
public void testExecutionKnobs() {
478480
KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getExecutorProperties());
479481
assertThrows(IllegalStateException.class,
480-
() -> new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataClient.class), null, null));
481-
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataClient.class),
482+
() -> new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataAdminClient.class), null, null));
483+
Executor executor = new Executor(config, null, new MetricRegistry(), EasyMock.mock(MetadataAdminClient.class),
482484
null, EasyMock.mock(AnomalyDetectorManager.class));
483485

484486
// Verify correctness of add/drop recently removed/demoted brokers.
@@ -502,7 +504,7 @@ public void testTimeoutAndExecutionStop() throws InterruptedException, OngoingEx
502504

503505
KafkaCruiseControlConfig configs = new KafkaCruiseControlConfig(getExecutorProperties());
504506
Time time = new MockTime();
505-
MetadataClient mockMetadataClient = EasyMock.mock(MetadataClient.class);
507+
MetadataAdminClient mockMetadataClient = EasyMock.mock(MetadataAdminClient.class);
506508
// Fake the metadata to never change so the leader movement will timeout.
507509
Node node0 = new Node(BROKER_ID_0, "host0", 100);
508510
Node node1 = new Node(BROKER_ID_1, "host1", 100);
@@ -512,9 +514,7 @@ public void testTimeoutAndExecutionStop() throws InterruptedException, OngoingEx
512514
PartitionInfo partitionInfo = new PartitionInfo(TP1.topic(), TP1.partition(), node1, replicas, replicas);
513515
Cluster cluster = new Cluster("id", Arrays.asList(node0, node1), Collections.singleton(partitionInfo),
514516
Collections.emptySet(), Collections.emptySet());
515-
MetadataClient.ClusterAndGeneration clusterAndGeneration = new MetadataClient.ClusterAndGeneration(cluster, 0);
516-
EasyMock.expect(mockMetadataClient.refreshMetadata()).andReturn(clusterAndGeneration).anyTimes();
517-
EasyMock.expect(mockMetadataClient.cluster()).andReturn(clusterAndGeneration.cluster()).anyTimes();
517+
EasyMock.expect(mockMetadataClient.cluster()).andReturn(cluster).anyTimes();
518518
LoadMonitor mockLoadMonitor = getMockLoadMonitor();
519519
AnomalyDetectorManager mockAnomalyDetectorManager = getMockAnomalyDetector(RANDOM_UUID, false);
520520
UserTaskManager.UserTaskInfo mockUserTaskInfo = getMockUserTaskInfo();

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ kafkaVersion=4.0.0
66
nettyVersion=4.1.118.Final
77
jettyVersion=9.4.56.v20240826
88
vertxVersion=4.5.8
9-
testcontainersVersion=1.21.3
9+
testcontainersVersion=1.21.4

0 commit comments

Comments
 (0)