From de6aa0a597185992d10f85f33b7bab66dcaa1ed0 Mon Sep 17 00:00:00 2001 From: Antoine Tremblay Date: Wed, 2 Jun 2021 13:52:28 -0400 Subject: [PATCH] contrib/confluentinc/confluent-kafka-go/kafka: Use kafka headers as context when producing Before this patch when producing a message, the general producer context was used when producing messages. This made it impossible to attach a parent span to a produced message. With this patch messages headers are checked when producing messages for a span context and the span is created as a child if a parent span is present. This patch deprecates the WithContext() option for consumer and producers. Fixed #941 --- .../confluent-kafka-go/kafka/example_test.go | 85 +++++++++++++++++++ .../confluent-kafka-go/kafka/kafka.go | 5 ++ .../confluent-kafka-go/kafka/kafka_test.go | 70 +++++++++++++++ .../confluent-kafka-go/kafka/option.go | 2 + 4 files changed, 162 insertions(+) create mode 100644 contrib/confluentinc/confluent-kafka-go/kafka/example_test.go diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/example_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/example_test.go new file mode 100644 index 0000000000..87f6e6c5e4 --- /dev/null +++ b/contrib/confluentinc/confluent-kafka-go/kafka/example_test.go @@ -0,0 +1,85 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package kafka_test + +import ( + "fmt" + + "github.com/confluentinc/confluent-kafka-go/kafka" + kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +var ( + testGroupID = "gotest" + testTopic = "gotest" +) + +// This example shows how a span context can be passed from a producer to a consumer. +func ExampleDistributedTracing() { + + tracer.Start() + defer tracer.Stop() + + c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{ + "go.events.channel.enable": true, // required for the events channel to be turned on + "group.id": testGroupID, + "socket.timeout.ms": 10, + "session.timeout.ms": 10, + "enable.auto.offset.store": false, + }) + + err = c.Subscribe(testTopic, nil) + if err != nil { + panic(err) + } + + // Create the span to be passed + parentSpan := tracer.StartSpan("test_parent_span") + + /// Produce a message with a span + go func() { + msg := &kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &testTopic, + Partition: 1, + Offset: 1, + }, + Key: []byte("key1"), + Value: []byte("value1"), + } + + // Inject the span context in the message to be produced + carrier := kafkatrace.NewMessageCarrier(msg) + tracer.Inject(parentSpan.Context(), carrier) + + c.Consumer.Events() <- msg + + }() + + msg := (<-c.Events()).(*kafka.Message) + + // Extract the context from the message + carrier := kafkatrace.NewMessageCarrier(msg) + spanContext, err := tracer.Extract(carrier) + if err != nil { + panic(err) + } + + parentContext := parentSpan.Context() + + // Validate that the context passed is the context sent via the message + if spanContext.TraceID() == parentContext.TraceID() { + fmt.Println("Span context passed sucessfully from producer to consumer") + } else { + fmt.Println("Span context not passed") + } + + c.Close() + // wait for the events channel to be closed + <-c.Events() + // Output: Span context passed sucessfully from producer to consumer +} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go index ac36a9168a..d3685a71a5 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go @@ -205,7 +205,12 @@ func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span { if !math.IsNaN(p.cfg.analyticsRate) { opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate)) } + //if there's a span context in the headers, use that as the parent carrier := NewMessageCarrier(msg) + if spanctx, err := tracer.Extract(carrier); err == nil { + opts = append(opts, tracer.ChildOf(spanctx)) + } + span, _ := tracer.StartSpanFromContext(p.cfg.ctx, "kafka.produce", opts...) // inject the span context so consumers can pick it up tracer.Inject(span.Context(), carrier) diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go index 469fa1c846..b144596af1 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go @@ -6,6 +6,7 @@ package kafka import ( + "context" "errors" "os" "testing" @@ -13,6 +14,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "github.com/confluentinc/confluent-kafka-go/kafka" @@ -207,3 +209,71 @@ func TestConsumerFunctional(t *testing.T) { }) } } + +// This tests the deprecated behavior of using cfg.context as the context passed via kafka messages +// instead of the one passed in the message. +func TestDeprecatedContext(t *testing.T) { + if _, ok := os.LookupEnv("INTEGRATION"); !ok { + t.Skip("to enable integration test, set the INTEGRATION environment variable") + } + + tracer.Start() + defer tracer.Stop() + + // Create the span to be passed + parentSpan, ctx := tracer.StartSpanFromContext(context.Background(), "test_parent_context") + + c, err := NewConsumer(&kafka.ConfigMap{ + "go.events.channel.enable": true, // required for the events channel to be turned on + "group.id": testGroupID, + "socket.timeout.ms": 10, + "session.timeout.ms": 10, + "enable.auto.offset.store": false, + }, WithContext(ctx)) // Adds the parent context containing a span + + err = c.Subscribe(testTopic, nil) + assert.NoError(t, err) + + // This span context will be ignored + messageSpan, _ := tracer.StartSpanFromContext(context.Background(), "test_context_from_message") + messageSpanContext := messageSpan.Context() + + /// Produce a message with a span + go func() { + msg := &kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &testTopic, + Partition: 1, + Offset: 1, + }, + Key: []byte("key1"), + Value: []byte("value1"), + } + + // Inject the span context in the message to be produced + carrier := NewMessageCarrier(msg) + tracer.Inject(messageSpan.Context(), carrier) + + c.Consumer.Events() <- msg + + }() + + msg := (<-c.Events()).(*kafka.Message) + + // Extract the context from the message + carrier := NewMessageCarrier(msg) + spanContext, err := tracer.Extract(carrier) + assert.NoError(t, err) + + parentContext := parentSpan.Context() + + /// The context passed is the one from the parent context + assert.EqualValues(t, parentContext.TraceID(), spanContext.TraceID()) + /// The context passed is not the one passed in the message + assert.NotEqualValues(t, messageSpanContext.TraceID(), spanContext.TraceID()) + + c.Close() + // wait for the events channel to be closed + <-c.Events() + +} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/option.go b/contrib/confluentinc/confluent-kafka-go/kafka/option.go index 2d4e310696..0e2e71914c 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/option.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/option.go @@ -44,6 +44,8 @@ func newConfig(opts ...Option) *config { } // WithContext sets the config context to ctx. +// Deprecated: This is deprecated in favor of passing the context +// via the message headers func WithContext(ctx context.Context) Option { return func(cfg *config) { cfg.ctx = ctx