@@ -152,10 +152,8 @@ private boolean throttlingEnabled() {
152152 private Set <Integer > getParticipatingBrokers (List <ExecutionProposal > replicaMovementProposals ) {
153153 Set <Integer > participatingBrokers = new TreeSet <>();
154154 for (ExecutionProposal proposal : replicaMovementProposals ) {
155- participatingBrokers .addAll (
156- proposal .oldReplicas ().stream ().map (ReplicaPlacementInfo ::brokerId ).collect (Collectors .toSet ()));
157- participatingBrokers .addAll (
158- proposal .newReplicas ().stream ().map (ReplicaPlacementInfo ::brokerId ).collect (Collectors .toSet ()));
155+ participatingBrokers .addAll (proposal .oldReplicas ().stream ().map (ReplicaPlacementInfo ::brokerId ).collect (Collectors .toSet ()));
156+ participatingBrokers .addAll (proposal .newReplicas ().stream ().map (ReplicaPlacementInfo ::brokerId ).collect (Collectors .toSet ()));
159157 }
160158 participatingBrokers .removeAll (_deadBrokers );
161159 return participatingBrokers ;
@@ -231,7 +229,7 @@ private void setThrottledRateIfNecessary(Set<Integer> brokerIds) throws Executio
231229 bulkOps .put (cf , ops );
232230 }
233231 }
234- applyIncrementalAlterConfigsForBrokers (bulkOps );
232+ changeBrokerConfigs (bulkOps );
235233 }
236234
237235 private void setThrottledReplicas (Map <String , Set <String >> replicasByTopic )
@@ -269,7 +267,7 @@ private void setThrottledReplicas(Map<String, Set<String>> replicasByTopic)
269267 bulkOps .put (cf , ops );
270268 }
271269 }
272- applyIncrementalAlterConfigsForTopics (bulkOps );
270+ changeTopicConfigs (bulkOps );
273271 }
274272
275273 boolean topicExists (String topic ) throws InterruptedException , TimeoutException , ExecutionException {
@@ -356,7 +354,7 @@ private void removeThrottledReplicasFromTopics(Map<String, Set<String>> replicas
356354 bulkOps .put (cf , ops );
357355 }
358356 }
359- applyIncrementalAlterConfigsForTopics (bulkOps );
357+ changeTopicConfigs (bulkOps );
360358 }
361359
362360 private void removeThrottledRateFromBrokers (Set <Integer > brokerIds )
@@ -390,10 +388,10 @@ private void removeThrottledRateFromBrokers(Set<Integer> brokerIds)
390388 bulkOps .put (cf , ops );
391389 }
392390 }
393- applyIncrementalAlterConfigsForBrokers (bulkOps );
391+ changeBrokerConfigs (bulkOps );
394392 }
395393
396- private void applyIncrementalAlterConfigsForBrokers (Map <ConfigResource , Collection <AlterConfigOp >> bulkOps )
394+ private void changeBrokerConfigs (Map <ConfigResource , Collection <AlterConfigOp >> bulkOps )
397395 throws ExecutionException , InterruptedException , TimeoutException {
398396 if (bulkOps == null || bulkOps .isEmpty ()) {
399397 return ;
@@ -403,7 +401,7 @@ private void applyIncrementalAlterConfigsForBrokers(Map<ConfigResource, Collecti
403401 waitForConfigs (bulkOps );
404402 }
405403
406- private void applyIncrementalAlterConfigsForTopics (Map <ConfigResource , Collection <AlterConfigOp >> bulkOps )
404+ private void changeTopicConfigs (Map <ConfigResource , Collection <AlterConfigOp >> bulkOps )
407405 throws ExecutionException , InterruptedException , TimeoutException {
408406 if (bulkOps == null || bulkOps .isEmpty ()) {
409407 return ;
0 commit comments