Skip to content

Commit 86fcfe5

Browse files
Mateusz RzeszutekLironKS
authored andcommitted
Link RabbitMQ receive span with the producer span (open-telemetry#6808)
Similar to open-telemetry#6804, but for RabbitMQ. Also changed the span kind of the receive span to `CONSUMER`, to match the spec.
1 parent 432426d commit 86fcfe5

File tree

4 files changed

+156
-92
lines changed

4 files changed

+156
-92
lines changed

instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,7 @@ dependencies {
3232
tasks.withType<Test>().configureEach {
3333
// TODO run tests both with and without experimental span attributes
3434
jvmArgs("-Dotel.instrumentation.rabbitmq.experimental-span-attributes=true")
35+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
36+
3537
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
3638
}

instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
1919
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
2020
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
21+
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
2122
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
2223
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
2324
import java.util.ArrayList;
@@ -29,18 +30,15 @@ public final class RabbitSingletons {
2930
InstrumentationConfig.get()
3031
.getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false);
3132
private static final String instrumentationName = "io.opentelemetry.rabbitmq-2.7";
32-
private static final Instrumenter<ChannelAndMethod, Void> channelInstrumenter;
33-
private static final Instrumenter<ReceiveRequest, GetResponse> receiveInstrumenter;
34-
private static final Instrumenter<DeliveryRequest, Void> deliverInstrumenter;
33+
private static final Instrumenter<ChannelAndMethod, Void> channelInstrumenter =
34+
createChannelInstrumenter();
35+
private static final Instrumenter<ReceiveRequest, GetResponse> receiveInstrumenter =
36+
createReceiveInstrumenter();
37+
private static final Instrumenter<DeliveryRequest, Void> deliverInstrumenter =
38+
createDeliverInstrumenter();
3539
static final ContextKey<RabbitChannelAndMethodHolder> CHANNEL_AND_METHOD_CONTEXT_KEY =
3640
ContextKey.named("opentelemetry-rabbitmq-channel-and-method-context-key");
3741

38-
static {
39-
channelInstrumenter = createChannelInstrumenter();
40-
receiveInstrumenter = createReceiveInstrumenter();
41-
deliverInstrumenter = createDeliverInstrumenter();
42-
}
43-
4442
public static Instrumenter<ChannelAndMethod, Void> channelInstrumenter() {
4543
return channelInstrumenter;
4644
}
@@ -82,7 +80,12 @@ private static Instrumenter<ReceiveRequest, GetResponse> createReceiveInstrument
8280
return Instrumenter.<ReceiveRequest, GetResponse>builder(
8381
GlobalOpenTelemetry.get(), instrumentationName, ReceiveRequest::spanName)
8482
.addAttributesExtractors(extractors)
85-
.buildInstrumenter(SpanKindExtractor.alwaysClient());
83+
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
84+
.addSpanLinksExtractor(
85+
new PropagatorBasedSpanLinksExtractor<>(
86+
GlobalOpenTelemetry.getPropagators().getTextMapPropagator(),
87+
ReceiveRequestTextMapGetter.INSTANCE))
88+
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
8689
}
8790

8891
private static Instrumenter<DeliveryRequest, Void> createDeliverInstrumenter() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.rabbitmq;
7+
8+
import com.rabbitmq.client.AMQP;
9+
import com.rabbitmq.client.GetResponse;
10+
import io.opentelemetry.context.propagation.TextMapGetter;
11+
import java.util.Collections;
12+
import java.util.Map;
13+
import java.util.Optional;
14+
import javax.annotation.Nullable;
15+
16+
enum ReceiveRequestTextMapGetter implements TextMapGetter<ReceiveRequest> {
17+
INSTANCE;
18+
19+
@Override
20+
public Iterable<String> keys(ReceiveRequest carrier) {
21+
return Optional.of(carrier)
22+
.map(ReceiveRequest::getResponse)
23+
.map(GetResponse::getProps)
24+
.map(AMQP.BasicProperties::getHeaders)
25+
.map(Map::keySet)
26+
.orElse(Collections.emptySet());
27+
}
28+
29+
@Nullable
30+
@Override
31+
public String get(@Nullable ReceiveRequest carrier, String key) {
32+
return Optional.ofNullable(carrier)
33+
.map(ReceiveRequest::getResponse)
34+
.map(GetResponse::getProps)
35+
.map(AMQP.BasicProperties::getHeaders)
36+
.map(headers -> headers.get(key))
37+
.map(Object::toString)
38+
.orElse(null);
39+
}
40+
}

0 commit comments

Comments
 (0)