Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,49 @@

import io.javaoperatorsdk.operator.Operator;
import io.strimzi.kafka.access.server.HealthServlet;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Arrays.asList;

/**
* The main operator class for Strimzi Access Operator
*/
public class KafkaAccessOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAccessOperator.class);
private static final int HEALTH_CHECK_PORT = 8080;
private final static String ANY_NAMESPACE = "*";

/**
* Parses namespace configuration from a string
*
* @param namespacesList Comma-separated list of namespaces or "*" for all namespaces
* @return Set of namespace names
*/
private static Set<String> parseNamespaces(String namespacesList) {
Set<String> namespaces;
if (namespacesList.equals(ANY_NAMESPACE)) {
namespaces = Collections.singleton(ANY_NAMESPACE);
} else {
if (namespacesList.trim().equals(ANY_NAMESPACE)) {
namespaces = Collections.singleton(ANY_NAMESPACE);
} else if (namespacesList.matches("(\\s*[a-z0-9.-]+\\s*,)*\\s*[a-z0-9.-]+\\s*")) {
namespaces = new HashSet<>(asList(namespacesList.trim().split("\\s*,+\\s*")));
} else {
throw new InvalidConfigurationException("Not a valid list of namespaces nor the 'any namespace' wildcard "
+ ANY_NAMESPACE);
}
}

return namespaces;
}

