Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

import com.google.common.io.Files
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
Expand All @@ -26,6 +25,7 @@ import javax.jms.Message
import javax.jms.MessageListener
import javax.jms.Session
import javax.jms.TextMessage
import java.nio.file.Files
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

Expand All @@ -43,7 +43,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
HornetQTextMessage message = session.createTextMessage(messageText)

def setupSpec() {
def tempDir = Files.createTempDir()
def tempDir = Files.createTempDirectory("jmsTempDir").toFile()
tempDir.deleteOnExit()

Configuration config = new ConfigurationImpl()
Expand Down Expand Up @@ -86,19 +86,34 @@ class Jms2Test extends AgentInstrumentationSpecification {
def producer = session.createProducer(destination)
def consumer = session.createConsumer(destination)

producer.send(message)
runWithSpan("producer parent") {
producer.send(message)
}

TextMessage receivedMessage = consumer.receive()
TextMessage receivedMessage = runWithSpan("consumer parent") {
return consumer.receive() as TextMessage
}
String messageId = receivedMessage.getJMSMessageID()

expect:
receivedMessage.text == messageText
assertTraces(2) {
trace(0, 1) {
producerSpan(it, 0, destinationType, destinationName)
SpanData producerSpanData
trace(0, 2) {
span(0) {
name "producer parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))

producerSpanData = span(1)
}
trace(1, 1) {
consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive")
trace(1, 2) {
span(0) {
name "consumer parent"
hasNoParent()
}
consumerSpan(it, 1, destinationType, destinationName, messageId, "receive", span(0), producerSpanData)
}
}

Expand All @@ -124,18 +139,24 @@ class Jms2Test extends AgentInstrumentationSpecification {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
messageRef.set(message as TextMessage)
}
}

producer.send(message)
runWithSpan("parent") {
producer.send(message)
}
lock.countDown()

expect:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
Expand All @@ -158,7 +179,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
def consumer = session.createConsumer(destination)

// Receive with timeout
TextMessage receivedMessage = consumer.receiveNoWait()
Message receivedMessage = consumer.receiveNoWait()

expect:
receivedMessage == null
Expand All @@ -179,7 +200,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
def consumer = session.createConsumer(destination)

// Receive with timeout
TextMessage receivedMessage = consumer.receive(100)
Message receivedMessage = consumer.receive(100)

expect:
receivedMessage == null
Expand All @@ -206,19 +227,25 @@ class Jms2Test extends AgentInstrumentationSpecification {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
messageRef.set(message as TextMessage)
}
}

when:
producer.send(destination, message)
runWithSpan("parent") {
producer.send(destination, message)
}
lock.countDown()

then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
}
producerSpan(it, 1, destinationType, destinationName, span(0))
consumerSpan(it, 2, destinationType, destinationName, messageRef.get().getJMSMessageID(), "process", span(1))
}
}
// This check needs to go after all traces have been accounted for
Expand All @@ -236,11 +263,15 @@ class Jms2Test extends AgentInstrumentationSpecification {
session.createTemporaryTopic() | "topic" | "(temporary)"
}

static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, SpanData parentSpan = null) {
trace.span(index) {
name destinationName + " send"
kind PRODUCER
hasNoParent()
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION" destinationName
Expand All @@ -256,14 +287,19 @@ class Jms2Test extends AgentInstrumentationSpecification {
// passing messageId = null will verify message.id is not captured,
// passing messageId = "" will verify message.id is captured (but won't verify anything about the value),
// any other value for messageId will verify that message.id is captured and has that same value
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) {
static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, String operation, SpanData parentSpan, SpanData linkedSpan = null) {
trace.span(index) {
name destinationName + " " + operation
kind CONSUMER
if (parentOrLinkedSpan != null) {
childOf((SpanData) parentOrLinkedSpan)
} else {
if (parentSpan == null) {
hasNoParent()
} else {
childOf(parentSpan)
}
if (linkedSpan == null) {
hasNoLinks()
} else {
hasLink(linkedSpan)
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.opentelemetry.javaagent.instrumentation.jms.JmsSingletons.consumerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
Expand Down Expand Up @@ -37,10 +38,16 @@ public ElementMatcher<TypeDescription> typeMatcher() {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("receive").and(takesArguments(0).or(takesArguments(1))).and(isPublic()),
named("receive")
.and(takesArguments(0).or(takesArguments(1)))
.and(returns(named("javax.jms.Message")))
.and(isPublic()),
JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice");
transformer.applyAdviceToMethod(
named("receiveNoWait").and(takesArguments(0)).and(isPublic()),
named("receiveNoWait")
.and(takesArguments(0))
.and(returns(named("javax.jms.Message")))
.and(isPublic()),
JmsMessageConsumerInstrumentation.class.getName() + "$ConsumerAdvice");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;

public final class JmsSingletons {
Expand Down Expand Up @@ -47,6 +48,10 @@ private static Instrumenter<MessageWithDestination, Void> buildConsumerInstrumen
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation))
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(
GlobalOpenTelemetry.getPropagators().getTextMapPropagator(),
MessagePropertyGetter.INSTANCE))
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

Expand Down
Loading