Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions exporter/jaegerexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jaegerexporter

import (
"context"
"fmt"

jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
"google.golang.org/grpc"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *protoGRPCSender) pushTraceData(

batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err))
}

if s.metadata.Len() > 0 {
Expand All @@ -83,7 +84,7 @@ func (s *protoGRPCSender) pushTraceData(
ctx,
&jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady))
if err != nil {
return td.SpanCount() - sentSpans, err
return td.SpanCount() - sentSpans, fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)
}
sentSpans += len(batch.Spans)
}
Expand Down
8 changes: 4 additions & 4 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
code: errAlreadyStopped,
msg: "OpenCensus exporter was already stopped.",
}
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err)
}

err := exporter.ExportTraceServiceRequest(
Expand All @@ -168,7 +168,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
)
oce.exporters <- exporter
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err)
}
return 0, nil
}
Expand All @@ -181,7 +181,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
code: errAlreadyStopped,
msg: "OpenCensus exporter was already stopped.",
}
return exporterhelper.NumTimeSeries(md), err
return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err)
}

req := &agentmetricspb.ExportMetricsServiceRequest{
Expand All @@ -192,7 +192,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
err := exporter.ExportMetricsServiceRequest(req)
oce.exporters <- exporter
if err != nil {
return exporterhelper.NumTimeSeries(md), err
return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err)
}
return 0, nil
}
6 changes: 3 additions & 3 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in
err := oce.exporter.exportTrace(ctx, request)

if err != nil {
return td.SpanCount(), err
return td.SpanCount(), fmt.Errorf("failed to push trace data via OTLP exporter: %w", err)
}
return 0, nil
}
Expand All @@ -171,7 +171,7 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics)
err := oce.exporter.exportMetrics(ctx, request)

if err != nil {
return imd.MetricCount(), err
return imd.MetricCount(), fmt.Errorf("failed to push metrics data via OTLP exporter: %w", err)
}
return 0, nil
}
Expand All @@ -183,7 +183,7 @@ func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int,
err := oce.exporter.exportLogs(ctx, request)

if err != nil {
return logs.LogRecordCount(), err
return logs.LogRecordCount(), fmt.Errorf("failed to push log data via OTLP exporter: %w", err)
}
return 0, nil
}
13 changes: 6 additions & 7 deletions exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,33 +83,32 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) {
return ze, nil
}

func (ze *zipkinExporter) PushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) {
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (int, error) {
tbatch := make([]*zipkinmodel.SpanModel, 0, len(td.Spans))

var resource *resourcepb.Resource = td.Resource

for _, span := range td.Spans {
zs, err := zipkin.OCSpanProtoToZipkin(td.Node, resource, span, ze.defaultServiceName)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}
tbatch = append(tbatch, zs)
}

body, err := ze.serializer.Serialize(tbatch)
if err != nil {
return len(td.Spans), consumererror.Permanent(err)
return len(td.Spans), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err))
}

req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body))
req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body))
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
req.Header.Set("Content-Type", ze.serializer.ContentType())

resp, err := ze.client.Do(req)
if err != nil {
return len(td.Spans), err
return len(td.Spans), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)
}
_ = resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
Expand Down
6 changes: 4 additions & 2 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,11 @@ func (bp *batchTraceProcessor) resetTimer() {
func (bp *batchTraceProcessor) sendItems(measure *stats.Int64Measure) {
// Add that it came form the trace pipeline?
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
stats.RecordWithTags(context.Background(), statsTags, measure.M(1))

_ = bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData())
if err := bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData()); err != nil {
bp.logger.Warn("Sender failed", zap.Error(err))
}
bp.batchTraces.reset()
}

Expand Down