66
77import io .javaoperatorsdk .operator .Operator ;
88import io .strimzi .kafka .access .server .HealthServlet ;
9- import java .util .Arrays ;
9+ import java .util .Collections ;
10+ import java .util .HashSet ;
1011import java .util .Set ;
11- import java . util . stream . Collectors ;
12+ import org . apache . kafka . common . errors . InvalidConfigurationException ;
1213import org .eclipse .jetty .server .Server ;
1314import org .eclipse .jetty .servlet .ServletHandler ;
1415import org .slf4j .Logger ;
1516import org .slf4j .LoggerFactory ;
1617
18+ import static java .util .Arrays .asList ;
19+
1720/**
1821 * The main operator class for Strimzi Access Operator
1922 */
2023public class KafkaAccessOperator {
2124
2225 private static final Logger LOGGER = LoggerFactory .getLogger (KafkaAccessOperator .class );
2326 private static final int HEALTH_CHECK_PORT = 8080 ;
27+ private final static String ANY_NAMESPACE = "*" ;
28+
29+ /**
30+ * Parses namespace configuration from a string
31+ *
32+ * @param namespacesList Comma-separated list of namespaces or "*" for all namespaces
33+ * @return Set of namespace names
34+ */
35+ private static Set <String > parseNamespaces (String namespacesList ) {
36+ Set <String > namespaces ;
37+ if (namespacesList .equals (ANY_NAMESPACE )) {
38+ namespaces = Collections .singleton (ANY_NAMESPACE );
39+ } else {
40+ if (namespacesList .trim ().equals (ANY_NAMESPACE )) {
41+ namespaces = Collections .singleton (ANY_NAMESPACE );
42+ } else if (namespacesList .matches ("(\\ s*[a-z0-9.-]+\\ s*,)*\\ s*[a-z0-9.-]+\\ s*" )) {
43+ namespaces = new HashSet <>(asList (namespacesList .trim ().split ("\\ s*,+\\ s*" )));
44+ } else {
45+ throw new InvalidConfigurationException ("Not a valid list of namespaces nor the 'any namespace' wildcard "
46+ + ANY_NAMESPACE );
47+ }
48+ }
49+
50+ return namespaces ;
51+ }
2452
2553 /**
2654 * Initializes the operator and runs a servlet for health checking
@@ -32,22 +60,23 @@ public static void main(final String[] args) {
3260 final Operator operator = new Operator (overrider -> overrider
3361 .withUseSSAToPatchPrimaryResource (false ));
3462
35- String strimziNamespace = System .getenv ("STRIMZI_NAMESPACE" );
36- if (strimziNamespace != null && strimziNamespace .matches ("\\ *" )) {
37- LOGGER .info ("Watching all namespaces" );
38- operator .register (new KafkaAccessReconciler (operator .getKubernetesClient ()),
39- o -> o .watchingAllNamespaces ());
40- } else {
41- Set <String > namespaces =
42- strimziNamespace == null ? Set .of () :
43- Arrays .stream (strimziNamespace .split ("," ))
44- .map (String ::trim )
45- .collect (Collectors .toSet ());
46- LOGGER .info ("Watching specific namespaces: {}" , namespaces );
47- operator .register (new KafkaAccessReconciler (operator .getKubernetesClient ()),
48- o -> o .settingNamespaces (namespaces ));
63+ String strimziNamespace = System .getenv ().getOrDefault ("STRIMZI_NAMESPACE" , "*" );
64+ Set <String > namespaces = parseNamespaces (strimziNamespace );
65+
66+ operator .register (new KafkaAccessReconciler (operator .getKubernetesClient ()),
67+ configuration -> configuration .settingNamespaces (namespaces ));
68+
69+ try {
70+ operator .start ();
71+ LOGGER .info ("Kafka Access operator started successfully for namespaces: {}" , namespaces );
72+ } catch (Exception e ) {
73+ LOGGER .error ("Failed to start Kafka Access operator. This may be due to missing RoleBindings for one or more namespaces: {}. " +
74+ "Please ensure that appropriate RoleBindings exist for all configured namespaces. Error: {}" ,
75+ namespaces , e .getMessage (), e );
76+ // Continue running to keep pod alive and allow health checks to work
77+ LOGGER .warn ("Operator will continue running but may not be able to reconcile resources in namespaces without proper permissions" );
4978 }
50- operator . start ();
79+
5180 Server server = new Server (HEALTH_CHECK_PORT );
5281 ServletHandler handler = new ServletHandler ();
5382 server .setHandler (handler );
0 commit comments