Skip to content

Commit 5c1ff94

Browse files
authored
contrib/confluentinc/confluent-kafka-go/kafka: Use kafka headers as context when producing (#943)
Before this patch when producing a message, the general producer context was used when producing messages. This made it impossible to attach a parent span to a produced message. With this patch messages headers are checked when producing messages for a span context and the span is created as a child if a parent span is present. This patch deprecates the WithContext() option for consumer and producers. Fixes #941
1 parent 5f6503d commit 5c1ff94

File tree

4 files changed

+162
-0
lines changed

4 files changed

+162
-0
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016 Datadog, Inc.
5+
6+
package kafka_test
7+
8+
import (
9+
"fmt"
10+
11+
"github.com/confluentinc/confluent-kafka-go/kafka"
12+
kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
13+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
14+
)
15+
16+
var (
17+
testGroupID = "gotest"
18+
testTopic = "gotest"
19+
)
20+
21+
// This example shows how a span context can be passed from a producer to a consumer.
22+
func ExampleDistributedTracing() {
23+
24+
tracer.Start()
25+
defer tracer.Stop()
26+
27+
c, err := kafkatrace.NewConsumer(&kafka.ConfigMap{
28+
"go.events.channel.enable": true, // required for the events channel to be turned on
29+
"group.id": testGroupID,
30+
"socket.timeout.ms": 10,
31+
"session.timeout.ms": 10,
32+
"enable.auto.offset.store": false,
33+
})
34+
35+
err = c.Subscribe(testTopic, nil)
36+
if err != nil {
37+
panic(err)
38+
}
39+
40+
// Create the span to be passed
41+
parentSpan := tracer.StartSpan("test_parent_span")
42+
43+
/// Produce a message with a span
44+
go func() {
45+
msg := &kafka.Message{
46+
TopicPartition: kafka.TopicPartition{
47+
Topic: &testTopic,
48+
Partition: 1,
49+
Offset: 1,
50+
},
51+
Key: []byte("key1"),
52+
Value: []byte("value1"),
53+
}
54+
55+
// Inject the span context in the message to be produced
56+
carrier := kafkatrace.NewMessageCarrier(msg)
57+
tracer.Inject(parentSpan.Context(), carrier)
58+
59+
c.Consumer.Events() <- msg
60+
61+
}()
62+
63+
msg := (<-c.Events()).(*kafka.Message)
64+
65+
// Extract the context from the message
66+
carrier := kafkatrace.NewMessageCarrier(msg)
67+
spanContext, err := tracer.Extract(carrier)
68+
if err != nil {
69+
panic(err)
70+
}
71+
72+
parentContext := parentSpan.Context()
73+
74+
// Validate that the context passed is the context sent via the message
75+
if spanContext.TraceID() == parentContext.TraceID() {
76+
fmt.Println("Span context passed sucessfully from producer to consumer")
77+
} else {
78+
fmt.Println("Span context not passed")
79+
}
80+
81+
c.Close()
82+
// wait for the events channel to be closed
83+
<-c.Events()
84+
// Output: Span context passed sucessfully from producer to consumer
85+
}

contrib/confluentinc/confluent-kafka-go/kafka/kafka.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,12 @@ func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
205205
if !math.IsNaN(p.cfg.analyticsRate) {
206206
opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate))
207207
}
208+
//if there's a span context in the headers, use that as the parent
208209
carrier := NewMessageCarrier(msg)
210+
if spanctx, err := tracer.Extract(carrier); err == nil {
211+
opts = append(opts, tracer.ChildOf(spanctx))
212+
}
213+
209214
span, _ := tracer.StartSpanFromContext(p.cfg.ctx, "kafka.produce", opts...)
210215
// inject the span context so consumers can pick it up
211216
tracer.Inject(span.Context(), carrier)

contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
package kafka
77

88
import (
9+
"context"
910
"errors"
1011
"os"
1112
"testing"
1213
"time"
1314

1415
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
1516
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
17+
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
1618

1719
"github.com/confluentinc/confluent-kafka-go/kafka"
1820

@@ -207,3 +209,71 @@ func TestConsumerFunctional(t *testing.T) {
207209
})
208210
}
209211
}
212+
213+
// This tests the deprecated behavior of using cfg.context as the context passed via kafka messages
214+
// instead of the one passed in the message.
215+
func TestDeprecatedContext(t *testing.T) {
216+
if _, ok := os.LookupEnv("INTEGRATION"); !ok {
217+
t.Skip("to enable integration test, set the INTEGRATION environment variable")
218+
}
219+
220+
tracer.Start()
221+
defer tracer.Stop()
222+
223+
// Create the span to be passed
224+
parentSpan, ctx := tracer.StartSpanFromContext(context.Background(), "test_parent_context")
225+
226+
c, err := NewConsumer(&kafka.ConfigMap{
227+
"go.events.channel.enable": true, // required for the events channel to be turned on
228+
"group.id": testGroupID,
229+
"socket.timeout.ms": 10,
230+
"session.timeout.ms": 10,
231+
"enable.auto.offset.store": false,
232+
}, WithContext(ctx)) // Adds the parent context containing a span
233+
234+
err = c.Subscribe(testTopic, nil)
235+
assert.NoError(t, err)
236+
237+
// This span context will be ignored
238+
messageSpan, _ := tracer.StartSpanFromContext(context.Background(), "test_context_from_message")
239+
messageSpanContext := messageSpan.Context()
240+
241+
/// Produce a message with a span
242+
go func() {
243+
msg := &kafka.Message{
244+
TopicPartition: kafka.TopicPartition{
245+
Topic: &testTopic,
246+
Partition: 1,
247+
Offset: 1,
248+
},
249+
Key: []byte("key1"),
250+
Value: []byte("value1"),
251+
}
252+
253+
// Inject the span context in the message to be produced
254+
carrier := NewMessageCarrier(msg)
255+
tracer.Inject(messageSpan.Context(), carrier)
256+
257+
c.Consumer.Events() <- msg
258+
259+
}()
260+
261+
msg := (<-c.Events()).(*kafka.Message)
262+
263+
// Extract the context from the message
264+
carrier := NewMessageCarrier(msg)
265+
spanContext, err := tracer.Extract(carrier)
266+
assert.NoError(t, err)
267+
268+
parentContext := parentSpan.Context()
269+
270+
/// The context passed is the one from the parent context
271+
assert.EqualValues(t, parentContext.TraceID(), spanContext.TraceID())
272+
/// The context passed is not the one passed in the message
273+
assert.NotEqualValues(t, messageSpanContext.TraceID(), spanContext.TraceID())
274+
275+
c.Close()
276+
// wait for the events channel to be closed
277+
<-c.Events()
278+
279+
}

contrib/confluentinc/confluent-kafka-go/kafka/option.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ func newConfig(opts ...Option) *config {
4444
}
4545

4646
// WithContext sets the config context to ctx.
47+
// Deprecated: This is deprecated in favor of passing the context
48+
// via the message headers
4749
func WithContext(ctx context.Context) Option {
4850
return func(cfg *config) {
4951
cfg.ctx = ctx

0 commit comments

Comments
 (0)