3030import io .fabric8 .kubernetes .api .model .rbac .ClusterRoleBinding ;
3131import io .strimzi .api .kafka .model .bridge .KafkaBridge ;
3232import io .strimzi .api .kafka .model .bridge .KafkaBridgeBuilder ;
33+ import io .strimzi .api .kafka .model .bridge .KafkaBridgeConsumerEnablement ;
3334import io .strimzi .api .kafka .model .bridge .KafkaBridgeHttpConfig ;
35+ import io .strimzi .api .kafka .model .bridge .KafkaBridgeProducerEnablement ;
3436import io .strimzi .api .kafka .model .bridge .KafkaBridgeResources ;
3537import io .strimzi .api .kafka .model .common .CertSecretSource ;
3638import io .strimzi .api .kafka .model .common .CertSecretSourceBuilder ;
@@ -141,6 +143,9 @@ protected List<EnvVar> getExpectedEnvVars() {
141143 expected .add (new EnvVarBuilder ().withName (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_ID ).withValue (cluster ).build ());
142144 expected .add (new EnvVarBuilder ().withName (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_HTTP_HOST ).withValue (KafkaBridgeHttpConfig .HTTP_DEFAULT_HOST ).build ());
143145 expected .add (new EnvVarBuilder ().withName (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_HTTP_PORT ).withValue (String .valueOf (KafkaBridgeHttpConfig .HTTP_DEFAULT_PORT )).build ());
146+ expected .add (new EnvVarBuilder ().withName (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT ).withValue (String .valueOf (KafkaBridgeHttpConfig .HTTP_DEFAULT_TIMEOUT )).build ());
147+ expected .add (new EnvVarBuilder ().withName (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_CONSUMER_ENABLED ).withValue (String .valueOf (true )).build ());
148+ expected .add (new EnvVarBuilder ().withName (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_PRODUCER_ENABLED ).withValue (String .valueOf (true )).build ());
144149 expected .add (new EnvVarBuilder ().withName (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED ).withValue (String .valueOf (false )).build ());
145150 return expected ;
146151 }
@@ -155,6 +160,9 @@ public void testDefaultValues() {
155160 assertThat (kbc .readinessProbeOptions .getTimeoutSeconds (), is (5 ));
156161 assertThat (kbc .livenessProbeOptions .getInitialDelaySeconds (), is (15 ));
157162 assertThat (kbc .livenessProbeOptions .getTimeoutSeconds (), is (5 ));
163+ assertThat (kbc .getHttp ().getTimeoutSeconds (), is (-1L ));
164+ assertThat (kbc .getHttp ().getConsumer ().isEnabled (), is (true ));
165+ assertThat (kbc .getHttp ().getProducer ().isEnabled (), is (true ));
158166 }
159167
160168 @ ParallelTest
@@ -1300,4 +1308,25 @@ public void testJvmOptions() {
13001308 assertThat (javaOpts .getValue (), containsString ("-Xmx256m" ));
13011309 assertThat (javaOpts .getValue (), containsString ("-XX:InitiatingHeapOccupancyPercent=36" ));
13021310 }
1311+
1312+ @ ParallelTest
1313+ public void testConsumerProducerOptions () {
1314+ KafkaBridge resource = new KafkaBridgeBuilder (this .resource )
1315+ .editSpec ()
1316+ .editHttp ()
1317+ .withTimeoutSeconds (60 )
1318+ .withConsumer (new KafkaBridgeConsumerEnablement ())
1319+ .withProducer (new KafkaBridgeProducerEnablement ())
1320+ .endHttp ()
1321+ .endSpec ()
1322+ .build ();
1323+
1324+ KafkaBridgeCluster kb = KafkaBridgeCluster .fromCrd (Reconciliation .DUMMY_RECONCILIATION , resource , SHARED_ENV_PROVIDER );
1325+ Deployment deployment = kb .generateDeployment (new HashMap <>(), true , null , null );
1326+ Container container = deployment .getSpec ().getTemplate ().getSpec ().getContainers ().get (0 );
1327+
1328+ assertThat (io .strimzi .operator .cluster .TestUtils .containerEnvVars (container ).get (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT ), is ("60" ));
1329+ assertThat (io .strimzi .operator .cluster .TestUtils .containerEnvVars (container ).get (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_CONSUMER_ENABLED ), is ("true" ));
1330+ assertThat (io .strimzi .operator .cluster .TestUtils .containerEnvVars (container ).get (KafkaBridgeCluster .ENV_VAR_KAFKA_BRIDGE_PRODUCER_ENABLED ), is ("true" ));
1331+ }
13031332}
0 commit comments