Skip to content

Commit f195ec0

Browse files
author
Mateusz Rzeszutek
authored
Link JMS receive span with the producer span (#6804)
Resolves #6779 In JMS you can have either the consumer receive span or the consumer process span (unlike Kafka, where the process span is always there and the receive span is just an addition) - in scenarios where polling (receive) is used, I think it makes sense to add links to the producer span to preserve the producer-consumer connection. Current messaging semantic conventions don't really describe a situation like this one, but the open-telemetry/oteps#220 OTEP mentions that links might be used in a scenario like this one - which makes me think that adding links here might be a not that bad idea.
1 parent da3eecf commit f195ec0

File tree

4 files changed

+150
-71
lines changed

4 files changed

+150
-71
lines changed

instrumentation/jms-1.1/javaagent/src/jms2Test/groovy/Jms2Test.groovy

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
import com.google.common.io.Files
76
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
87
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
98
import io.opentelemetry.sdk.trace.data.SpanData
@@ -26,6 +25,7 @@ import javax.jms.Message
2625
import javax.jms.MessageListener
2726
import javax.jms.Session
2827
import javax.jms.TextMessage
28+
import java.nio.file.Files
2929
import java.util.concurrent.CountDownLatch
3030
import java.util.concurrent.atomic.AtomicReference
3131

@@ -43,7 +43,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
4343
HornetQTextMessage message = session.createTextMessage(messageText)
4444

4545
def setupSpec() {
46-
def tempDir = Files.createTempDir()
46+
def tempDir = Files.createTempDirectory("jmsTempDir").toFile()
4747
tempDir.deleteOnExit()
4848

4949
Configuration config = new ConfigurationImpl()
@@ -86,19 +86,34 @@ class Jms2Test extends AgentInstrumentationSpecification {
8686
def producer = session.createProducer(destination)
8787
def consumer = session.createConsumer(destination)
8888

89-
producer.send(message)
89+
runWithSpan("producer parent") {
90+
producer.send(message)
91+
}
9092

91-
TextMessage receivedMessage = consumer.receive()
93+
TextMessage receivedMessage = runWithSpan("consumer parent") {
94+
return consumer.receive() as TextMessage
95+
}
9296
String messageId = receivedMessage.getJMSMessageID()
9397

9498
expect:
9599
receivedMessage.text == messageText
96100
assertTraces(2) {
97-
trace(0, 1) {
98-
producerSpan(it, 0, destinationType, destinationName)
101+
SpanData producerSpanData
102+
trace(0, 2) {
103+
span(0) {
104+
name "producer parent"
105+
hasNoParent()
106+
}
107+
producerSpan(it, 1, destinationType, destinationName, span(0))
108+
109+
producerSpanData = span(1)
99110
}
100-
trace(1, 1) {
101-
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive")
111+
trace(1, 2) {
112+
span(0) {
113+
name "consumer parent"
114+
hasNoParent()
115+
}
116+
consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData)
102117
}
103118
}
104119

@@ -124,18 +139,24 @@ class Jms2Test extends AgentInstrumentationSpecification {
124139
@Override
125140
void onMessage(Message message) {
126141
lock.await() // ensure the producer trace is reported first.
127-
messageRef.set(message)
142+
messageRef.set(message as TextMessage)
128143
}
129144
}
130145

131-
producer.send(message)
146+
runWithSpan("parent") {
147+
producer.send(message)
148+
}
132149
lock.countDown()
133150

134151
expect:
135152
assertTraces(1) {
136-
trace(0, 2) {
137-
producerSpan(it, 0, destinationType, destinationName)
138-
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
153+
trace(0, 3) {
154+
span(0) {
155+
name "parent"
156+
hasNoParent()
157+
}
158+
producerSpan(it, 1, destinationType, destinationName, span(0))
159+
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
139160
}
140161
}
141162
// This check needs to go after all traces have been accounted for
@@ -158,7 +179,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
158179
def consumer = session.createConsumer(destination)
159180

160181
// Receive with timeout
161-
TextMessage receivedMessage = consumer.receiveNoWait()
182+
Message receivedMessage = consumer.receiveNoWait()
162183

163184
expect:
164185
receivedMessage == null
@@ -179,7 +200,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
179200
def consumer = session.createConsumer(destination)
180201
181202
// Receive with timeout
182-
TextMessage receivedMessage = consumer.receive(100)
203+
Message receivedMessage = consumer.receive(100)
183204
184205
expect:
185206
receivedMessage == null
@@ -206,19 +227,25 @@ class Jms2Test extends AgentInstrumentationSpecification {
206227
@Override
207228
void onMessage(Message message) {
208229
lock.await() // ensure the producer trace is reported first.
209-
messageRef.set(message)
230+
messageRef.set(message as TextMessage)
210231
}
211232
}
212233
213234
when:
214-
producer.send(destination, message)
235+
runWithSpan("parent") {
236+
producer.send(destination, message)
237+
}
215238
lock.countDown()
216239
217240
then:
218241
assertTraces(1) {
219-
trace(0, 2) {
220-
producerSpan(it, 0, destinationType, destinationName)
221-
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
242+
trace(0, 3) {
243+
span(0) {
244+
name "parent"
245+
hasNoParent()
246+
}
247+
producerSpan(it, 1, destinationType, destinationName, span(0))
248+
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
222249
}
223250
}
224251
// This check needs to go after all traces have been accounted for
@@ -236,11 +263,15 @@ class Jms2Test extends AgentInstrumentationSpecification {
236263
session.createTemporaryTopic() | "topic" | "(temporary)"
237264
}
238265
239-
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
266+
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, SpanData parentSpan = null) {
240267
trace.span(index) {
241268
name destinationName + " send"
242269
kind PRODUCER
243-
hasNoParent()
270+
if (parentSpan == null) {
271+
hasNoParent()
272+
} else {
273+
childOf(parentSpan)
274+
}
244275
attributes {
245276
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
246277
"$SemanticAttributes.MESSAGING_DESTINATION" destinationName
@@ -256,14 +287,19 @@ class Jms2Test extends AgentInstrumentationSpecification {
256287
// passing messageId = null will verify message.id is not captured,
257288
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
258289
// any other value for messageId will verify that message.id is captured and has that same value
259-
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
290+
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null) {
260291
trace.span(index) {
261292
name destinationName + " " + operation
262293
kind CONSUMER
263-
if (parentOrLinkedSpan != null) {
264-
childOf((SpanData) parentOrLinkedSpan)
265-
} else {
294+
if (parentSpan == null) {
266295
hasNoParent()
296+
} else {
297+
childOf(parentSpan)
298+
}
299+
if (linkedSpan == null) {
300+
hasNoLinks()
301+
} else {
302+
hasLink(linkedSpan)
267303
}
268304
attributes {
269305
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"

instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageConsumerInstrumentation.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static io.opentelemetry.javaagent.instrumentation.jms.JmsSingletons.consumerInstrumenter;
1111
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
1212
import static net.bytebuddy.matcher.ElementMatchers.named;
13+
import static net.bytebuddy.matcher.ElementMatchers.returns;
1314
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
1415

1516
import io.opentelemetry.context.Context;
@@ -37,10 +38,16 @@ public ElementMatcher<TypeDescription> typeMatcher() {
3738
@Override
3839
public void transform(TypeTransformer transformer) {
3940
transformer.applyAdviceToMethod(
40-
named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()),
41+
named("receive")
42+
.and(takesArguments(0).or(takesArguments(1)))
43+
.and(returns(named("javax.jms.Message")))
44+
.and(isPublic()),
4145
JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice");
4246
transformer.applyAdviceToMethod(
43-
named("receiveNoWait").and(takesArguments(0)).and(isPublic()),
47+
named("receiveNoWait")
48+
.and(takesArguments(0))
49+
.and(returns(named("javax.jms.Message")))
50+
.and(isPublic()),
4451
JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice");
4552
}
4653

instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
1313
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
1414
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
15+
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
1516
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
1617

1718
public final class JmsSingletons {
@@ -47,6 +48,10 @@ private static Instrumenter<MessageWithDestination, Void> buildConsumerInstrumen
4748
MessagingSpanNameExtractor.create(getter, operation))
4849
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
4950
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
51+
.addSpanLinksExtractor(
52+
new PropagatorBasedSpanLinksExtractor<>(
53+
GlobalOpenTelemetry.getPropagators().getTextMapPropagator(),
54+
MessagePropertyGetter.INSTANCE))
5055
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
5156
}
5257

0 commit comments

Comments
 (0)