Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
<suppress checks="IllegalImport" files=".*[/\\]com[/\\]azure[/\\]core[/\\](metrics|tracing)[/\\]opentelemetry[/\\]*"/>
<suppress checks="IllegalImport" files=".*[/\\]com[/\\]azure[/\\]monitor[/\\]opentelemetry[/\\]exporter[/\\]*"/>
<suppress checks="IllegalImport" files=".*[/\\]com[/\\]azure[/\\]identity[/\\]*"/>
<suppress checks="IllegalImport" files="com.azure.messaging.servicebus.TracingIntegrationTests.java"/>

<!-- Suppress warnings for Event Processor until the usage of "Client" is discussed and resolved:
https://github.com/Azure/azure-sdk/issues/321 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ public String getCallConnectionId() {
*
* @return the mediaSubscriptionId value.
*/
public String getMediaSubscriptionId() {
public String getMediaSubscriptionId() {
return mediaSubscriptionId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ private SpanBuilder createSpanBuilder(String spanName,
SpanBuilder spanBuilder = tracer.spanBuilder(spanNameKey)
.setSpanKind(spanKind);

io.opentelemetry.context.Context parentContext = getTraceContextOrDefault(context, io.opentelemetry.context.Context.current());
io.opentelemetry.context.Context parentContext = getTraceContextOrDefault(context, io.opentelemetry.context.Context.current());
// if remote parent is provided, it has higher priority
if (remoteParentContext != null) {
spanBuilder.setParent(parentContext.with(Span.wrap(remoteParentContext)));
Expand Down
21 changes: 21 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,27 @@
<version>4.5.1</version> <!-- {x-version-update;org.mockito:mockito-core;external_dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-tracing-opentelemetry</artifactId>
<version>1.0.0-beta.28</version> <!-- {x-version-update;com.azure:azure-core-tracing-opentelemetry;dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.14.0</version> <!-- {x-version-update;io.opentelemetry:opentelemetry-api;external_dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>1.14.0</version> <!-- {x-version-update;io.opentelemetry:opentelemetry-sdk;external_dependency} -->
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.util.Context;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiverTracer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;

import java.util.Objects;

/**
* Flux operator that traces receive and process calls
*/
final class FluxTrace extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
static final String PROCESS_ERROR_KEY = "process-error";
private final ServiceBusReceiverTracer tracer;

FluxTrace(Flux<? extends ServiceBusMessageContext> upstream, ServiceBusReceiverTracer tracer) {
super(upstream);
this.tracer = tracer;
}

@Override
public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");

source.subscribe(new TracingSubscriber(coreSubscriber, tracer));
}

private static class TracingSubscriber extends BaseSubscriber<ServiceBusMessageContext> {

private final CoreSubscriber<? super ServiceBusMessageContext> downstream;
private final ServiceBusReceiverTracer tracer;
TracingSubscriber(CoreSubscriber<? super ServiceBusMessageContext> downstream, ServiceBusReceiverTracer tracer) {
this.downstream = downstream;
this.tracer = tracer;
}

@Override
public reactor.util.context.Context currentContext() {
return downstream.currentContext();
}

@Override
protected void hookOnSubscribe(Subscription subscription) {
downstream.onSubscribe(this);
}

@Override
protected void hookOnNext(ServiceBusMessageContext message) {
if (tracer == null || tracer.isSync()) {
downstream.onNext(message);
return;
}

Throwable exception = null;
Context span = tracer.startProcessSpan("ServiceBus.process", message.getMessage(), Context.NONE);
AutoCloseable scope = tracer.makeSpanCurrent(span);

try {
downstream.onNext(message);
} catch (Throwable t) {
exception = t;
} finally {
Context context = message.getMessage().getContext();
if (context != null) {
Object processorException = context.getData(PROCESS_ERROR_KEY).orElse(null);
if (processorException instanceof Throwable) {
exception = (Throwable) processorException;
}
}
tracer.endSpan(exception, span, scope);
}
}

@Override
protected void hookOnError(Throwable throwable) {
downstream.onError(throwable);
}

@Override
protected void hookOnComplete() {
downstream.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.amqp.models.CbsAuthorizationType;
import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.annotation.ServiceClientProtocol;
Expand All @@ -34,14 +33,15 @@
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiverTracer;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential;
import com.azure.messaging.servicebus.implementation.ServiceBusTracer;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
Expand All @@ -59,7 +59,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand Down Expand Up @@ -212,7 +211,6 @@ public final class ServiceBusClientBuilder implements

private final Object connectionLock = new Object();
private final MessageSerializer messageSerializer = new ServiceBusMessageSerializer();
private final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));

private ClientOptions clientOptions;
private Configuration configuration;
Expand Down Expand Up @@ -963,8 +961,9 @@ public ServiceBusSenderAsyncClient buildAsyncClient() {
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusSenderTracer tracer = new ServiceBusSenderTracer(ServiceBusTracer.getDefaultTracer(), connectionProcessor.getFullyQualifiedNamespace(), entityName);
return new ServiceBusSenderAsyncClient(entityName, entityType, connectionProcessor, retryOptions,
tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier);
tracer, messageSerializer, ServiceBusClientBuilder.this::onClientClose, null, clientIdentifier);
}

/**
Expand Down Expand Up @@ -1047,8 +1046,7 @@ public final class ServiceBusSessionProcessorClientBuilder {
private ServiceBusSessionProcessorClientBuilder() {
sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder();
processorClientOptions = new ServiceBusProcessorClientOptions()
.setMaxConcurrentCalls(1)
.setTracerProvider(tracerProvider);
.setMaxConcurrentCalls(1);
sessionReceiverClientBuilder.maxConcurrentSessions(1);
}

Expand Down Expand Up @@ -1444,11 +1442,11 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
}

final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
connectionProcessor, tracerProvider, messageSerializer, receiverOptions, clientIdentifier);

connectionProcessor, messageSerializer, receiverOptions, clientIdentifier);
final ServiceBusReceiverTracer tracer = new ServiceBusReceiverTracer(ServiceBusTracer.getDefaultTracer(), connectionProcessor.getFullyQualifiedNamespace(), entityPath, false);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager);
tracer, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager);
}

/**
Expand All @@ -1466,7 +1464,7 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
return buildAsyncClient(true);
return buildAsyncClient(true, false);
}

/**
Expand All @@ -1484,12 +1482,12 @@ public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
*/
public ServiceBusSessionReceiverClient buildClient() {
final boolean isPrefetchDisabled = prefetchCount == 0;
return new ServiceBusSessionReceiverClient(buildAsyncClient(false),
return new ServiceBusSessionReceiverClient(buildAsyncClient(false, true),
isPrefetchDisabled,
MessageUtils.getTotalTimeout(retryOptions));
}

private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, boolean syncConsumer) {
final MessagingEntityType entityType = validateEntityPaths(connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(entityType, queueName, topicName, subscriptionName,
Expand Down Expand Up @@ -1520,8 +1518,10 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusReceiverTracer tracer = new ServiceBusReceiverTracer(ServiceBusTracer.getDefaultTracer(),
connectionProcessor.getFullyQualifiedNamespace(), entityPath, syncConsumer);
return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer,
entityPath, entityType, receiverOptions, connectionProcessor, tracer, messageSerializer,
ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
}
}
Expand Down Expand Up @@ -1582,8 +1582,7 @@ public final class ServiceBusProcessorClientBuilder {
private ServiceBusProcessorClientBuilder() {
serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder();
processorClientOptions = new ServiceBusProcessorClientOptions()
.setMaxConcurrentCalls(1)
.setTracerProvider(tracerProvider);
.setMaxConcurrentCalls(1);
}

/**
Expand Down Expand Up @@ -1908,7 +1907,7 @@ public ServiceBusReceiverClientBuilder topicName(String topicName) {
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverAsyncClient buildAsyncClient() {
return buildAsyncClient(true);
return buildAsyncClient(true, false);
}

/**
Expand All @@ -1926,12 +1925,12 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
*/
public ServiceBusReceiverClient buildClient() {
final boolean isPrefetchDisabled = prefetchCount == 0;
return new ServiceBusReceiverClient(buildAsyncClient(false),
return new ServiceBusReceiverClient(buildAsyncClient(false, true),
isPrefetchDisabled,
MessageUtils.getTotalTimeout(retryOptions));
}

ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, boolean syncConsumer) {
final MessagingEntityType entityType = validateEntityPaths(connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(entityType, queueName, topicName, subscriptionName,
Expand Down Expand Up @@ -1962,9 +1961,11 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
clientIdentifier = UUID.randomUUID().toString();
}

final ServiceBusReceiverTracer tracer = new ServiceBusReceiverTracer(ServiceBusTracer.getDefaultTracer(),
connectionProcessor.getFullyQualifiedNamespace(), entityPath, syncConsumer);
return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
tracer, messageSerializer, ServiceBusClientBuilder.this::onClientClose, clientIdentifier);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.message.Message;

Expand All @@ -17,7 +16,6 @@
import java.util.Locale;
import java.util.Objects;

import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan;

/**
* A class for aggregating {@link ServiceBusMessage messages} into a single, size-limited, batch. It is treated as a
Expand All @@ -31,19 +29,19 @@ public final class ServiceBusMessageBatch {
private final List<ServiceBusMessage> serviceBusMessageList;
private final byte[] eventBytes;
private int sizeInBytes;
private final TracerProvider tracerProvider;
private final ServiceBusSenderTracer tracer;
private final String entityPath;
private final String hostname;

ServiceBusMessageBatch(int maxMessageSize, ErrorContextProvider contextProvider, TracerProvider tracerProvider,
ServiceBusMessageBatch(int maxMessageSize, ErrorContextProvider contextProvider, ServiceBusSenderTracer tracer,
MessageSerializer serializer, String entityPath, String hostname) {
this.maxMessageSize = maxMessageSize;
this.contextProvider = contextProvider;
this.serializer = serializer;
this.serviceBusMessageList = new ArrayList<>();
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.eventBytes = new byte[maxMessageSize];
this.tracerProvider = tracerProvider;
this.tracer = tracer;
this.entityPath = entityPath;
this.hostname = hostname;
}
Expand Down Expand Up @@ -94,15 +92,11 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
if (serviceBusMessage == null) {
throw LOGGER.logExceptionAsWarning(new NullPointerException("'serviceBusMessage' cannot be null"));
}
ServiceBusMessage serviceBusMessageUpdated =
tracerProvider.isEnabled()
? traceMessageSpan(serviceBusMessage, serviceBusMessage.getContext(), hostname, entityPath,
tracerProvider)
: serviceBusMessage;
tracer.createMessageSpan(serviceBusMessage);

final int size;
try {
size = getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty());
size = getSize(serviceBusMessage, serviceBusMessageList.isEmpty());
} catch (BufferOverflowException exception) {
final RuntimeException ex = new ServiceBusException(
new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
Expand All @@ -117,7 +111,7 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
}

this.sizeInBytes += size;
this.serviceBusMessageList.add(serviceBusMessageUpdated);
this.serviceBusMessageList.add(serviceBusMessage);
return true;
}

Expand Down
Loading