Skip to content

Commit 23230cc

Browse files
committed
fix test
1 parent 4184718 commit 23230cc

4 files changed

Lines changed: 85 additions & 20 deletions

File tree

exporter/kafkaexporter/converter.go

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ import (
2222
type MessageConverter interface {
2323
// ConvertMessages generate Messages ready to be sent
2424
ConvertMessages([]Message, string) []*sarama.ProducerMessage
25+
26+
// Encoding return encoding for this converter
27+
Encoding() string
2528
}
2629

2730
// OTLPProtoConverter is the converter for otlp_proto encoding
@@ -40,12 +43,39 @@ func (mp *OTLPProtoConverter) ConvertMessages(messages []Message, topic string)
4043
return producerMessages
4144
}
4245

43-
// JaegerConverter is the converter for jaeger_proto and jaeger_json encoding
44-
type JaegerConverter struct {
46+
// Encoding return encoding for otlp_proto
47+
func (mp *OTLPProtoConverter) Encoding() string {
48+
return "otlp_proto"
49+
}
50+
51+
// JaegerProtoConverter is the converter for jaeger_proto and jaeger_json encoding
52+
type JaegerProtoConverter struct {
53+
}
54+
55+
// JaegerJSONConverter is the converter for jaeger_proto and jaeger_json encoding
56+
type JaegerJSONConverter struct {
57+
}
58+
59+
// ConvertMessages converter for jaeger_proto encoding
60+
func (mp *JaegerProtoConverter) ConvertMessages(messages []Message, topic string) []*sarama.ProducerMessage {
61+
producerMessages := make([]*sarama.ProducerMessage, len(messages))
62+
for i := range messages {
63+
producerMessages[i] = &sarama.ProducerMessage{
64+
Topic: topic,
65+
Value: sarama.ByteEncoder(messages[i].Value),
66+
Key: sarama.ByteEncoder(messages[i].Key),
67+
}
68+
}
69+
return producerMessages
70+
}
71+
72+
// Encoding return encoding for jaeger_proto
73+
func (mp *JaegerProtoConverter) Encoding() string {
74+
return "jaeger_proto"
4575
}
4676

47-
// ConvertMessages converter for jaeger-ish encoding
48-
func (mp *JaegerConverter) ConvertMessages(messages []Message, topic string) []*sarama.ProducerMessage {
77+
// ConvertMessages converter for jaeger_json encoding
78+
func (mp *JaegerJSONConverter) ConvertMessages(messages []Message, topic string) []*sarama.ProducerMessage {
4979
producerMessages := make([]*sarama.ProducerMessage, len(messages))
5080
for i := range messages {
5181
producerMessages[i] = &sarama.ProducerMessage{
@@ -57,17 +87,20 @@ func (mp *JaegerConverter) ConvertMessages(messages []Message, topic string) []*
5787
return producerMessages
5888
}
5989

60-
// getConverters returns all pre-configured converters for supported encodings. For OTLP proto we use the encoding
61-
// of traces marshaler. If future change make the encoding name different between traces/metrics/logs, then this
62-
// may needs to adjust accordingly.
90+
// Encoding return encoding for jaeger_proto
91+
func (mp *JaegerJSONConverter) Encoding() string {
92+
return "jaeger_json"
93+
}
94+
95+
// getConverters returns all pre-configured converters for supported encodings
6396
func getConverters() map[string]MessageConverter {
64-
otlppb := &otlpTracesPbMarshaler{}
65-
jaegerProto := &jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}}
66-
jaegerJSON := &jaegerMarshaler{marshaler: newJaegerJSONMarshaler()}
97+
otlppb := &OTLPProtoConverter{}
98+
jaegerProto := &JaegerProtoConverter{}
99+
jaegerJSON := &JaegerJSONConverter{}
67100

68101
return map[string]MessageConverter{
69-
otlppb.Encoding(): &OTLPProtoConverter{},
70-
jaegerProto.Encoding(): &JaegerConverter{},
71-
jaegerJSON.Encoding(): &JaegerConverter{},
102+
otlppb.Encoding(): otlppb,
103+
jaegerProto.Encoding(): jaegerProto,
104+
jaegerJSON.Encoding(): jaegerJSON,
72105
}
73106
}

exporter/kafkaexporter/factory.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
5050
}
5151
}
5252