/**
* Initializes the operator and runs a servlet for health checking
Expand All @@ -28,8 +59,24 @@ public static void main(final String[] args) {
LOGGER.info("Kafka Access operator starting");
final Operator operator = new Operator(overrider -> overrider
.withUseSSAToPatchPrimaryResource(false));
operator.register(new KafkaAccessReconciler(operator.getKubernetesClient()));
operator.start();

String strimziNamespace = System.getenv().getOrDefault("STRIMZI_NAMESPACE", "*");
Set<String> namespaces = parseNamespaces(strimziNamespace);

operator.register(new KafkaAccessReconciler(operator.getKubernetesClient()),
configuration -> configuration.settingNamespaces(namespaces));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does called configuration.settingNamespaces("*") really work?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's why I had previous comment(s) about adding the watchingAllNamespaces().


try {
operator.start();
LOGGER.info("Kafka Access operator started successfully for namespaces: {}", namespaces);
} catch (Exception e) {
LOGGER.error("Failed to start Kafka Access operator. This may be due to missing RoleBindings for one or more namespaces: {}. " +
"Please ensure that appropriate RoleBindings exist for all configured namespaces. Error: {}",
namespaces, e.getMessage(), e);
// Continue running to keep pod alive and allow health checks to work
LOGGER.warn("Operator will continue running but may not be able to reconcile resources in namespaces without proper permissions");
}

Server server = new Server(HEALTH_CHECK_PORT);
ServletHandler handler = new ServletHandler();
server.setHandler(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,13 @@ void testReconcileWithUserProvidedSecretAndTestDeleteSecretWithNameChange() {
KafkaAccess currentKafkaAccess = client.resources(KafkaAccess.class).inNamespace(NAMESPACE).withName(NAME).get();
assertThat(currentKafkaAccess).isNotNull();

currentKafkaAccess.getSpec().setSecretName(NEW_USER_PROVIDED_SECRET_NAME);
client.resources(KafkaAccess.class).resource(currentKafkaAccess).update();
client.resources(KafkaAccess.class)
.inNamespace(NAMESPACE)
.withName(NAME)
.edit(ka -> {
ka.getSpec().setSecretName(NEW_USER_PROVIDED_SECRET_NAME);
return ka;
});

client.resources(KafkaAccess.class).inNamespace(NAMESPACE).withName(NAME).waitUntilCondition(updatedKafkaAccess -> {
final Optional<String> bindingName = Optional.ofNullable(updatedKafkaAccess)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{{- if .Values.watchAnyNamespace }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the file named RoleBinding when it contains a ClusterRoleBinding?

metadata:
name: strimzi-access-operator
labels:
app: strimzi-access-operator
subjects:
- kind: ServiceAccount
name: strimzi-access-operator
namespace: {{ .Release.Namespace }}
roleRef:
kind: ClusterRole
name: strimzi-access-operator
apiGroup: rbac.authorization.k8s.io
{{- else }}
{{- $namespaces := list }}
{{- if .Values.watchNamespaces }}
{{- if kindIs "slice" .Values.watchNamespaces }}
{{- $namespaces = .Values.watchNamespaces }}
{{- else }}
{{- $namespaces = list .Values.watchNamespaces }}
{{- end }}
{{- else }}
{{- $namespaces = list .Release.Namespace }}
{{- end }}
{{- range $namespaces }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: strimzi-access-operator
namespace: {{ . }}
labels:
app: strimzi-access-operator
subjects:
- kind: ServiceAccount
name: strimzi-access-operator
namespace: {{ $.Release.Namespace }}
roleRef:
kind: ClusterRole
name: strimzi-access-operator
apiGroup: rbac.authorization.k8s.io
{{- end }}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ spec:
volumeMounts:
- name: strimzi-tmp
mountPath: /tmp
env:
- name: STRIMZI_NAMESPACE
{{- if .Values.watchAnyNamespace }}
value: "*"
{{- else }}
{{- if .Values.watchNamespaces }}
value: "{{ include "strimzi.watchNamespacesList" . }}"
{{- else }}
valueFrom:
fieldRef:
fieldPath: metadata.namespace
{{- end }}
{{- end }}
resources:
{{ toYaml .Values.resources | indent 12 }}
livenessProbe:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{{/*
Generate the watch namespaces list for STRIMZI_NAMESPACE environment variable
*/}}
{{- define "strimzi.watchNamespacesList" -}}
{{- if .Values.watchNamespaces }}
{{- if eq .Values.watchNamespaces "*" }}
*
{{- else if kindIs "slice" .Values.watchNamespaces }}
{{- join "," .Values.watchNamespaces }}
{{- else }}
{{- .Values.watchNamespaces }}
{{- end }}
{{- else }}
{{- .Release.Namespace }}
{{- end }}
{{- end -}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Contains `.Release.Namespace` by default
watchNamespaces: []
watchAnyNamespace: false

image:
registry: quay.io
repository: strimzi
Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FMPOV you should not have multiple files containing RoleBinding for each Namespace - similarly to the Cluster Operator (and what we have in docs) you can just have one and if needed, it can be created in every Namespace the operator will watch.

Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
kind: RoleBinding
metadata:
name: strimzi-access-operator
namespace: strimzi-access-operator
labels:
app: strimzi-access-operator
subjects:
Expand Down
5 changes: 5 additions & 0 deletions packaging/install/050-Deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ spec:
volumeMounts:
- name: strimzi-tmp
mountPath: /tmp
env:
- name: STRIMZI_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
resources:
limits:
cpu: 500m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface TestConstants {
String SERVICE_ACCOUNT = "ServiceAccount";
String CLUSTER_ROLE = "ClusterRole";
String CLUSTER_ROLE_BINDING = "ClusterRoleBinding";
String ROLE_BINDING = "RoleBinding";
String CUSTOM_RESOURCE_DEFINITION_SHORT = "Crd";

//--------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.fabric8.kubernetes.api.model.rbac.ClusterRole;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder;
import io.fabric8.kubernetes.api.model.rbac.RoleBinding;
import io.fabric8.kubernetes.api.model.rbac.RoleBindingBuilder;
import io.skodjob.testframe.installation.InstallationMethod;
import io.skodjob.testframe.resources.KubeResourceManager;
import io.skodjob.testframe.utils.ImageUtils;
Expand Down Expand Up @@ -76,6 +78,18 @@ public void install() {
.build()
);
break;
case TestConstants.ROLE_BINDING:
RoleBinding roleBinding = TestFrameUtils.configFromYaml(file, RoleBinding.class);
KubeResourceManager.get().createOrUpdateResourceWithWait(new RoleBindingBuilder(roleBinding)
.editOrNewMetadata()
.withNamespace(installationNamespace)
.endMetadata()
.editFirstSubject()
.withNamespace(installationNamespace)
.endSubject()
.build()
);
break;
case TestConstants.CUSTOM_RESOURCE_DEFINITION_SHORT:
CustomResourceDefinition customResourceDefinition = TestFrameUtils.configFromYaml(file, CustomResourceDefinition.class);
KubeResourceManager.get().createOrUpdateResourceWithWait(customResourceDefinition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.skodjob.testframe.resources.DeploymentType;
import io.skodjob.testframe.resources.KubeResourceManager;
import io.skodjob.testframe.resources.NamespaceType;
import io.skodjob.testframe.resources.RoleBindingType;
import io.skodjob.testframe.utils.KubeUtils;
import io.strimzi.kafka.access.installation.SetupAccessOperator;
import io.strimzi.kafka.access.log.MustGatherImpl;
Expand Down Expand Up @@ -42,6 +43,7 @@ public abstract class AbstractST {
new CustomResourceDefinitionType(),
new DeploymentType(),
new NamespaceType(),
new RoleBindingType(),
new KafkaAccessType()
);

Expand Down
Loading