Skip to content

Commit 4da23d1

Browse files
Add OTLP trace data types, factories and helpers (#534)
This is step one in converting in-memory representation from OC to OTLP. This change has no effect except in tests. The introduced code is not reachable. Issue: #478
1 parent 7d44a4c commit 4da23d1

File tree

11 files changed

+405
-44
lines changed

11 files changed

+405
-44
lines changed

consumer/consumer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,10 @@ type MetricsConsumer interface {
3636
type TraceConsumer interface {
3737
ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error
3838
}
39+
40+
// OTLPTraceConsumer is an interface that receives consumerdata.OTLPTraceData, processes it
41+
// as needed, and sends it to the next processing node if any or to the destination.
42+
type OTLPTraceConsumer interface {
43+
// ConsumeOTLPTrace receives consumerdata.OTLPTraceData for processing.
44+
ConsumeOTLPTrace(ctx context.Context, td consumerdata.OTLPTraceData) error
45+
}

consumer/consumerdata/consumerdata.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
2121
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
2222
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
23+
otlptrace "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
2324
)
2425

2526
// MetricsData is a struct that groups proto metrics with a unique node and a resource.
@@ -36,3 +37,18 @@ type TraceData struct {
3637
Spans []*tracepb.Span
3738
SourceFormat string
3839
}
40+
41+
// OTLPTraceData is a struct that groups proto spans with a resource. This is the
42+
// newer version of TraceData, using OTLP-based representation.
43+
type OTLPTraceData struct {
44+
ResourceSpanList []*otlptrace.ResourceSpans
45+
}
46+
47+
// SpanCount calculates the total number of spans.
48+
func (td OTLPTraceData) SpanCount() int {
49+
spanCount := 0
50+
for _, rsl := range td.ResourceSpanList {
51+
spanCount += len(rsl.Spans)
52+
}
53+
return spanCount
54+
}

exporter/exporter.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,19 @@ type Exporter interface {
2525
component.Component
2626
}
2727

28-
// TraceExporter composes TraceConsumer with some additional exporter-specific functions.
28+
// TraceExporter is a TraceConsumer that is also an Exporter.
2929
type TraceExporter interface {
3030
consumer.TraceConsumer
3131
Exporter
3232
}
3333

34-
// MetricsExporter composes MetricsConsumer with some additional exporter-specific functions.
34+
// OTLPTraceExporter is an OTLPTraceConsumer that is also an Exporter.
35+
type OTLPTraceExporter interface {
36+
consumer.OTLPTraceConsumer
37+
Exporter
38+
}
39+
40+
// MetricsExporter is a MetricsConsumer that is also an Exporter.
3541
type MetricsExporter interface {
3642
consumer.MetricsConsumer
3743
Exporter

exporter/exporterhelper/constants.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
var (
2222
// errNilConfig is returned when an empty name is given.
2323
errNilConfig = errors.New("nil config")
24-
// errNilPushTraceData is returned when a nil pushTraceData is given.
25-
errNilPushTraceData = errors.New("nil pushTraceData")
24+
// errNilPushTraceData is returned when a nil traceDataPusher is given.
25+
errNilPushTraceData = errors.New("nil traceDataPusher")
2626
// errNilPushMetricsData is returned when a nil pushMetricsData is given.
2727
errNilPushMetricsData = errors.New("nil pushMetricsData")
2828
)

exporter/exporterhelper/tracehelper.go

Lines changed: 144 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,33 @@ import (
2626
"github.com/open-telemetry/opentelemetry-collector/observability"
2727
)
2828

29-
// PushTraceData is a helper function that is similar to ConsumeTraceData but also returns
30-
// the number of dropped spans.
31-
type PushTraceData func(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error)
29+
// Suffix to use for span names emitted by exporter for observability purposes.
30+
const spanNameSuffix = ".ExportTraceData"
3231

32+
// traceDataPusher is a helper function that is similar to ConsumeTraceData but also
33+
// returns the number of dropped spans.
34+
type traceDataPusher func(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error)
35+
36+
// otlpTraceDataPusher is a helper function that is similar to ConsumeTraceData but also
37+
// returns the number of dropped spans.
38+
type otlpTraceDataPusher func(ctx context.Context, td consumerdata.OTLPTraceData) (droppedSpans int, err error)
39+
40+
// traceExporter implements the exporter with additional helper options.
3341
type traceExporter struct {
3442
exporterFullName string
35-
pushTraceData PushTraceData
43+
dataPusher traceDataPusher
3644
shutdown Shutdown
3745
}
3846

39-
var _ (exporter.TraceExporter) = (*traceExporter)(nil)
47+
var _ exporter.TraceExporter = (*traceExporter)(nil)
4048

4149
func (te *traceExporter) Start(host component.Host) error {
4250
return nil
4351
}
4452

4553
func (te *traceExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
4654
exporterCtx := observability.ContextWithExporterName(ctx, te.exporterFullName)
47-
_, err := te.pushTraceData(exporterCtx, td)
55+
_, err := te.dataPusher(exporterCtx, td)
4856
return err
4957
}
5058

@@ -53,28 +61,34 @@ func (te *traceExporter) Shutdown() error {
5361
return te.shutdown()
5462
}
5563

56-
// NewTraceExporter creates an TraceExporter that can record metrics and can wrap every request with a Span.
57-
// If no options are passed it just adds the exporter format as a tag in the Context.
58-
// TODO: Add support for retries.
59-
func NewTraceExporter(config configmodels.Exporter, pushTraceData PushTraceData, options ...ExporterOption) (exporter.TraceExporter, error) {
64+
// NewTraceExporter creates an TraceExporter that can record metrics and can wrap every
65+
// request with a Span. If no options are passed it just adds the exporter format as a
66+
// tag in the Context.
67+
func NewTraceExporter(
68+
config configmodels.Exporter,
69+
dataPusher traceDataPusher,
70+
options ...ExporterOption,
71+
) (exporter.TraceExporter, error) {
72+
6073
if config == nil {
6174
return nil, errNilConfig
6275
}
6376

64-
if pushTraceData == nil {
77+
if dataPusher == nil {
6578
return nil, errNilPushTraceData
6679
}
6780

6881
opts := newExporterOptions(options...)
6982
if opts.recordMetrics {
70-
pushTraceData = pushTraceDataWithMetrics(pushTraceData)
83+
dataPusher = dataPusher.withMetrics()
7184
}
7285

7386
if opts.recordTrace {
74-
pushTraceData = pushTraceDataWithSpan(pushTraceData, config.Name()+".ExportTraceData")
87+
spanName := config.Name() + spanNameSuffix
88+
dataPusher = dataPusher.withSpan(spanName)
7589
}
7690

77-
// The default shutdown function returns nil.
91+
// The default shutdown function does nothing.
7892
if opts.shutdown == nil {
7993
opts.shutdown = func() error {
8094
return nil
@@ -83,27 +97,31 @@ func NewTraceExporter(config configmodels.Exporter, pushTraceData PushTraceData,
8397

8498
return &traceExporter{
8599
exporterFullName: config.Name(),
86-
pushTraceData: pushTraceData,
100+
dataPusher: dataPusher,
87101
shutdown: opts.shutdown,
88102
}, nil
89103
}
90104

91-
func pushTraceDataWithMetrics(next PushTraceData) PushTraceData {
105+
// withMetrics wraps the current pusher into a function that records the metrics of the
106+
// pusher execution.
107+
func (p traceDataPusher) withMetrics() traceDataPusher {
92108
return func(ctx context.Context, td consumerdata.TraceData) (int, error) {
93-
// TODO: Add retry logic here if we want to support because we need to record special metrics.
94-
droppedSpans, err := next(ctx, td)
109+
// Forward the data to the next consumer (this pusher is the next).
110+
droppedSpans, err := p(ctx, td)
95111
// TODO: How to record the reason of dropping?
96112
observability.RecordMetricsForTraceExporter(ctx, len(td.Spans), droppedSpans)
97113
return droppedSpans, err
98114
}
99115
}
100116

101-
func pushTraceDataWithSpan(next PushTraceData, spanName string) PushTraceData {
117+
// withSpan wraps the current pusher into a function that records a span during
118+
// pusher execution.
119+
func (p traceDataPusher) withSpan(spanName string) traceDataPusher {
102120
return func(ctx context.Context, td consumerdata.TraceData) (int, error) {
103121
ctx, span := trace.StartSpan(ctx, spanName)
104122
defer span.End()
105-
// Call next stage.
106-
droppedSpans, err := next(ctx, td)
123+
// Forward the data to the next consumer (this pusher is the next).
124+
droppedSpans, err := p(ctx, td)
107125
if span.IsRecordingEvents() {
108126
span.AddAttributes(
109127
trace.Int64Attribute(numReceivedSpansAttribute, int64(len(td.Spans))),
@@ -116,3 +134,108 @@ func pushTraceDataWithSpan(next PushTraceData, spanName string) PushTraceData {
116134
return droppedSpans, err
117135
}
118136
}
137+
138+
type otlpTraceExporter struct {
139+
exporterFullName string
140+
dataPusher otlpTraceDataPusher
141+
shutdown Shutdown
142+
}
143+
144+
var _ exporter.OTLPTraceExporter = (*otlpTraceExporter)(nil)
145+
146+
func (te *otlpTraceExporter) Start(host component.Host) error {
147+
return nil
148+
}
149+
150+
func (te *otlpTraceExporter) ConsumeOTLPTrace(
151+
ctx context.Context,
152+
td consumerdata.OTLPTraceData,
153+
) error {
154+
exporterCtx := observability.ContextWithExporterName(ctx, te.exporterFullName)
155+
_, err := te.dataPusher(exporterCtx, td)
156+
return err
157+
}
158+
159+
// Shutdown stops the exporter and is invoked during shutdown.
160+
func (te *otlpTraceExporter) Shutdown() error {
161+
return te.shutdown()
162+
}
163+
164+
// NewOTLPTraceExporter creates an OTLPTraceExporter that can record metrics and can wrap
165+
// every request with a Span.
166+
func NewOTLPTraceExporter(
167+
config configmodels.Exporter,
168+
dataPusher otlpTraceDataPusher,
169+
options ...ExporterOption,
170+
) (exporter.OTLPTraceExporter, error) {
171+
172+
if config == nil {
173+
return nil, errNilConfig
174+
}
175+
176+
if dataPusher == nil {
177+
return nil, errNilPushTraceData
178+
}
179+
180+
opts := newExporterOptions(options...)
181+
if opts.recordMetrics {
182+
dataPusher = dataPusher.withMetrics()
183+
}
184+
185+
if opts.recordTrace {
186+
spanName := config.Name() + spanNameSuffix
187+
dataPusher = dataPusher.withSpan(spanName)
188+
}
189+
190+
// The default shutdown function does nothing.
191+
if opts.shutdown == nil {
192+
opts.shutdown = func() error {
193+
return nil
194+
}
195+
}
196+
197+
return &otlpTraceExporter{
198+
exporterFullName: config.Name(),
199+
dataPusher: dataPusher,
200+
shutdown: opts.shutdown,
201+
}, nil
202+
}
203+
204+
// withMetrics wraps the current pusher into a function that records the metrics of the
205+
// pusher execution.
206+
func (p otlpTraceDataPusher) withMetrics() otlpTraceDataPusher {
207+
return func(ctx context.Context, td consumerdata.OTLPTraceData) (int, error) {
208+
// Forward the data to the next consumer (this pusher is the next).
209+
droppedSpans, err := p(ctx, td)
210+
211+
// Record the results as metrics.
212+
observability.RecordMetricsForTraceExporter(ctx, td.SpanCount(), droppedSpans)
213+
214+
return droppedSpans, err
215+
}
216+
}
217+
218+
// withSpan wraps the current pusher into a function that records a span during
219+
// pusher execution.
220+
func (p otlpTraceDataPusher) withSpan(spanName string) otlpTraceDataPusher {
221+
return func(ctx context.Context, td consumerdata.OTLPTraceData) (int, error) {
222+
// Start a span.
223+
ctx, span := trace.StartSpan(ctx, spanName)
224+
225+
// End the span after this function is done.
226+
defer span.End()
227+
228+
// Forward the data to the next consumer (this pusher is the next).
229+
droppedSpans, err := p(ctx, td)
230+
if span.IsRecordingEvents() {
231+
span.AddAttributes(
232+
trace.Int64Attribute(numReceivedSpansAttribute, int64(td.SpanCount())),
233+
trace.Int64Attribute(numDroppedSpansAttribute, int64(droppedSpans)),
234+
)
235+
if err != nil {
236+
span.SetStatus(errToStatus(err))
237+
}
238+
}
239+
return droppedSpans, err
240+
}
241+
}

0 commit comments

Comments
 (0)