Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,23 @@ dependencies {

testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing"))
}

tasks {
val testReceiveSpanDisabled by registering(Test::class) {
filter {
includeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
}
include("**/RocketMqClientSuppressReceiveSpanTest.*")
}

test {
filter {
excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

check {
dependsOn(testReceiveSpanDisabled)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.apis.consumer.MessageListener;

final class ConsumeServiceInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// Instrument ConsumerService instead of MessageListener because lambda could not be enhanced.
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(
isPublic()
.and(takesArguments(5))
.and(
takesArgument(
1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))),
ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) {
// Replace messageListener by wrapper.
if (!(messageListener instanceof MessageListenerWrapper)) {
messageListener = new MessageListenerWrapper(messageListener);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;

final class ConsumerImplInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("receiveMessage"))
.and(takesArguments(3))
.and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest")))
.and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl")))
.and(takesArgument(2, named("java.time.Duration"))),
ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice");
}

@SuppressWarnings("unused")
public static class ReceiveMessageAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static Timer onStart() {
return Timer.start();
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) ReceiveMessageRequest request,
@Advice.Enter Timer timer,
@Advice.Return ListenableFuture<ReceiveMessageResult> future) {
ReceiveSpanFinishingCallback spanFinishingCallback =
new ReceiveSpanFinishingCallback(request, timer);
Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.message.MessageView;

public final class MessageListenerWrapper implements MessageListener {
private final MessageListener delegator;

public MessageListenerWrapper(MessageListener delegator) {
this.delegator = delegator;
}

@Override
public ConsumeResult consume(MessageView messageView) {
Context parentContext = VirtualFieldStore.getContextByMessage(messageView);
if (parentContext == null) {
parentContext = Context.current();
}
Instrumenter<MessageView, ConsumeResult> processInstrumenter =
RocketMqSingletons.consumerProcessInstrumenter();
if (!processInstrumenter.shouldStart(parentContext, messageView)) {
return delegator.consume(messageView);
}
Context context = processInstrumenter.start(parentContext, messageView);
try (Scope ignored = context.makeCurrent()) {
ConsumeResult consumeResult = delegator.consume(messageView);
processInstrumenter.end(context, messageView, consumeResult, null);
return consumeResult;
} catch (Throwable t) {
processInstrumenter.end(context, messageView, null, t);
throw t;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.context.propagation.TextMapGetter;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.apis.message.MessageView;

enum MessageMapGetter implements TextMapGetter<MessageView> {
INSTANCE;

@Override
public Iterable<String> keys(MessageView carrier) {
return carrier.getProperties().keySet();
}

@Nullable
@Override
public String get(@Nullable MessageView carrier, String key) {
return carrier == null ? null : carrier.getProperties().get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

enum MapSetter implements TextMapSetter<PublishingMessageImpl> {
enum MessageMapSetter implements TextMapSetter<PublishingMessageImpl> {
INSTANCE;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;

final class RocketMqProducerInstrumentation implements TypeInstrumentation {
final class ProducerImplInstrumentation implements TypeInstrumentation {

/** Match the implementation of RocketMQ producer. */
@Override
Expand All @@ -52,7 +51,7 @@ public void transform(TypeTransformer transformer) {
.and(takesArgument(3, List.class))
.and(takesArgument(4, List.class))
.and(takesArgument(5, int.class)),
RocketMqProducerInstrumentation.class.getName() + "$SendAdvice");
ProducerImplInstrumentation.class.getName() + "$SendAdvice");
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -86,34 +85,9 @@ public static void onEnter(
Context context = instrumenter.start(parentContext, message);
Futures.addCallback(
future,
new SpanFinishingCallback(instrumenter, context, message),
new SendSpanFinishingCallback(instrumenter, context, message),
MoreExecutors.directExecutor());
}
}
}

public static class SpanFinishingCallback implements FutureCallback<SendReceiptImpl> {
private final Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter;
private final Context context;
private final PublishingMessageImpl message;

public SpanFinishingCallback(
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter,
Context context,
PublishingMessageImpl message) {
this.instrumenter = instrumenter;
this.context = context;
this.message = message;
}

@Override
public void onSuccess(SendReceiptImpl sendReceipt) {
instrumenter.end(context, message, sendReceipt, null);
}

@Override
public void onFailure(Throwable t) {
instrumenter.end(context, message, null, t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.rocketmq.client.java.message.MessageImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation {
final class PublishingMessageImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
Expand All @@ -44,10 +44,10 @@ public void transform(TypeTransformer transformer) {
takesArgument(
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
.and(takesArgument(2, boolean.class)),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
transformer.applyAdviceToMethod(
isMethod().and(named("getProperties")).and(isPublic()),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
}

@SuppressWarnings("unused")
Expand All @@ -56,7 +56,7 @@ public static class ConstructorAdvice {
* The constructor of {@link PublishingMessageImpl} is always called in the same thread that
* user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link
* Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link
* RocketMqProducerInstrumentation}.
* ProducerImplInstrumentation}.
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.This PublishingMessageImpl message) {
Expand All @@ -66,7 +66,7 @@ public static void onExit(@Advice.This PublishingMessageImpl message) {

@SuppressWarnings("unused")
public static class GetPropertiesAdvice {
/** Update the message properties to propagate context recorded by {@link MapSetter}. */
/** Update the message properties to propagate context recorded by {@link MessageMapSetter}. */
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This MessageImpl messageImpl,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import java.util.List;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;

public final class ReceiveSpanFinishingCallback implements FutureCallback<ReceiveMessageResult> {

private final ReceiveMessageRequest request;
private final Timer timer;

public ReceiveSpanFinishingCallback(ReceiveMessageRequest request, Timer timer) {
this.request = request;
this.timer = timer;
}

@Override
public void onSuccess(ReceiveMessageResult receiveMessageResult) {
List<MessageViewImpl> messageViews = receiveMessageResult.getMessageViewImpls();
// Don't create spans when no messages were received.
if (messageViews.isEmpty()) {
return;
}
String consumerGroup = request.getGroup().getName();
for (MessageViewImpl messageView : messageViews) {
VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup);
}
Instrumenter<ReceiveMessageRequest, List<MessageView>> receiveInstrumenter =
RocketMqSingletons.consumerReceiveInstrumenter();
Context parentContext = Context.current();
if (receiveInstrumenter.shouldStart(parentContext, request)) {
Context context =
InstrumenterUtil.startAndEnd(
receiveInstrumenter,
parentContext,
request,
null,
null,
timer.startTime(),
timer.now());
for (MessageViewImpl messageView : messageViews) {
VirtualFieldStore.setContextByMessage(messageView, context);
}
}
}

@Override
public void onFailure(Throwable throwable) {
Instrumenter<ReceiveMessageRequest, List<MessageView>> receiveInstrumenter =
RocketMqSingletons.consumerReceiveInstrumenter();
Context parentContext = Context.current();
if (receiveInstrumenter.shouldStart(parentContext, request)) {
InstrumenterUtil.startAndEnd(
receiveInstrumenter,
parentContext,
request,
null,
throwable,
timer.startTime(),
timer.now());
}
}
}
Loading