Description
Hey,
I have been mucking around with getting XRay support for my message listener library and I use ElasticMQ for my local development.
I am using the AWS XRay SDK V2 Instrumentor to automatically populate the trace information into the request; more specifically, this is done by the TracingInterceptor. This interceptor automatically injects itself into the default SqsAsyncClient that is built.
While using ElasticMQ as my mock SQS server, the AWSTraceHeader message system attribute is not returned in my SQS message. However, when I switched to an SQS queue running on AWS, the trace header was present.
It isn't a big deal as I can test this using an actual AWS queue, but I wanted to document this in case someone else has the same problem. I have attached two minimal reproducible examples: one showing it not working with ElasticMQ and the other showing it working with an actual SQS queue on AWS.
Using ElasticMQ
This shows that the XRay trace header is not present in the message received from ElasticMQ. Note that the trace header is automatically included via the com.amazonaws:aws-xray-recorder-sdk-aws-sdk-v2-instrumentor:2.6.1 dependency.
Dependencies
dependencies {
    implementation(platform("software.amazon.awssdk:bom:2.13.7"))
    implementation("software.amazon.awssdk:sqs")
    implementation("org.slf4j:slf4j-api:1.7.30")
    implementation("ch.qos.logback:logback-core:1.2.3")
    implementation("ch.qos.logback:logback-classic:1.2.3")
    implementation("com.amazonaws:aws-xray-recorder-sdk-aws-sdk-v2-instrumentor:2.6.1")
    implementation("org.elasticmq:elasticmq-rest-sqs_2.12:0.15.6")
}
Java Main
package com.minimal;

import akka.http.scaladsl.Http;
import com.amazonaws.xray.AWSXRay;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

import java.net.URI;
import java.util.List;

public class MinReproducableApplication {
    private static final Logger log = LoggerFactory.getLogger(MinReproducableApplication.class);

    public static void main(String[] args) throws Exception {
        SQSRestServer sqsRestServer = SQSRestServerBuilder
                .withInterface("localhost")
                .withDynamicPort()
                .start();
        Http.ServerBinding serverBinding = sqsRestServer.waitUntilStarted();
        String serverUrl = "http://localhost:" + serverBinding.localAddress().getPort();

        SqsAsyncClient client = SqsAsyncClient.builder()
                .endpointOverride(new URI(serverUrl))
                .region(Region.of("elasticmq"))
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("accessKeyId", "secretAccessKey")))
                .build();

        AWSXRay.beginSegment("some-segment");
        String queueUrl = client.createQueue(builder -> builder.queueName("test-queue"))
                .thenApply(CreateQueueResponse::queueUrl)
                .get();
        client.sendMessage(builder -> builder.queueUrl(queueUrl).messageBody("some message")).get();
        List<Message> messages = client.receiveMessage(builder -> builder
                .visibilityTimeout(30)
                .queueUrl(queueUrl)
                .attributeNames(QueueAttributeName.ALL)
                .messageAttributeNames(QueueAttributeName.ALL.toString())
                .maxNumberOfMessages(1)
                .waitTimeSeconds(20)
        ).get().messages();
        AWSXRay.endSegment();

        log.info("Message body: {}, Tracer Header: {}", messages.get(0).body(), messages.get(0).attributes().get(MessageSystemAttributeName.AWS_TRACE_HEADER));
        sqsRestServer.stopAndWait();
    }
}
Output
17:13:54.620 [main] INFO com.minimal.MinReproducableApplication - Message body: some message, Tracer Header: null
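Until the attribute is returned by the mock server, code that reads the header has to tolerate it being absent. The following is only a minimal sketch of a null-safe lookup. It assumes the system attributes have already been copied into a plain Map keyed by attribute name; the class and method names (TraceHeaderLookup, traceHeader) are made up for illustration and are not part of the AWS SDK or ElasticMQ.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class TraceHeaderLookup {
    // Name of the SQS message system attribute that carries the X-Ray trace header.
    public static final String AWS_TRACE_HEADER = "AWSTraceHeader";

    // Returns the trace header if the server supplied it, otherwise an empty Optional.
    // The Map<String, String> parameter is an assumption made to keep the sketch
    // self-contained; the SDK itself keys this map by MessageSystemAttributeName.
    public static Optional<String> traceHeader(Map<String, String> systemAttributes) {
        return Optional.ofNullable(systemAttributes)
                .map(attributes -> attributes.get(AWS_TRACE_HEADER));
    }

    public static void main(String[] args) {
        // Simulates a message received from a server that does not return the attribute.
        Map<String, String> withoutHeader = Collections.emptyMap();
        System.out.println("Header present: " + traceHeader(withoutHeader).isPresent());
    }
}
```

With a lookup like this, the listener can fall back to starting a fresh segment when the header is missing instead of failing on a null value.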
Other information
It looks like the header is being sent in the request as this is the information being sent to the ElasticMQ server (note the trace ID at the bottom):
17:28:47.716 [aws-java-sdk-NettyEventLoop-1-8] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x795e1038, L:/127.0.0.1:57337 - R:localhost/127.0.0.1:57272] WRITE: software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor$StreamedRequest(DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
POST / HTTP/1.1
Host: localhost:57272
amz-sdk-invocation-id: 9a3007f8-13cf-6e00-adcd-f979d98d3c31
amz-sdk-retry: 0/0/
Authorization: AWS4-HMAC-SHA256 Credential=accessKeyId/20200709/elasticmq/sqs/aws4_request, SignedHeaders=amz-sdk-invocation-id;amz-sdk-retry;content-length;content-type;host;x-amz-date, Signature=e6ee8c28f4bb4e229b2a01876ef6c73af52b1d4017c976e54fd03ac49db4281c
Content-Length: 123
Content-Type: application/x-www-form-urlencoded; charset=utf-8
User-Agent: aws-sdk-java/2.13.7 Windows_10/10.0 Java_HotSpot_TM__64-Bit_Server_VM/25.161-b12 Java/1.8.0_161 scala/2.12.11 vendor/Oracle_Corporation io/async http/NettyNio
X-Amz-Date: 20200709T222847Z
X-Amzn-Trace-Id: Root=1-5f079a08-08c0c38edf05408e0c2a49ce;Parent=3b4e01bcf499fd34;Sampled=1)
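For anyone comparing the header sent in the request with what comes back on the message, the value is a semicolon-separated list of key=value pairs. Below is a rough sketch of splitting it into its fields using plain Java. The class name TraceHeaderFields and its parse method are hypothetical helpers written for this issue, not something from the X-Ray SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TraceHeaderFields {

    // Splits a value such as "Root=...;Parent=...;Sampled=1" into its key/value
    // pairs, preserving the order in which they appear. Parts without an '=' are skipped.
    public static Map<String, String> parse(String header) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (header == null) {
            return fields;
        }
        for (String part : header.split(";")) {
            int separator = part.indexOf('=');
            if (separator > 0) {
                fields.put(part.substring(0, separator).trim(), part.substring(separator + 1).trim());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Header value taken from the request log above.
        String header = "Root=1-5f079a08-08c0c38edf05408e0c2a49ce;Parent=3b4e01bcf499fd34;Sampled=1";
        System.out.println(parse(header));
    }
}
```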
Example showing XRay working with AWS
This shows that the XRay trace header is included in an SQS message when using an actual AWS SQS queue. Replace the variables with your own IAM access key ID, IAM secret key, AWS queue URL and AWS region.
Dependencies
dependencies {
    implementation(platform("software.amazon.awssdk:bom:2.13.7"))
    implementation("software.amazon.awssdk:sqs")
    implementation("org.slf4j:slf4j-api:1.7.30")
    implementation("ch.qos.logback:logback-core:1.2.3")
    implementation("ch.qos.logback:logback-classic:1.2.3")
    implementation("com.amazonaws:aws-xray-recorder-sdk-aws-sdk-v2-instrumentor:2.6.1")
}
Java Main
package com.minimal;

import com.amazonaws.xray.AWSXRay;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

import java.util.List;

public class MinReproducableApplication {
    private static final Logger log = LoggerFactory.getLogger(MinReproducableApplication.class);

    public static void main(String[] args) throws Exception {
        String region = "{insertRegionHere}";
        String accessKey = "{insertAccessKeyHere}";
        String secretKey = "{insertSecretKeyHere}";
        String queueUrl = "{insertQueueUrlHere}";

        SqsAsyncClient client = SqsAsyncClient.builder()
                .region(Region.of(region))
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)))
                .build();

        AWSXRay.beginSegment("some-segment");
        client.sendMessage(builder -> builder.queueUrl(queueUrl).messageBody("some message")).get();
        List<Message> messages = client.receiveMessage(builder -> builder
                .visibilityTimeout(30)
                .queueUrl(queueUrl)
                .attributeNames(QueueAttributeName.ALL)
                .messageAttributeNames(QueueAttributeName.ALL.toString())
                .maxNumberOfMessages(1)
                .waitTimeSeconds(20)
        ).get().messages();
        AWSXRay.endSegment();

        log.info("Message body: {}, Tracer Header: {}", messages.get(0).body(), messages.get(0).attributes().get(MessageSystemAttributeName.AWS_TRACE_HEADER));
    }
}
Output
17:20:01.497 [main] INFO com.minimal.MinReproducableApplication - Message body: some message, Tracer Header: Root=1-5f07980f-7e310a9e3becb5a6386ce0e6;Parent=3ee6c885d5b1663b;Sampled=1