Skip to content

Commit 9c5c54e

Browse files
authored
Defaulting min.insync.replicas to 1 when it's not specified by the user (#11883)
Signed-off-by: Paolo Patierno <[email protected]>
1 parent b769941 commit 9c5c54e

File tree

8 files changed

+90
-89
lines changed

8 files changed

+90
-89
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.io.IOException;
1414
import java.io.InputStream;
1515
import java.util.ArrayList;
16+
import java.util.HashMap;
1617
import java.util.HashSet;
1718
import java.util.List;
1819
import java.util.Map;
@@ -31,10 +32,16 @@ public class KafkaConfiguration extends AbstractConfiguration {
3132

3233
private static final List<String> FORBIDDEN_PREFIXES;
3334
private static final List<String> FORBIDDEN_PREFIX_EXCEPTIONS;
35+
private static final Map<String, String> DEFAULTS;
3436

3537
static {
3638
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIXES);
3739
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
40+
41+
DEFAULTS = new HashMap<>(1);
42+
// when users remove "min.insync.replicas" from the Kafka custom resource, the operator is going to force the
43+
// default value (1) regardless of whether ELR (Eligible Leader Replicas) is enabled or disabled
44+
DEFAULTS.put("min.insync.replicas", "1");
3845
}
3946

4047
/**
@@ -198,11 +205,11 @@ public KafkaConfiguration(KafkaConfiguration configuration) {
198205
* @param jsonOptions Json object with configuration options as key ad value pairs.
199206
*/
200207
public KafkaConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
201-
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), Map.of());
208+
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
202209
}
203210

204211
private KafkaConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes) {
205-
super(reconciliation, configuration, forbiddenPrefixes, List.of(), List.of(), Map.of());
212+
super(reconciliation, configuration, forbiddenPrefixes, List.of(), List.of(), DEFAULTS);
206213
}
207214

