Skip to content

Commit 7a41bfb

Browse files
committed
Add some infrastructure for Observation
* Populate an `ObservationRegistry` bean from the `IntegrationManagementConfigurer` into all the `IntegrationManagement` components * Introduce `MessageReceiverContext` and `MessageSenderContext` for easier usage in the target code * Implement `Observation` handling in the `AbstractMessageHandler` * Modify `ObservationPropagationChannelInterceptorTests` for new `MessageSenderContext` * Use `BridgeHandler` to ensure that `Observation` is propagated and handled properly * Verify that tags from the `AbstractMessageHandler` are preset on the consumer span
1 parent 47cfae9 commit 7a41bfb

File tree

8 files changed

+227
-63
lines changed

8 files changed

+227
-63
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfiguration.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,8 @@
3131
import org.springframework.integration.support.management.metrics.MetricsCaptor;
3232
import org.springframework.util.Assert;
3333

34+
import io.micrometer.observation.ObservationRegistry;
35+
3436
/**
3537
* {@code @Configuration} class that registers a {@link IntegrationManagementConfigurer} bean.
3638
*
@@ -64,12 +66,16 @@ public void setImportMetadata(AnnotationMetadata importMetadata) {
6466

6567
@Bean(name = IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME)
6668
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
67-
public IntegrationManagementConfigurer managementConfigurer(ObjectProvider<MetricsCaptor> metricsCaptorProvider) {
69+
public IntegrationManagementConfigurer managementConfigurer(
70+
ObjectProvider<MetricsCaptor> metricsCaptorProvider,
71+
ObjectProvider<ObservationRegistry> observationRegistryProvider) {
72+
6873
IntegrationManagementConfigurer configurer = new IntegrationManagementConfigurer();
6974
configurer.setDefaultLoggingEnabled(
7075
Boolean.parseBoolean(this.environment.resolvePlaceholders(
7176
(String) this.attributes.get("defaultLoggingEnabled"))));
7277
configurer.setMetricsCaptorProvider(metricsCaptorProvider);
78+
configurer.setObservationRegistry(observationRegistryProvider);
7379
return configurer;
7480
}
7581

spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfigurer.java

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,10 +38,12 @@
3838
import org.springframework.messaging.MessageHandler;
3939
import org.springframework.util.Assert;
4040

41+
import io.micrometer.observation.ObservationRegistry;
42+
4143

4244
/**
4345
* Configures beans that implement {@link IntegrationManagement}.
44-
* Configures counts, stats, logging for all (or selected) components.
46+
* Configures logging, {@link MetricsCaptor} and {@link ObservationRegistry} for all (or selected) components.
4547
*
4648
* @author Gary Russell
4749
* @author Artem Bilan
@@ -74,6 +76,10 @@ public class IntegrationManagementConfigurer
7476

7577
private ObjectProvider<MetricsCaptor> metricsCaptorProvider;
7678

79+
private ObservationRegistry observationRegistry;
80+
81+
private ObjectProvider<ObservationRegistry> observationRegistryProvider;
82+
7783
@Override
7884
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
7985
this.applicationContext = applicationContext;
@@ -95,8 +101,8 @@ public void setBeanName(String name) {
95101
* {@link org.apache.commons.logging.Log#isDebugEnabled()} can be quite expensive
96102
* and account for an inordinate amount of CPU time.
97103
* <p>
98-
* Set this to false to disable logging by default in all framework components that implement
99-
* {@link IntegrationManagement} (channels, message handlers etc). This turns off logging such as
104+
* Set this to 'false' to disable logging by default in all framework components that implement
105+
* {@link IntegrationManagement} (channels, message handlers etc.) This turns off logging such as
100106
* "PreSend on channel", "Received message" etc.
101107
* <p>
102108
* After the context is initialized, individual components can have their setting changed by invoking
@@ -115,14 +121,21 @@ void setMetricsCaptorProvider(ObjectProvider<MetricsCaptor> metricsCaptorProvide
115121
this.metricsCaptorProvider = metricsCaptorProvider;
116122
}
117123

118-
@Nullable
119-
MetricsCaptor obtainMetricsCaptor() {
120-
if (this.metricsCaptor == null && this.metricsCaptorProvider != null) {
121-
this.metricsCaptor = this.metricsCaptorProvider.getIfUnique();
122-
}
123-
return this.metricsCaptor;
124+
/**
125+
* Set an {@link ObservationRegistry} to populate to the {@link IntegrationManagement} components
126+
* in the application context.
127+
* @param observationRegistry the {@link ObservationRegistry} to use.
128+
* @since 6.0
129+
*/
130+
public void setObservationRegistry(@Nullable ObservationRegistry observationRegistry) {
131+
this.observationRegistry = observationRegistry;
132+
}
133+
134+
void setObservationRegistry(ObjectProvider<ObservationRegistry> observationRegistryProvider) {
135+
this.observationRegistryProvider = observationRegistryProvider;
124136
}
125137