53+
// WithConverters adds converters.
54+
func WithConverters(converters ...MessageConverter) FactoryOption {
55+
return func(factory *kafkaExporterFactory) {
56+
for _, converter := range converters {
57+
factory.converters[converter.Encoding()] = converter
58+
}
59+
}
60+
}
61+
5362
// NewFactory creates Kafka exporter factory.
5463
func NewFactory(options ...FactoryOption) component.ExporterFactory {
5564
f := &kafkaExporterFactory{

exporter/kafkaexporter/factory_test.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package kafkaexporter
1616

1717
import (
1818
"context"
19+
"github.com/Shopify/sarama"
1920
"testing"
2021

2122
"github.com/stretchr/testify/assert"
@@ -41,7 +42,7 @@ func TestCreateTracesExporter(t *testing.T) {
4142
cfg.ProtocolVersion = "2.0.0"
4243
// this disables contacting the broker so we can successfully create the exporter
4344
cfg.Metadata.Full = false
44-
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()}
45+
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers(), converters: getConverters()}
4546
r, err := f.createTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
4647
require.NoError(t, err)
4748
assert.NotNil(t, r)
@@ -53,7 +54,7 @@ func TestCreateMetricsExport(t *testing.T) {
5354
cfg.ProtocolVersion = "2.0.0"
5455
// this disables contacting the broker so we can successfully create the exporter
5556
cfg.Metadata.Full = false
56-
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()}
57+
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers(), converters: getConverters()}
5758
mr, err := mf.createMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
5859
require.NoError(t, err)
5960
assert.NotNil(t, mr)
@@ -65,7 +66,7 @@ func TestCreateLogsExport(t *testing.T) {
6566
cfg.ProtocolVersion = "2.0.0"
6667
// this disables contacting the broker so we can successfully create the exporter
6768
cfg.Metadata.Full = false
68-
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()}
69+
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers(), converters: getConverters()}
6970
mr, err := mf.createLogsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
7071
require.NoError(t, err)
7172
assert.NotNil(t, mr)
@@ -75,7 +76,7 @@ func TestCreateTracesExporter_err(t *testing.T) {
7576
cfg := createDefaultConfig().(*Config)
7677
cfg.Brokers = []string{"invalid:9092"}
7778
cfg.ProtocolVersion = "2.0.0"
78-
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers()}
79+
f := kafkaExporterFactory{tracesMarshalers: tracesMarshalers(), converters: getConverters()}
7980
r, err := f.createTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
8081
// no available broker
8182
require.Error(t, err)
@@ -86,7 +87,7 @@ func TestCreateMetricsExporter_err(t *testing.T) {
8687
cfg := createDefaultConfig().(*Config)
8788
cfg.Brokers = []string{"invalid:9092"}
8889
cfg.ProtocolVersion = "2.0.0"
89-
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers()}
90+
mf := kafkaExporterFactory{metricsMarshalers: metricsMarshalers(), converters: getConverters()}
9091
mr, err := mf.createMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
9192
require.Error(t, err)
9293
assert.Nil(t, mr)
@@ -96,15 +97,16 @@ func TestCreateLogsExporter_err(t *testing.T) {
9697
cfg := createDefaultConfig().(*Config)
9798
cfg.Brokers = []string{"invalid:9092"}
9899
cfg.ProtocolVersion = "2.0.0"
99-
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers()}
100+
mf := kafkaExporterFactory{logsMarshalers: logsMarshalers(), converters: getConverters()}
100101
mr, err := mf.createLogsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
101102
require.Error(t, err)
102103
assert.Nil(t, mr)
103104
}
104105

