Skip to content

Commit 08c966c

Browse files
committed
Upgrade Kafka to 4.1.1
Change-Id: I605570ae67238e2a9270531628625045cdbbccdd
1 parent e30eaf3 commit 08c966c

7 files changed

Lines changed: 32 additions & 37 deletions

File tree

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ Cruise Control for Apache Kafka
5353
`2.6` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.11+`), `2.7` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.36+`),
5454
`2.8` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.66+`), `3.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.85+`),
5555
`3.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.85+`), `3.8` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.142+`),
56-
`3.9` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.143+`), and `4.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.144+`)
56+
`3.9` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.143+`), `4.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.144+`),
57+
and `4.1.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.146+`)
5758
* The `migrate_to_kafka_2_4` branch of Cruise Control is compatible with Apache Kafka `2.4` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.4.*`).
5859
* The `kafka_2_0_to_2_3` branch (deprecated) of Cruise Control is compatible with Apache Kafka `2.0`, `2.1`, `2.2`, and `2.3` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.0.*`).
5960
* The `kafka_0_11_and_1_0` branch (deprecated) of Cruise Control is compatible with Apache Kafka `0.11.0.0`, `1.0`, and `1.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `0.1.*`).

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import java.util.Map;
1818
import java.util.Properties;
1919
import java.util.Set;
20-
import java.util.concurrent.TimeoutException;
2120
import java.util.concurrent.atomic.AtomicInteger;
2221
import java.util.regex.Pattern;
2322
import org.apache.kafka.clients.CommonClientConfigs;
@@ -201,8 +200,8 @@ public void testReportingMetrics() {
201200
}
202201

203202
@Test
204-
public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException {
205-
TopicDescription topicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30), td -> true);
203+
public void testUpdatingMetricsTopicConfig() {
204+
TopicDescription topicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(60), td -> true);
206205
assertEquals(1, topicDescription.partitions().size());
207206

208207
KafkaContainer broker = _cluster.getBrokers().get(0);
@@ -211,10 +210,11 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException, Timeou
211210
broker.stop();
212211

213212
// Change broker config
213+
int newPartitionCount = 4;
214214
Map<Object, Object> brokerConfig = _brokerConfigs.get(0);
215215
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
216-
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "2");
217-
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
216+
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, String.valueOf(newPartitionCount));
217+
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, String.valueOf(newPartitionCount));
218218

219219
_cluster.overrideBrokerConfig(broker, brokerConfig);
220220

@@ -223,10 +223,10 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException, Timeou
223223

224224
// Wait for topic metadata configuration change to propagate
225225
int oldPartitionCount = topicDescription.partitions().size();
226-
TopicDescription newTopicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30),
226+
TopicDescription newTopicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(180),
227227
td -> td.partitions().size() != oldPartitionCount);
228228

229-
assertEquals(2, newTopicDescription.partitions().size());
229+
assertEquals(newPartitionCount, newTopicDescription.partitions().size());
230230
}
231231

232232
@Test

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.apache.kafka.clients.admin.TopicDescription;
1414
import org.apache.kafka.common.Uuid;
1515
import org.apache.kafka.common.config.types.Password;
16-
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
16+
import org.apache.kafka.common.errors.InvalidConfigurationException;
1717
import org.testcontainers.containers.GenericContainer;
1818
import org.testcontainers.containers.Network;
1919
import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
@@ -55,7 +55,7 @@
5555
* </ul>
5656
*/
5757
public class CCContainerizedKraftCluster implements Startable {
58-
private static final String KAFKA_IMAGE = System.getenv().getOrDefault("KAFKA_IMAGE", "apache/kafka:3.9.1");
58+
private static final String KAFKA_IMAGE = System.getenv().getOrDefault("KAFKA_IMAGE", "apache/kafka:4.1.1");
5959
/**
6060
* Determines the hostname used by containers to connect to services running on the host machine.
6161
* Required for CI environments like CircleCI, where the Docker executor relies on a specific hostname
@@ -156,6 +156,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
156156
.withNetwork(NETWORK)
157157
.withNetworkAliases(networkAlias)
158158
.withExposedPorts(CONTAINER_EXTERNAL_LISTENER_PORT)
159+
.withCreateContainerCmdModifier(cmd -> cmd.withUser("root"))
159160
.withEnv("CLUSTER_ID", clusterId)
160161
// Uncomment the following line when debugging Kafka cluster problems.
161162
//.withLogConsumer(outputFrame -> System.out.print(networkAlias + " | " + outputFrame.getUtf8String()))
@@ -353,16 +354,20 @@ public TopicDescription waitForTopicMetadata(String topicName, Duration timeout,
353354
return _adminClient.describeTopics(Collections.singleton(topicName))
354355
.topicNameValues()
355356
.get(topicName)
356-
.get();
357+
.get(5, TimeUnit.SECONDS);
357358
} catch (ExecutionException e) {
358-
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
359-
// Topic doesn't exist yet, retry
360-
return null;
359+
Throwable cause = e.getCause();
360+
if (cause instanceof InvalidConfigurationException) {
361+
throw new RuntimeException("Fatal error describing topic: " + topicName, cause);
361362
}
362-
throw new RuntimeException("Failed to describe topic: " + topicName, e);
363+
364+
// Transient error fetching metadata (retrying)
365+
return null;
366+
} catch (TimeoutException e) {
367+
return null;
363368
} catch (InterruptedException e) {
364369
Thread.currentThread().interrupt();
365-
throw new RuntimeException("Interrupted while waiting for broker to become ready", e);
370+
throw new RuntimeException("Interrupted while waiting for broker", e);
366371
}
367372
},
368373
td -> td != null && condition.test(td),

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.kafka.common.security.auth.SecurityProtocol;
1717
import org.apache.kafka.common.utils.Time;
1818
import org.apache.kafka.network.SocketServerConfigs;
19+
import org.apache.kafka.raft.MetadataLogConfig;
1920
import org.apache.kafka.server.config.KRaftConfigs;
2021
import org.apache.kafka.server.config.ServerLogConfigs;
2122
import org.slf4j.Logger;
@@ -60,7 +61,7 @@ public CCEmbeddedBroker(Map<Object, Object> config) {
6061
private void parseConfigs(Map<Object, Object> config) {
6162
readLogDirs(config);
6263
_id = Integer.parseInt((String) config.get(KRaftConfigs.NODE_ID_CONFIG));
63-
_metadataLogDir = new File((String) config.get(KRaftConfigs.METADATA_LOG_DIR_CONFIG));
64+
_metadataLogDir = new File((String) config.get(MetadataLogConfig.METADATA_LOG_DIR_CONFIG));
6465

6566
// Bind addresses
6667
String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG);

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.kafka.common.security.auth.SecurityProtocol;
1414
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
1515
import org.apache.kafka.network.SocketServerConfigs;
16+
import org.apache.kafka.raft.MetadataLogConfig;
1617
import org.apache.kafka.raft.QuorumConfig;
1718
import org.apache.kafka.server.config.KRaftConfigs;
1819
import org.apache.kafka.server.config.ReplicationConfigs;
@@ -41,7 +42,6 @@ public class CCEmbeddedBrokerBuilder {
4142
//feature control
4243
private boolean _enableControlledShutdown;
4344
private boolean _enableDeleteTopic;
44-
private boolean _enableLogCleaner;
4545
//resource management
4646
// 2MB
4747
private long _logCleanerDedupBufferSize = 2097152;
@@ -210,17 +210,6 @@ public CCEmbeddedBrokerBuilder enableDeleteTopic(boolean enableDeleteTopic) {
210210
return this;
211211
}
212212

213-
/**
214-
* Enable log cleaner.
215-
*
216-
* @param enableLogCleaner {@code true} to enable log cleaner, {@code false} otherwise.
217-
* @return This.
218-
*/
219-
public CCEmbeddedBrokerBuilder enableLogCleaner(boolean enableLogCleaner) {
220-
_enableLogCleaner = enableLogCleaner;
221-
return this;
222-
}
223-
224213
/**
225214
* Set log cleaner dedup buffer size.
226215
* @param logCleanerDedupBufferSize log cleaner dedup buffer size.
@@ -285,13 +274,12 @@ public Map<Object, Object> buildConfig() {
285274
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL");
286275
props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
287276
props.put(ServerLogConfigs.LOG_DIR_CONFIG, _logDirectory.getAbsolutePath());
288-
props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath());
277+
props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath());
289278
props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
290279
props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
291280
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(_enableControlledShutdown));
292281
props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(_enableDeleteTopic));
293282
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, Long.toString(_logCleanerDedupBufferSize));
294-
props.put(CleanerConfig.LOG_CLEANER_ENABLE_PROP, Boolean.toString(_enableLogCleaner));
295283
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
296284
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
297285
if (_rack != null) {

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaRaftServer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;
66

7-
import kafka.log.UnifiedLog;
87
import kafka.metrics.KafkaMetricsReporter;
98
import kafka.server.BrokerServer;
109
import kafka.server.ControllerServer;
@@ -34,6 +33,7 @@
3433
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.Loader;
3534
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag;
3635
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
36+
import org.apache.kafka.raft.MetadataLogConfig;
3737
import org.apache.kafka.raft.QuorumConfig;
3838
import org.apache.kafka.server.ProcessRole;
3939
import org.apache.kafka.server.ServerSocketFactory;
@@ -42,9 +42,9 @@
4242
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
4343
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
4444
import org.apache.kafka.storage.internals.log.LogConfig;
45+
import org.apache.kafka.storage.internals.log.UnifiedLog;
4546
import org.slf4j.Logger;
4647
import org.slf4j.LoggerFactory;
47-
import scala.jdk.javaapi.CollectionConverters;
4848
import java.io.File;
4949
import java.io.FileOutputStream;
5050
import java.io.IOException;
@@ -165,7 +165,7 @@ public int boundControllerPort() throws ExecutionException, InterruptedException
165165
private static MetaTuple initializeLogDirs(KafkaConfig config, String logPrefix) {
166166
Loader loader = new Loader();
167167
loader.addMetadataLogDir(config.metadataLogDir());
168-
loader.addLogDirs(CollectionConverters.asJava(config.logDirs()));
168+
loader.addLogDirs(config.logDirs());
169169

170170
// Load the MetaPropertiesEnsemble
171171
MetaPropertiesEnsemble initialMetaPropsEnsemble;
@@ -222,7 +222,7 @@ private static MetaTuple initializeLogDirs(KafkaConfig config, String logPrefix)
222222

223223
private void initializeMetaData(KafkaConfig config) {
224224
Set<File> allLogDirs = new HashSet<>(readLogDirs(config));
225-
allLogDirs.add(new File(config.getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG)));
225+
allLogDirs.add(new File(config.getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)));
226226
for (File logDir : allLogDirs) {
227227
File metaPropsFile = new File(logDir, "meta.properties");
228228
if (!metaPropsFile.exists()) {
@@ -240,7 +240,7 @@ private void initializeMetaData(KafkaConfig config) {
240240
properties.setProperty(CLUSTER_ID_CONFIG, _clusterId);
241241
properties.setProperty(KRaftConfigs.NODE_ID_CONFIG, config.get(KRaftConfigs.NODE_ID_CONFIG).toString());
242242
properties.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, config.getString(ServerLogConfigs.LOG_DIR_CONFIG));
243-
properties.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, config.getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG));
243+
properties.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, config.getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG));
244244

245245
try (FileOutputStream out = new FileOutputStream(metaPropsFile)) {
246246
properties.store(out, "Meta Properties");

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ org.gradle.daemon=false
22
org.gradle.parallel=false
33
org.gradle.jvmargs=-Xms512m -Xmx512m
44
scalaVersion=2.13.13
5-
kafkaVersion=4.0.0
5+
kafkaVersion=4.1.1
66
nettyVersion=4.1.118.Final
77
jettyVersion=9.4.56.v20240826
88
vertxVersion=4.5.8

0 commit comments

Comments
 (0)