Skip to content

Commit 96b297b

Browse files
committed
Fix issues
1 parent 86c6977 commit 96b297b

File tree

13 files changed

+351
-237
lines changed

13 files changed

+351
-237
lines changed

instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ dependencies {
2020
tasks {
2121
val testReceiveSpanDisabled by registering(Test::class) {
2222
filter {
23-
includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessageWithReceiveSpanSuppressed")
23+
includeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
2424
}
25-
include("**/RocketMqClientTest.*")
25+
include("**/RocketMqClientSuppressReceiveSpanTest.*")
2626
}
2727

2828
test {
2929
filter {
30-
includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessage")
30+
excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
3131
}
3232
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
3333
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
9+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
10+
import static net.bytebuddy.matcher.ElementMatchers.named;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
12+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
13+
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
15+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
16+
import net.bytebuddy.asm.Advice;
17+
import net.bytebuddy.description.type.TypeDescription;
18+
import net.bytebuddy.matcher.ElementMatcher;
19+
import org.apache.rocketmq.client.apis.consumer.MessageListener;
20+
21+
final class ConsumeServiceInstrumentation implements TypeInstrumentation {
22+
@Override
23+
public ElementMatcher<TypeDescription> typeMatcher() {
24+
// Instrument ConsumerService instead of MessageListener because lambda could not be enhanced.
25+
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService");
26+
}
27+
28+
@Override
29+
public void transform(TypeTransformer transformer) {
30+
transformer.applyAdviceToMethod(
31+
isConstructor()
32+
.and(
33+
isPublic()
34+
.and(takesArguments(5))
35+
.and(
36+
takesArgument(
37+
1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))),
38+
ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice");
39+
}
40+
41+
@SuppressWarnings("unused")
42+
public static class ConstructorAdvice {
43+
@Advice.OnMethodEnter(suppress = Throwable.class)
44+
public static void onEnter(
45+
@Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) {
46+
// Replace messageListener by wrapper.
47+
if (!(messageListener instanceof MessageListenerWrapper)) {
48+
messageListener = new MessageListenerWrapper(messageListener);
49+
}
50+
}
51+
}
52+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
2222
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
2323

24-
final class RocketMqConsumerInstrumentation implements TypeInstrumentation {
24+
final class ConsumerImplInstrumentation implements TypeInstrumentation {
2525
@Override
2626
public ElementMatcher<TypeDescription> typeMatcher() {
2727
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl");
@@ -36,7 +36,7 @@ public void transform(TypeTransformer transformer) {
3636
.and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest")))
3737
.and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl")))
3838
.and(takesArgument(2, named("java.time.Duration"))),
39-
RocketMqConsumerInstrumentation.class.getName() + "$ReceiveMessageAdvice");
39+
ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice");
4040
}
4141

4242
@SuppressWarnings("unused")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
7+
8+
import io.opentelemetry.context.Context;
9+
import io.opentelemetry.context.Scope;
10+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
11+
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
12+
import org.apache.rocketmq.client.apis.consumer.MessageListener;
13+
import org.apache.rocketmq.client.apis.message.MessageView;
14+
15+
public final class MessageListenerWrapper implements MessageListener {
16+
private final MessageListener delegator;
17+
18+
public MessageListenerWrapper(MessageListener delegator) {
19+
this.delegator = delegator;
20+
}
21+
22+
@Override
23+
public ConsumeResult consume(MessageView messageView) {
24+
Context parentContext = VirtualFieldStore.getContextByMessage(messageView);
25+
if (parentContext == null) {
26+
parentContext = Context.current();
27+
}
28+
Instrumenter<MessageView, ConsumeResult> processInstrumenter =
29+
RocketMqSingletons.consumerProcessInstrumenter();
30+
if (!processInstrumenter.shouldStart(parentContext, messageView)) {
31+
return delegator.consume(messageView);
32+
}
33+
Context context = processInstrumenter.start(parentContext, messageView);
34+
try (Scope ignored = context.makeCurrent()) {
35+
ConsumeResult consumeResult = delegator.consume(messageView);
36+
processInstrumenter.end(context, messageView, consumeResult, null);
37+
return consumeResult;
38+
} catch (Throwable t) {
39+
processInstrumenter.end(context, messageView, null, t);
40+
throw t;
41+
}
42+
}
43+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
2727
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;
2828

29-
final class RocketMqProducerInstrumentation implements TypeInstrumentation {
29+
final class ProducerImplInstrumentation implements TypeInstrumentation {
3030

3131
/** Match the implementation of RocketMQ producer. */
3232
@Override
@@ -51,7 +51,7 @@ public void transform(TypeTransformer transformer) {
5151
.and(takesArgument(3, List.class))
5252
.and(takesArgument(4, List.class))
5353
.and(takesArgument(5, int.class)),
54-
RocketMqProducerInstrumentation.class.getName() + "$SendAdvice");
54+
ProducerImplInstrumentation.class.getName() + "$SendAdvice");
5555
}
5656

5757
@SuppressWarnings("unused")
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.rocketmq.client.java.message.MessageImpl;
2626
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
2727

28-
final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation {
28+
final class PublishingMessageImplInstrumentation implements TypeInstrumentation {
2929

3030
@Override
3131
public ElementMatcher<TypeDescription> typeMatcher() {
@@ -44,10 +44,10 @@ public void transform(TypeTransformer transformer) {
4444
takesArgument(
4545
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
4646
.and(takesArgument(2, boolean.class)),
47-
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
47+
PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
4848
transformer.applyAdviceToMethod(
4949
isMethod().and(named("getProperties")).and(isPublic()),
50-
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
50+
PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
5151
}
5252

5353
@SuppressWarnings("unused")
@@ -56,7 +56,7 @@ public static class ConstructorAdvice {
5656
* The constructor of {@link PublishingMessageImpl} is always called in the same thread that
5757
* user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link
5858
* Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link
59-
* RocketMqProducerInstrumentation}.
59+
* ProducerImplInstrumentation}.
6060
*/
6161
@Advice.OnMethodExit(suppress = Throwable.class)
6262
public static void onExit(@Advice.This PublishingMessageImpl message) {

instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public RocketMqInstrumentationModule() {
2121
@Override
2222
public List<TypeInstrumentation> typeInstrumentations() {
2323
return asList(
24-
new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation(),
25-
new RocketMqConsumerInstrumentation(), new RocketMqMessageListenerInstrumentation());
24+
new PublishingMessageImplInstrumentation(), new ProducerImplInstrumentation(),
25+
new ConsumerImplInstrumentation(), new ConsumeServiceInstrumentation());
2626
}
2727
}

instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,7 @@ public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProduce
4343
INSTRUMENTATION_NAME,
4444
MessagingSpanNameExtractor.create(getter, operation))
4545
.addAttributesExtractor(attributesExtractor)
46-
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE)
47-
.setSpanStatusExtractor(
48-
(spanStatusBuilder, message, sendReceipt, error) -> {
49-
if (error != null) {
50-
spanStatusBuilder.setStatus(StatusCode.ERROR);
51-
}
52-
});
46+
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE);
5347
return instrumenterBuilder.buildProducerInstrumenter(MessageMapSetter.INSTANCE);
5448
}
5549

@@ -69,13 +63,7 @@ public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProduce
6963
MessagingSpanNameExtractor.create(getter, operation))
7064
.setEnabled(enabled)
7165
.addAttributesExtractor(attributesExtractor)
72-
.addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE)
73-
.setSpanStatusExtractor(
74-
(spanStatusBuilder, messageView, unused, error) -> {
75-
if (error != null) {
76-
spanStatusBuilder.setStatus(StatusCode.ERROR);
77-
}
78-
});
66+
.addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE);
7967
return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
8068
}
8169

instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java

Lines changed: 0 additions & 94 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.rocketmqclient.v5_0;
7+
8+
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
9+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
10+
import org.junit.jupiter.api.extension.RegisterExtension;
11+
12+
public class RocketMqClientSuppressReceiveSpanTest
13+
extends AbstractRocketMqClientSuppressReceiveSpanTest {
14+
@RegisterExtension
15+
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
16+
17+
@Override
18+
protected InstrumentationExtension testing() {
19+
return testing;
20+
}
21+
}

0 commit comments

Comments
 (0)