105106
func TestWithMarshalers(t *testing.T) {
106107
cm := &customMarshaler{}
107-
f := NewFactory(WithTracesMarshalers(cm))
108+
cc := &customConverter{}
109+
f := NewFactory(WithTracesMarshalers(cm), WithConverters(cc))
108110
cfg := createDefaultConfig().(*Config)
109111
// disable contacting broker
110112
cfg.Metadata.Full = false
@@ -135,3 +137,15 @@ func (c customMarshaler) Marshal(_ pdata.Traces) ([]Message, error) {
135137
func (c customMarshaler) Encoding() string {
136138
return "custom"
137139
}
140+
141+
type customConverter struct {
142+
}
143+
144+
func (c customConverter) ConvertMessages(messages []Message, topic string) []*sarama.ProducerMessage {
145+
panic("implement me")
146+
}
147+
148+
func (c customConverter) Encoding() string {
149+
return "custom"
150+
}
151+
var _ MessageConverter = (*customConverter)(nil)

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ func TestTraceDataPusher(t *testing.T) {
125125
p := kafkaTracesProducer{
126126
producer: producer,
127127
marshaler: &otlpTracesPbMarshaler{},
128+
converter: &OTLPProtoConverter{},
128129
}
129130
t.Cleanup(func() {
130131
require.NoError(t, p.Close(context.Background()))
@@ -142,6 +143,7 @@ func TestTraceDataPusher_err(t *testing.T) {
142143
p := kafkaTracesProducer{
143144
producer: producer,
144145
marshaler: &otlpTracesPbMarshaler{},
146+
converter: &OTLPProtoConverter{},
145147
logger: zap.NewNop(),
146148
}
147149
t.Cleanup(func() {
@@ -156,6 +158,7 @@ func TestTraceDataPusher_marshal_error(t *testing.T) {
156158
expErr := fmt.Errorf("failed to marshal")
157159
p := kafkaTracesProducer{
158160
marshaler: &tracesErrorMarshaler{err: expErr},
161+
converter: &OTLPProtoConverter{},
159162
logger: zap.NewNop(),
160163
}
161164
td := testdata.GenerateTraceDataTwoSpansSameResource()
@@ -172,6 +175,7 @@ func TestMetricsDataPusher(t *testing.T) {
172175
p := kafkaMetricsProducer{
173176
producer: producer,
174177
marshaler: &otlpMetricsPbMarshaler{},
178+
converter: &OTLPProtoConverter{},
175179
}
176180
t.Cleanup(func() {
177181
require.NoError(t, p.Close(context.Background()))
@@ -189,6 +193,7 @@ func TestMetricsDataPusher_err(t *testing.T) {
189193
p := kafkaMetricsProducer{
190194
producer: producer,
191195
marshaler: &otlpMetricsPbMarshaler{},
196+
converter: &OTLPProtoConverter{},
192197
logger: zap.NewNop(),
193198
}
194199
t.Cleanup(func() {
@@ -203,6 +208,7 @@ func TestMetricsDataPusher_marshal_error(t *testing.T) {
203208
expErr := fmt.Errorf("failed to marshal")
204209
p := kafkaMetricsProducer{
205210
marshaler: &metricsErrorMarshaler{err: expErr},
211+
converter: &OTLPProtoConverter{},
206212
logger: zap.NewNop(),
207213
}
208214
md := testdata.GenerateMetricsTwoMetrics()
@@ -219,6 +225,7 @@ func TestLogsDataPusher(t *testing.T) {
219225
p := kafkaLogsProducer{
220226
producer: producer,
221227
marshaler: &otlpLogsPbMarshaler{},
228+
converter: &OTLPProtoConverter{},
222229
}
223230
t.Cleanup(func() {
224231
require.NoError(t, p.Close(context.Background()))
@@ -236,6 +243,7 @@ func TestLogsDataPusher_err(t *testing.T) {
236243
p := kafkaLogsProducer{
237244
producer: producer,
238245
marshaler: &otlpLogsPbMarshaler{},
246+
converter: &OTLPProtoConverter{},
239247
logger: zap.NewNop(),
240248
}
241249
t.Cleanup(func() {
@@ -250,6 +258,7 @@ func TestLogsDataPusher_marshal_error(t *testing.T) {
250258
expErr := fmt.Errorf("failed to marshal")
251259
p := kafkaLogsProducer{
252260
marshaler: &logsErrorMarshaler{err: expErr},
261+
converter: &OTLPProtoConverter{},
253262
logger: zap.NewNop(),
254263
}
255264
ld := testdata.GenerateLogDataOneLog()

0 commit comments

Comments
 (0)