138+
126139
@Override
127140
public void afterSingletonsInstantiated() {
128141
Assert.state(this.applicationContext != null, "'applicationContext' must not be null");
@@ -133,15 +146,29 @@ public void afterSingletonsInstantiated() {
133146
registerComponentGauges();
134147
}
135148

136-
for (IntegrationManagement integrationManagement :
137-
this.applicationContext.getBeansOfType(IntegrationManagement.class).values()) {
149+
setupObservationRegistry();
138150

139-
enhanceIntegrationManagement(integrationManagement);
140-
}
151+
this.applicationContext.getBeansOfType(IntegrationManagement.class).values()
152+
.forEach(this::enhanceIntegrationManagement);
141153

142154
this.singletonsInstantiated = true;
143155
}
144156

157+
@Nullable
158+
private MetricsCaptor obtainMetricsCaptor() {
159+
if (this.metricsCaptor == null && this.metricsCaptorProvider != null) {
160+
this.metricsCaptor = this.metricsCaptorProvider.getIfUnique();
161+
}
162+
return this.metricsCaptor;
163+
}
164+
165+
@Nullable
166+
private void setupObservationRegistry() {
167+
if (this.observationRegistry == null && this.observationRegistryProvider != null) {
168+
this.observationRegistry = this.observationRegistryProvider.getIfUnique();
169+
}
170+
}
171+
145172
private void registerComponentGauges() {
146173
this.gauges.add(
147174
this.metricsCaptor.gaugeBuilder("spring.integration.channels", this,
@@ -169,17 +196,21 @@ private void enhanceIntegrationManagement(IntegrationManagement integrationManag
169196
if (this.metricsCaptor != null) {
170197
integrationManagement.registerMetricsCaptor(this.metricsCaptor);
171198
}
199+
if (this.observationRegistry != null) {
200+
integrationManagement.registerObservationRegistry(this.observationRegistry);
201+
}
172202
}
173203

174204
@Override
175205
public Object postProcessAfterInitialization(Object bean, String name) throws BeansException {
176-
if (this.singletonsInstantiated && bean instanceof IntegrationManagement) {
177-
enhanceIntegrationManagement((IntegrationManagement) bean);
206+
if (this.singletonsInstantiated && bean instanceof IntegrationManagement integrationManagement) {
207+
enhanceIntegrationManagement(integrationManagement);
178208
}
179209
return bean;
180210
}
181211

182-
@Override public void onApplicationEvent(ContextClosedEvent event) {
212+
@Override
213+
public void onApplicationEvent(ContextClosedEvent event) {
183214
if (event.getApplicationContext().equals(this.applicationContext)) {
184215
this.gauges.forEach(MeterFacade::remove);
185216
this.gauges.clear();

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import org.springframework.integration.history.MessageHistory;
2222
import org.springframework.integration.support.management.metrics.MetricsCaptor;
2323
import org.springframework.integration.support.management.metrics.SampleFacade;
24+
import org.springframework.integration.support.management.observation.MessageReceiverContext;
2425
import org.springframework.integration.support.utils.IntegrationUtils;
2526
import org.springframework.messaging.Message;
2627
import org.springframework.messaging.MessageHandler;
2728
import org.springframework.util.Assert;
2829

30+
import io.micrometer.observation.Observation;
31+
import io.micrometer.observation.ObservationRegistry;
2932
import reactor.core.CoreSubscriber;
3033

3134
/**
@@ -40,18 +43,31 @@ public abstract class AbstractMessageHandler extends MessageHandlerSupport
4043
@Override // NOSONAR
4144
public void handleMessage(Message<?> message) {
4245
Assert.notNull(message, "Message must not be null");
43-
if (isLoggingEnabled() && this.logger.isDebugEnabled()) {
44-
this.logger.debug(this + " received message: " + message);
46+
if (isLoggingEnabled()) {
47+
this.logger.debug(() -> this + " received message: " + message);
4548
}
46-
MetricsCaptor metricsCaptor = getMetricsCaptor();
47-
if (metricsCaptor != null) {
48-
handleWithMetrics(message, metricsCaptor);
49+
ObservationRegistry observationRegistry = getObservationRegistry();
50+
if (observationRegistry != null) {
51+
handleWithObservation(message, observationRegistry);
4952
}
5053
else {
51-
doHandleMessage(message);
54+
MetricsCaptor metricsCaptor = getMetricsCaptor();
55+
if (metricsCaptor != null) {
56+
handleWithMetrics(message, metricsCaptor);
57+
}
58+
else {
59+
doHandleMessage(message);
60+
}
5261
}
5362
}
5463

64+
private void handleWithObservation(Message<?> message, ObservationRegistry observationRegistry) {
65+
Observation.createNotStarted(CONSUME_OBSERVATION_NAME, new MessageReceiverContext(message), observationRegistry)
66+
.lowCardinalityKeyValue("type", "handler")
67+
.lowCardinalityKeyValue("name", getComponentName() == null ? "unknown" : getComponentName())
68+
.observe(() -> doHandleMessage(message));
69+
}
70+
5571
private void handleWithMetrics(Message<?> message, MetricsCaptor metricsCaptor) {
5672
SampleFacade sample = metricsCaptor.start();
5773
try {

spring-integration-core/src/main/java/org/springframework/integration/handler/MessageHandlerSupport.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,8 @@
3131
import org.springframework.integration.support.management.metrics.MetricsCaptor;
3232
import org.springframework.integration.support.management.metrics.TimerFacade;
3333

34+
import io.micrometer.observation.ObservationRegistry;
35+
3436
/**
3537
* Base class for Message handling components that provides basic validation and error
3638
* handling capabilities. Asserts that the incoming Message is not null and that it does
@@ -61,6 +63,8 @@ public abstract class MessageHandlerSupport extends IntegrationObjectSupport
6163

6264
private MetricsCaptor metricsCaptor;
6365

66+
private ObservationRegistry observationRegistry;
67+
6468
private int order = Ordered.LOWEST_PRECEDENCE;
6569

6670
private String managedName;
@@ -89,6 +93,15 @@ protected MetricsCaptor getMetricsCaptor() {
8993
return this.metricsCaptor;
9094
}
9195

96+
@Override
97+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
98+
this.observationRegistry = observationRegistry;
99+
}
100+
101+
protected ObservationRegistry getObservationRegistry() {
102+
return this.observationRegistry;
103+
}
104+
92105
@Override
93106
public void setOrder(int order) {
94107
this.order = order;

spring-integration-core/src/main/java/org/springframework/integration/support/management/IntegrationManagement.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2021 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,10 +22,14 @@
2222
import org.springframework.jmx.export.annotation.ManagedAttribute;
2323
import org.springframework.lang.Nullable;
2424

25+
import io.micrometer.observation.ObservationRegistry;
26+
2527
/**
2628
* Base interface for Integration managed components.
2729
*
2830
* @author Gary Russell
31+
* @author Artem Bilan
32+
*
2933
* @since 4.2
3034
*
3135
*/
@@ -35,11 +39,13 @@ public interface IntegrationManagement extends NamedComponent, DisposableBean {
3539

3640
String SEND_TIMER_NAME = METER_PREFIX + "send";
3741

42+
String CONSUME_OBSERVATION_NAME = SEND_TIMER_NAME;
43+
3844
String RECEIVE_COUNTER_NAME = METER_PREFIX + "receive";
3945

4046
/**
4147
* Enable logging or not.
42-
* @param enabled dalse to disable.
48+
* @param enabled false to disable.
4349
*/
4450
@ManagedAttribute(description = "Use to disable debug logging during normal message flow")
4551
default void setLoggingEnabled(boolean enabled) {
@@ -80,13 +86,28 @@ default ManagementOverrides getOverrides() {
8086

8187
/**
8288
* Inject a {@link MetricsCaptor}.
89+
* Ignored if {@link ObservationRegistry} is provided.
8390
* @param captor the captor.
8491
* @since 5.0.4
92+
* @see #registerObservationRegistry(ObservationRegistry)
8593
*/
8694
default void registerMetricsCaptor(MetricsCaptor captor) {
8795
// no op
8896
}
8997

98+
/**
99+
* Inject an {@link ObservationRegistry}.
100+
* If provided, the {@link MetricsCaptor} is ignored.
101+
* The meters capturing has to be configured as an {@link io.micrometer.observation.ObservationHandler}
102+
* on the provided {@link ObservationRegistry}.
103+
* @param observationRegistry the {@link ObservationRegistry} to expose observations from the component.
104+
* @since 6.0
105+
* @see #registerMetricsCaptor(MetricsCaptor)
106+
*/
107+
default void registerObservationRegistry(ObservationRegistry observationRegistry) {
108+
// no op
109+
}
110+
90111
@Override
91112
default void destroy() {
92113
// no op
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.management.observation;
18+
19+
import org.springframework.messaging.Message;
20+
21+
import io.micrometer.observation.transport.ReceiverContext;
22+
23+
/**
24+
* The {@link ReceiverContext} extension for {@link Message} context.
25+
*
26+
* @author Artem Bilan
27+
*
28+
* @since 6.0
29+
*/
30+
public class MessageReceiverContext extends ReceiverContext<Message<?>> {
31+
32+
public MessageReceiverContext(Message<?> message) {
33+
super((carrier, key) -> carrier.getHeaders().get(key, String.class));
34+
setCarrier(message);
35+
}
36+
37+
}

0 commit comments

Comments
 (0)