Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka_test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the license header to this file.


import (
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"
kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

var (
testGroupID = "gotest"
testTopic = "gotest"
)

// This example shows how a span context can be passed from a producer to a consumer.
func ExampleDistributedTracing() {

tracer.Start()
defer tracer.Stop()

c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{
"go.events.channel.enable": true, // required for the events channel to be turned on
"group.id": testGroupID,
"socket.timeout.ms": 10,
"session.timeout.ms": 10,
"enable.auto.offset.store": false,
})

err = c.Subscribe(testTopic, nil)
if err != nil {
panic(err)
}

// Create the span to be passed
parentSpan := tracer.StartSpan("test_parent_span")

/// Produce a message with a span
go func() {
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: 1,
Offset: 1,
},
Key: []byte("key1"),
Value: []byte("value1"),
}

// Inject the span context in the message to be produced
carrier := kafkatrace.NewMessageCarrier(msg)
tracer.Inject(parentSpan.Context(), carrier)

c.Consumer.Events() <- msg

}()

msg := (<-c.Events()).(*kafka.Message)

// Extract the context from the message
carrier := kafkatrace.NewMessageCarrier(msg)
spanContext, err := tracer.Extract(carrier)
if err != nil {
panic(err)
}

parentContext := parentSpan.Context()

// Validate that the context passed is the context sent via the message
if spanContext.TraceID() == parentContext.TraceID() {
fmt.Println("Span context passed sucessfully from producer to consumer")
} else {
fmt.Println("Span context not passed")
}

c.Close()
// wait for the events channel to be closed
<-c.Events()
// Output: Span context passed sucessfully from producer to consumer
}
5 changes: 5 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
if !math.IsNaN(p.cfg.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate))
}
//if there's a span context in the headers, use that as the parent
carrier := NewMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}

span, _ := tracer.StartSpanFromContext(p.cfg.ctx, "kafka.produce", opts...)
// inject the span context so consumers can pick it up
tracer.Inject(span.Context(), carrier)
Expand Down
70 changes: 70 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
package kafka

import (
"context"
"errors"
"os"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/confluentinc/confluent-kafka-go/kafka"

Expand Down Expand Up @@ -207,3 +209,71 @@ func TestConsumerFunctional(t *testing.T) {
})
}
}

// This tests the deprecated behavior of using cfg.context as the context passed via kafka messages
// instead of the one passed in the message.
func TestDeprecatedContext(t *testing.T) {
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
t.Skip("to enable integration test, set the INTEGRATION environment variable")
}

tracer.Start()
defer tracer.Stop()

// Create the span to be passed
parentSpan, ctx := tracer.StartSpanFromContext(context.Background(), "test_parent_context")

c, err := NewConsumer(&kafka.ConfigMap{
"go.events.channel.enable": true, // required for the events channel to be turned on
"group.id": testGroupID,
"socket.timeout.ms": 10,
"session.timeout.ms": 10,
"enable.auto.offset.store": false,
}, WithContext(ctx)) // Adds the parent context containing a span

err = c.Subscribe(testTopic, nil)
assert.NoError(t, err)

// This span context will be ignored
messageSpan, _ := tracer.StartSpanFromContext(context.Background(), "test_context_from_message")
messageSpanContext := messageSpan.Context()

/// Produce a message with a span
go func() {
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &testTopic,
Partition: 1,
Offset: 1,
},
Key: []byte("key1"),
Value: []byte("value1"),
}

// Inject the span context in the message to be produced
carrier := NewMessageCarrier(msg)
tracer.Inject(messageSpan.Context(), carrier)

c.Consumer.Events() <- msg

}()

msg := (<-c.Events()).(*kafka.Message)

// Extract the context from the message
carrier := NewMessageCarrier(msg)
spanContext, err := tracer.Extract(carrier)
assert.NoError(t, err)

parentContext := parentSpan.Context()

/// The context passed is the one from the parent context
assert.EqualValues(t, parentContext.TraceID(), spanContext.TraceID())
/// The context passed is not the one passed in the message
assert.NotEqualValues(t, messageSpanContext.TraceID(), spanContext.TraceID())

c.Close()
// wait for the events channel to be closed
<-c.Events()

}
2 changes: 2 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/kafka/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func newConfig(opts ...Option) *config {
}

// WithContext sets the config context to ctx.
// Deprecated: This is deprecated in favor of passing the context
// via the message headers
func WithContext(ctx context.Context) Option {
return func(cfg *config) {
cfg.ctx = ctx
Expand Down