Skip to content

Commit b3cd456

Browse files
aaron-aitrask
andauthored
Implement consumer part of rocketmq new client instrumentation (#7019)
Fixes #6764 , This PR is about the consumer part. Co-authored-by: Trask Stalnaker <[email protected]>
1 parent 87c7147 commit b3cd456

23 files changed

+1048
-93
lines changed

instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,23 @@ dependencies {
1616

1717
testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing"))
1818
}
19+
20+
tasks {
21+
val testReceiveSpanDisabled by registering(Test::class) {
22+
filter {
23+
includeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
24+
}
25+
include("**/RocketMqClientSuppressReceiveSpanTest.*")
26+
}
27+
28+
test {
29+
filter {
30+
excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
31+
}
32+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
33+
}
34+
35+
check {
36+
dependsOn(testReceiveSpanDisabled)
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
9+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
10+
import static net.bytebuddy.matcher.ElementMatchers.named;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
12+
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
15+
import net.bytebuddy.asm.Advice;
16+
import net.bytebuddy.description.type.TypeDescription;
17+
import net.bytebuddy.matcher.ElementMatcher;
18+
import org.apache.rocketmq.client.apis.consumer.MessageListener;
19+
20+
final class ConsumeServiceInstrumentation implements TypeInstrumentation {
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
transformer.applyAdviceToMethod(
29+
isConstructor()
30+
.and(
31+
isPublic()
32+
.and(
33+
takesArgument(
34+
1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))),
35+
ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice");
36+
}
37+
38+
@SuppressWarnings("unused")
39+
public static class ConstructorAdvice {
40+
@Advice.OnMethodEnter(suppress = Throwable.class)
41+
public static void onEnter(
42+
@Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) {
43+
// Replace messageListener by wrapper.
44+
if (!(messageListener instanceof MessageListenerWrapper)) {
45+
messageListener = new MessageListenerWrapper(messageListener);
46+
}
47+
}
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
12+
13+
import apache.rocketmq.v2.ReceiveMessageRequest;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
15+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
16+
import net.bytebuddy.asm.Advice;
17+
import net.bytebuddy.description.type.TypeDescription;
18+
import net.bytebuddy.matcher.ElementMatcher;
19+
import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult;
20+
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
21+
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
22+
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
23+
24+
final class ConsumerImplInstrumentation implements TypeInstrumentation {
25+
@Override
26+
public ElementMatcher<TypeDescription> typeMatcher() {
27+
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl");
28+
}
29+
30+
@Override
31+
public void transform(TypeTransformer transformer) {
32+
transformer.applyAdviceToMethod(
33+
isMethod()
34+
.and(named("receiveMessage"))
35+
.and(takesArguments(3))
36+
.and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest")))
37+
.and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl")))
38+
.and(takesArgument(2, named("java.time.Duration"))),
39+
ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice");
40+
}
41+
42+
@SuppressWarnings("unused")
43+
public static class ReceiveMessageAdvice {
44+
45+
@Advice.OnMethodEnter(suppress = Throwable.class)
46+
public static Timer onStart() {
47+
return Timer.start();
48+
}
49+
50+
@Advice.OnMethodExit(suppress = Throwable.class)
51+
public static void onExit(
52+
@Advice.Argument(0) ReceiveMessageRequest request,
53+
@Advice.Enter Timer timer,
54+
@Advice.Return ListenableFuture<ReceiveMessageResult> future) {
55+
ReceiveSpanFinishingCallback spanFinishingCallback =
56+
new ReceiveSpanFinishingCallback(request, timer);
57+
Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor());
58+
}
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
11+
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
12+
import org.apache.rocketmq.client.apis.consumer.MessageListener;
13+
import org.apache.rocketmq.client.apis.message.MessageView;
14+
15+
public final class MessageListenerWrapper implements MessageListener {
16+
private final MessageListener delegator;
17+
18+
public MessageListenerWrapper(MessageListener delegator) {
19+
this.delegator = delegator;
20+
}
21+
22+
@Override
23+
public ConsumeResult consume(MessageView messageView) {
24+
Context parentContext = VirtualFieldStore.getContextByMessage(messageView);
25+
if (parentContext == null) {
26+
parentContext = Context.current();
27+
}
28+
Instrumenter<MessageView, ConsumeResult> processInstrumenter =
29+
RocketMqSingletons.consumerProcessInstrumenter();
30+
if (!processInstrumenter.shouldStart(parentContext, messageView)) {
31+
return delegator.consume(messageView);
32+
}
33+
Context context = processInstrumenter.start(parentContext, messageView);
34+
ConsumeResult consumeResult = null;
35+
Throwable error = null;
36+
try (Scope ignored = context.makeCurrent()) {
37+
consumeResult = delegator.consume(messageView);
38+
return consumeResult;
39+
} catch (Throwable t) {
40+
error = t;
41+
throw t;
42+
} finally {
43+
processInstrumenter.end(context, messageView, consumeResult, error);
44+
}
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
7+
8+
import io.opentelemetry.context.propagation.TextMapGetter;
9+
import javax.annotation.Nullable;
10+
import org.apache.rocketmq.client.apis.message.MessageView;
11+
12+
enum MessageMapGetter implements TextMapGetter<MessageView> {
13+
INSTANCE;
14+
15+
@Override
16+
public Iterable<String> keys(MessageView carrier) {
17+
return carrier.getProperties().keySet();
18+
}
19+
20+
@Nullable
21+
@Override
22+
public String get(@Nullable MessageView carrier, String key) {
23+
return carrier == null ? null : carrier.getProperties().get(key);
24+
}
25+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import javax.annotation.Nullable;
1212
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
1313

14-
enum MapSetter implements TextMapSetter<PublishingMessageImpl> {
14+
enum MessageMapSetter implements TextMapSetter<PublishingMessageImpl> {
1515
INSTANCE;
1616

1717
@Override
Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@
2222
import net.bytebuddy.matcher.ElementMatcher;
2323
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
2424
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
25-
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
2625
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
2726
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
2827
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;
2928

30-
final class RocketMqProducerInstrumentation implements TypeInstrumentation {
29+
final class ProducerImplInstrumentation implements TypeInstrumentation {
3130

3231
/** Match the implementation of RocketMQ producer. */
3332
@Override
@@ -52,7 +51,7 @@ public void transform(TypeTransformer transformer) {
5251
.and(takesArgument(3, List.class))
5352
.and(takesArgument(4, List.class))
5453
.and(takesArgument(5, int.class)),
55-
RocketMqProducerInstrumentation.class.getName() + "$SendAdvice");
54+
ProducerImplInstrumentation.class.getName() + "$SendAdvice");
5655
}
5756

5857
@SuppressWarnings("unused")
@@ -86,34 +85,9 @@ public static void onEnter(
8685
Context context = instrumenter.start(parentContext, message);
8786
Futures.addCallback(
8887
future,
89-
new SpanFinishingCallback(instrumenter, context, message),
88+
new SendSpanFinishingCallback(instrumenter, context, message),
9089
MoreExecutors.directExecutor());
9190
}
9291
}
9392
}
94-
95-
public static class SpanFinishingCallback implements FutureCallback<SendReceiptImpl> {
96-
private final Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter;
97-
private final Context context;
98-
private final PublishingMessageImpl message;
99-
100-
public SpanFinishingCallback(
101-
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter,
102-
Context context,
103-
PublishingMessageImpl message) {
104-
this.instrumenter = instrumenter;
105-
this.context = context;
106-
this.message = message;
107-
}
108-
109-
@Override
110-
public void onSuccess(SendReceiptImpl sendReceipt) {
111-
instrumenter.end(context, message, sendReceipt, null);
112-
}
113-
114-
@Override
115-
public void onFailure(Throwable t) {
116-
instrumenter.end(context, message, null, t);
117-
}
118-
}
11993
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.rocketmq.client.java.message.MessageImpl;
2626
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
2727

28-
final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation {
28+
final class PublishingMessageImplInstrumentation implements TypeInstrumentation {
2929

3030
@Override
3131
public ElementMatcher<TypeDescription> typeMatcher() {
@@ -44,10 +44,10 @@ public void transform(TypeTransformer transformer) {
4444
takesArgument(
4545
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
4646
.and(takesArgument(2, boolean.class)),
47-
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
47+
PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
4848
transformer.applyAdviceToMethod(
4949
isMethod().and(named("getProperties")).and(isPublic()),
50-
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
50+
PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
5151
}
5252

5353
@SuppressWarnings("unused")
@@ -56,7 +56,7 @@ public static class ConstructorAdvice {
5656
* The constructor of {@link PublishingMessageImpl} is always called in the same thread that
5757
* user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link
5858
* Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link
59-
* RocketMqProducerInstrumentation}.
59+
* ProducerImplInstrumentation}.
6060
*/
6161
@Advice.OnMethodExit(suppress = Throwable.class)
6262
public static void onExit(@Advice.This PublishingMessageImpl message) {
@@ -66,7 +66,7 @@ public static void onExit(@Advice.This PublishingMessageImpl message) {
6666

6767
@SuppressWarnings("unused")
6868
public static class GetPropertiesAdvice {
69-
/** Update the message properties to propagate context recorded by {@link MapSetter}. */
69+
/** Update the message properties to propagate context recorded by {@link MessageMapSetter}. */
7070
@Advice.OnMethodExit(suppress = Throwable.class)
7171
public static void onExit(
7272
@Advice.This MessageImpl messageImpl,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
7+
8+
import apache.rocketmq.v2.ReceiveMessageRequest;
9+
import io.opentelemetry.context.Context;
10+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
11+
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
12+
import java.util.List;
13+
import org.apache.rocketmq.client.apis.message.MessageView;
14+
import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult;
15+
import org.apache.rocketmq.client.java.message.MessageViewImpl;
16+
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
17+
18+
public final class ReceiveSpanFinishingCallback implements FutureCallback<ReceiveMessageResult> {
19+
20+
private final ReceiveMessageRequest request;
21+
private final Timer timer;
22+
23+
public ReceiveSpanFinishingCallback(ReceiveMessageRequest request, Timer timer) {
24+
this.request = request;
25+
this.timer = timer;
26+
}
27+
28+
@Override
29+
public void onSuccess(ReceiveMessageResult receiveMessageResult) {
30+
List<MessageViewImpl> messageViews = receiveMessageResult.getMessageViewImpls();
31+
// Don't create spans when no messages were received.
32+
if (messageViews.isEmpty()) {
33+
return;
34+
}
35+
String consumerGroup = request.getGroup().getName();
36+
for (MessageViewImpl messageView : messageViews) {
37+
VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup);
38+
}
39+
Instrumenter<ReceiveMessageRequest, List<MessageView>> receiveInstrumenter =
40+
RocketMqSingletons.consumerReceiveInstrumenter();
41+
Context parentContext = Context.current();
42+
if (receiveInstrumenter.shouldStart(parentContext, request)) {
43+
Context context =
44+
InstrumenterUtil.startAndEnd(
45+
receiveInstrumenter,
46+
parentContext,
47+
request,
48+
null,
49+
null,
50+
timer.startTime(),
51+
timer.now());
52+
for (MessageViewImpl messageView : messageViews) {
53+
VirtualFieldStore.setContextByMessage(messageView, context);
54+
}
55+
}
56+
}
57+
58+
@Override
59+
public void onFailure(Throwable throwable) {
60+
Instrumenter<ReceiveMessageRequest, List<MessageView>> receiveInstrumenter =
61+
RocketMqSingletons.consumerReceiveInstrumenter();
62+
Context parentContext = Context.current();
63+
if (receiveInstrumenter.shouldStart(parentContext, request)) {
64+
InstrumenterUtil.startAndEnd(
65+
receiveInstrumenter,
66+
parentContext,
67+
request,
68+
null,
69+
throwable,
70+
timer.startTime(),
71+
timer.now());
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)