208215

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaSpecChecker.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class KafkaSpecChecker {
3333

3434
private final KafkaCluster kafkaCluster;
3535
private final String kafkaBrokerVersion;
36+
private final KafkaSpec kafkaSpec;
3637

3738
// This pattern is used to extract the MAJOR.MINOR version from the protocol or format version fields
3839
private final static Pattern MAJOR_MINOR_REGEX = Pattern.compile("(\\d+\\.\\d+).*");
@@ -47,6 +48,7 @@ public class KafkaSpecChecker {
4748
* this class to include awareness of what defaults are applied.
4849
*/
4950
public KafkaSpecChecker(KafkaSpec spec, KafkaVersion.Lookup versions, KafkaCluster kafkaCluster) {
51+
this.kafkaSpec = spec;
5052
this.kafkaCluster = kafkaCluster;
5153

5254
if (spec.getKafka().getVersion() != null) {
@@ -82,8 +84,10 @@ List<Condition> run() {
8284
* @param warnings List to add a warning to, if appropriate.
8385
*/
8486
private void checkKafkaReplicationConfig(List<Condition> warnings) {
85-
String defaultReplicationFactor = kafkaCluster.getConfiguration().getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR);
86-
String minInsyncReplicas = kafkaCluster.getConfiguration().getConfigOption(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
87+
Object defaultReplicationFactor = kafkaSpec.getKafka().getConfig() != null ?
88+
kafkaSpec.getKafka().getConfig().get(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR) : null;
89+
Object minInsyncReplicas = kafkaSpec.getKafka().getConfig() != null ?
90+
kafkaSpec.getKafka().getConfig().get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) : null;
8791

8892
if (defaultReplicationFactor == null && kafkaCluster.brokerNodes().size() > 1) {
8993
warnings.add(StatusUtils.buildWarningCondition("KafkaDefaultReplicationFactor",

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiff.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
4242
private final Reconciliation reconciliation;
4343
private final Collection<AlterConfigOp> brokerConfigDiff;
4444
private final Map<String, ConfigModel> configModel;
45-
private final short elrVersion;
4645

4746
/**
4847
* These options are skipped because they contain placeholders
@@ -74,11 +73,9 @@ public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
7473
* @param desired Desired configuration
7574
* @param kafkaVersion Kafka version
7675
* @param brokerNodeRef Broker node reference
77-
* @param elrVersion If Eligible Leader Replicas (ELR) is enabled on the Kafka cluster
7876
*/
79-
protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired, KafkaVersion kafkaVersion, NodeRef brokerNodeRef, short elrVersion) {
77+
protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired, KafkaVersion kafkaVersion, NodeRef brokerNodeRef) {
8078
this.reconciliation = reconciliation;
81-
this.elrVersion = elrVersion;
8279
this.configModel = KafkaConfiguration.readConfigModel(kafkaVersion);
8380
this.brokerConfigDiff = diff(brokerNodeRef, desired, brokerConfigs, configModel);
8481
}
@@ -218,9 +215,7 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Alt
218215
} else {
219216
// entry is in current, is not in desired, is not default -> it was using non-default value and was removed
220217
// if the entry was custom, it should be deleted
221-
if ("min.insync.replicas".equals(entry.name()) && elrVersion >= 1) {
222-
LOGGER.traceCr(reconciliation, "min.insync.replicas cannot be deleted when ELR is enabled");
223-
} else if (!isIgnorableProperty(pathValueWithoutSlash, nodeIsController)) {
218+
if (!isIgnorableProperty(pathValueWithoutSlash, nodeIsController)) {
224219
updatedCE.add(new AlterConfigOp(new ConfigEntry(pathValueWithoutSlash, null), AlterConfigOp.OpType.DELETE));
225220
LOGGER.infoCr(reconciliation, "{} not set in desired, unsetting back to default {}", entry.name(), "deleted entry");
226221
} else {

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.kafka.clients.admin.AlterConfigOp;
3535
import org.apache.kafka.clients.admin.AlterConfigsResult;
3636
import org.apache.kafka.clients.admin.Config;
37-
import org.apache.kafka.clients.admin.FeatureMetadata;
3837
import org.apache.kafka.common.KafkaException;
3938
import org.apache.kafka.common.KafkaFuture;
4039
import org.apache.kafka.common.config.ConfigException;
@@ -624,12 +623,8 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
624623
// Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can
625624
// connect to the broker and that it's capable of responding.
626625
Config brokerConfig;
627-
short elrVersion = 0;
628626
try {
629627
brokerConfig = brokerConfig(nodeRef);
630-
FeatureMetadata featureMetadata = featureMetadata();
631-
elrVersion = featureMetadata.finalizedFeatures().get("eligible.leader.replicas.version") == null ?
632-
0 : featureMetadata.finalizedFeatures().get("eligible.leader.replicas.version").maxVersionLevel();
633628
} catch (ForceableProblem e) {
634629
if (restartContext.backOff.done()) {
635630
needsRestart = true;
@@ -641,7 +636,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
641636

642637
if (!needsRestart && allowReconfiguration) {
643638
LOGGER.traceCr(reconciliation, "Pod {}: description {}", nodeRef, brokerConfig);
644-
brokerConfigDiff = new KafkaBrokerConfigurationDiff(reconciliation, brokerConfig, kafkaConfigProvider.apply(nodeRef.nodeId()), kafkaVersion, nodeRef, elrVersion);
639+
brokerConfigDiff = new KafkaBrokerConfigurationDiff(reconciliation, brokerConfig, kafkaConfigProvider.apply(nodeRef.nodeId()), kafkaVersion, nodeRef);
645640

646641
if (brokerConfigDiff.getDiffSize() > 0) {
647642
if (brokerConfigDiff.canBeUpdatedDynamically()) {
@@ -662,17 +657,6 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
662657
restartContext.brokerConfigDiff = brokerConfigDiff;
663658
}
664659

665-
/**
666-
* Returns the information about the features within the Kafka Cluster
667-
* @return information about the features
668-
*/
669-
/* test */ FeatureMetadata featureMetadata() throws ForceableProblem, InterruptedException {
670-
return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeFeatures().featureMetadata()),
671-
30, TimeUnit.SECONDS,
672-
error -> new ForceableProblem("Error getting feature metadata", error)
673-
);
674-
}
675-
676660
/**
677661
* Returns a config of the given broker.
678662
* @param nodeRef The reference of the broker.

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,8 @@ public void testNullUserConfiguration() {
499499
"config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
500500
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
501501
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
502-
"config.providers.strimzidir.param.allowed.paths=/opt/kafka"));
502+
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
503+
"min.insync.replicas=1"));
503504
}
504505

505506
@Test
@@ -516,7 +517,8 @@ public void testNullUserConfigurationAndCCReporter() {
516517
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
517518
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
518519
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
519-
"metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"));
520+
"metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter",
521+
"min.insync.replicas=1"));
520522
}
521523

522524
@Test
@@ -535,7 +537,8 @@ public void testEmptyUserConfiguration() {
535537
"config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
536538
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
537539
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
538-
"config.providers.strimzidir.param.allowed.paths=/opt/kafka"));
540+
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
541+
"min.insync.replicas=1"));
539542
}
540543

541544
@Test
@@ -563,7 +566,8 @@ public void testUserConfiguration() {
563566
"auto.create.topics.enable=false",
564567
"offsets.topic.replication.factor=3",
565568
"transaction.state.log.replication.factor=3",
566-
"transaction.state.log.min.isr=2"));
569+
"transaction.state.log.min.isr=2",
570+
"min.insync.replicas=1"));
567571
}
568572

569573
@Test
@@ -587,7 +591,8 @@ public void testUserConfigurationWithConfigProviders() {
587591
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
588592
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
589593
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
590-
"config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"));
594+
"config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
595+
"min.insync.replicas=1"));
591596

592597
// Controller
593598
configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, new NodeRef("my-cluster-kafka-3", 3, "kafka", true, false))
@@ -598,7 +603,8 @@ public void testUserConfigurationWithConfigProviders() {
598603
"config.providers=env,strimzienv",
599604
"config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
600605
"config.providers.strimzienv.param.allowlist.pattern=.*",
601-
"config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider"));
606+
"config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
607+
"min.insync.replicas=1"));
602608
}
603609

604610
@Test
@@ -631,7 +637,8 @@ public void testNullUserConfigurationWithJmxMetricsReporter() {
631637
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
632638
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
633639
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
634-
"metric.reporters=org.apache.kafka.common.metrics.JmxReporter"));
640+
"metric.reporters=org.apache.kafka.common.metrics.JmxReporter",
641+
"min.insync.replicas=1"));
635642
}
636643

637644
@Test
@@ -647,6 +654,7 @@ public void testNullUserConfigurationWithStrimziMetricsReporter() {
647654
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
648655
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
649656
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
657+
"min.insync.replicas=1",
650658
"metric.reporters=" + StrimziMetricsReporterConfig.KAFKA_CLASS,
651659
"kafka.metrics.reporters=" + StrimziMetricsReporterConfig.YAMMER_CLASS));
652660
}
@@ -666,6 +674,7 @@ public void testNullUserConfigurationWithCruiseControlAndStrimziMetricsReporters
666674
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
667675
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
668676
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
677+
"min.insync.replicas=1",
669678
"metric.reporters=" + CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER
670679
+ "," + StrimziMetricsReporterConfig.KAFKA_CLASS,
671680
"kafka.metrics.reporters=" + StrimziMetricsReporterConfig.YAMMER_CLASS));
@@ -688,7 +697,8 @@ public void testNullUserConfigurationWithCruiseControlAndJmxAndStrimziMetricsRep
688697
"metric.reporters=" + CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER
689698
+ ",org.apache.kafka.common.metrics.JmxReporter"
690699
+ "," + StrimziMetricsReporterConfig.KAFKA_CLASS,
691-
"kafka.metrics.reporters=" + StrimziMetricsReporterConfig.YAMMER_CLASS));
700+
"kafka.metrics.reporters=" + StrimziMetricsReporterConfig.YAMMER_CLASS,
701+
"min.insync.replicas=1"));
692702
}
693703

694704
static Stream<Arguments> sourceUserConfigWithMetricsReporters() {
@@ -704,7 +714,8 @@ static Stream<Arguments> sourceUserConfigWithMetricsReporters() {
704714
+ "config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider\n"
705715
+ "config.providers.strimzifile.param.allowed.paths=/opt/kafka\n"
706716
+ "config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider\n"
707-
+ "config.providers.strimzidir.param.allowed.paths=/opt/kafka\n";
717+
+ "config.providers.strimzidir.param.allowed.paths=/opt/kafka\n"
718+
+ "min.insync.replicas=1\n";
708719

709720
// testing 8 combinations of 3 boolean values
710721
return Stream.of(
@@ -778,7 +789,8 @@ public void testStrimziMetricsReporterViaUserAndMetricsConfigs() {
778789
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
779790
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
780791
"metric.reporters=" + StrimziMetricsReporterConfig.KAFKA_CLASS,
781-
"kafka.metrics.reporters=" + StrimziMetricsReporterConfig.YAMMER_CLASS));
792+
"kafka.metrics.reporters=" + StrimziMetricsReporterConfig.YAMMER_CLASS,
793+
"min.insync.replicas=1"));
782794
}
783795

784796
@Test
@@ -2617,4 +2629,29 @@ public void testWithKRaftMetadataLogDir() {
26172629
"metadata.log.dir=/my/kraft/metadata/kafka-log2"
26182630
));
26192631
}
2632+
2633+
@Test
2634+
public void testDefaultMinInSyncReplicasWhenNotSpecified() {
2635+
Map<String, Object> userConfiguration = new HashMap<>();
2636+
userConfiguration.put("auto.create.topics.enable", "false");
2637+
userConfiguration.put("offsets.topic.replication.factor", 3);
2638+
2639+
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, userConfiguration.entrySet());
2640+
2641+
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, NODE_REF)
2642+
.withUserConfiguration(kafkaConfiguration, false, false, false)
2643+
.build();
2644+
2645+
assertThat(configuration, isEquivalent("node.id=2",
2646+
"config.providers=strimzienv,strimzifile,strimzidir",
2647+
"config.providers.strimzienv.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider",
2648+
"config.providers.strimzienv.param.allowlist.pattern=.*",
2649+
"config.providers.strimzifile.class=org.apache.kafka.common.config.provider.FileConfigProvider",
2650+
"config.providers.strimzifile.param.allowed.paths=/opt/kafka",
2651+
"config.providers.strimzidir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider",
2652+
"config.providers.strimzidir.param.allowed.paths=/opt/kafka",
2653+
"min.insync.replicas=1",
2654+
"auto.create.topics.enable=false",
2655+
"offsets.topic.replication.factor=3"));
2656+
}
26202657
}

0 commit comments

Comments
 (0)