Skip to content

Commit 38e76ef

Browse files
authored
Add a split protocol driver for otlp exporter (#1418)
* Add a split protocol driver This is a wrapper around two other protocol drivers, so it makes it possible to send traces using a different protocol than the one used for metrics. * Add an example and tests for multi GRPC endpoint driver * Update changelog * Document the split driver
1 parent 439cd31 commit 38e76ef

File tree

7 files changed

+392
-37
lines changed

7 files changed

+392
-37
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1212

1313
- Add the `ReadOnlySpan` and `ReadWriteSpan` interfaces to provide better control for accessing span data. (#1360)
1414
- `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369)
15+
- `NewSplitDriver` for OTLP exporter that allows sending traces and metrics to different endpoints. (#1418)
1516

1617
### Changed
1718

exporters/otlp/example_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ import (
2424

2525
"go.opentelemetry.io/otel"
2626
"go.opentelemetry.io/otel/exporters/otlp"
27+
"go.opentelemetry.io/otel/metric"
28+
"go.opentelemetry.io/otel/sdk/metric/controller/push"
29+
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
30+
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
2731
sdktrace "go.opentelemetry.io/otel/sdk/trace"
2832
)
2933

@@ -51,6 +55,13 @@ func Example_insecure() {
5155
sdktrace.WithMaxExportBatchSize(10),
5256
),
5357
)
58+
defer func() {
59+
ctx, cancel := context.WithTimeout(ctx, time.Second)
60+
defer cancel()
61+
if err := tp.Shutdown(ctx); err != nil {
62+
otel.Handle(err)
63+
}
64+
}()
5465
otel.SetTracerProvider(tp)
5566

5667
tracer := otel.Tracer("test-tracer")
@@ -97,6 +108,13 @@ func Example_withTLS() {
97108
sdktrace.WithMaxExportBatchSize(10),
98109
),
99110
)
111+
defer func() {
112+
ctx, cancel := context.WithTimeout(ctx, time.Second)
113+
defer cancel()
114+
if err := tp.Shutdown(ctx); err != nil {
115+
otel.Handle(err)
116+
}
117+
}()
100118
otel.SetTracerProvider(tp)
101119

102120
tracer := otel.Tracer("test-tracer")
@@ -111,3 +129,91 @@ func Example_withTLS() {
111129
iSpan.End()
112130
}
113131
}
132+
133+
func Example_withDifferentSignalCollectors() {
134+
135+
// Set different endpoints for the metrics and traces collectors
136+
metricsDriver := otlp.NewGRPCDriver(
137+
otlp.WithInsecure(),
138+
otlp.WithAddress("localhost:30080"),
139+
)
140+
tracesDriver := otlp.NewGRPCDriver(
141+
otlp.WithInsecure(),
142+
otlp.WithAddress("localhost:30082"),
143+
)
144+
splitCfg := otlp.SplitConfig{
145+
ForMetrics: metricsDriver,
146+
ForTraces: tracesDriver,
147+
}
148+
driver := otlp.NewSplitDriver(splitCfg)
149+
ctx := context.Background()
150+
exp, err := otlp.NewExporter(ctx, driver)
151+
if err != nil {
152+
log.Fatalf("failed to create the collector exporter: %v", err)
153+
}
154+
155+
defer func() {
156+
ctx, cancel := context.WithTimeout(ctx, time.Second)
157+
defer cancel()
158+
if err := exp.Shutdown(ctx); err != nil {
159+
otel.Handle(err)
160+
}
161+
}()
162+
163+
tp := sdktrace.NewTracerProvider(
164+
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
165+
sdktrace.WithBatcher(
166+
exp,
167+
// add following two options to ensure flush
168+
sdktrace.WithBatchTimeout(5),
169+
sdktrace.WithMaxExportBatchSize(10),
170+
),
171+
)
172+
defer func() {
173+
ctx, cancel := context.WithTimeout(ctx, time.Second)
174+
defer cancel()
175+
if err := tp.Shutdown(ctx); err != nil {
176+
otel.Handle(err)
177+
}
178+
}()
179+
otel.SetTracerProvider(tp)
180+
181+
pusher := push.New(
182+
basic.New(
183+
simple.NewWithExactDistribution(),
184+
exp,
185+
),
186+
exp,
187+
push.WithPeriod(2*time.Second),
188+
)
189+
otel.SetMeterProvider(pusher.MeterProvider())
190+
191+
pusher.Start()
192+
defer pusher.Stop() // pushes any last exports to the receiver
193+
194+
tracer := otel.Tracer("test-tracer")
195+
meter := otel.Meter("test-meter")
196+
197+
// Recorder metric example
198+
valuerecorder := metric.Must(meter).
199+
NewFloat64Counter(
200+
"an_important_metric",
201+
metric.WithDescription("Measures the cumulative epicness of the app"),
202+
)
203+
204+
// work begins
205+
ctx, span := tracer.Start(
206+
ctx,
207+
"DifferentCollectors-Example")
208+
defer span.End()
209+
for i := 0; i < 10; i++ {
210+
_, iSpan := tracer.Start(ctx, fmt.Sprintf("Sample-%d", i))
211+
log.Printf("Doing really hard work (%d / 10)\n", i+1)
212+
valuerecorder.Add(ctx, 1.0)
213+
214+
<-time.After(time.Second)
215+
iSpan.End()
216+
}
217+
218+
log.Printf("Done!")
219+
}

exporters/otlp/otlp_integration_test.go

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -97,25 +97,7 @@ func newGRPCExporter(t *testing.T, ctx context.Context, address string, addition
9797
return exp
9898
}
9999

100-
func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
101-
mc := runMockColAtAddr(t, "localhost:56561")
102-
103-
defer func() {
104-
_ = mc.stop()
105-
}()
106-
107-
<-time.After(5 * time.Millisecond)
108-
109-
ctx := context.Background()
110-
exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...)
111-
defer func() {
112-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
113-
defer cancel()
114-
if err := exp.Shutdown(ctx); err != nil {
115-
panic(err)
116-
}
117-
}()
118-
100+
func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCol) {
119101
pOpts := []sdktrace.TracerProviderOption{
120102
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
121103
sdktrace.WithBatcher(
@@ -239,10 +221,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
239221

240222
// Shutdown the collector too so that we can begin
241223
// verification checks of expected data back.
242-
_ = mc.stop()
224+
_ = mcTraces.stop()
225+
_ = mcMetrics.stop()
243226

244227
// Now verify that we only got two resources
245-
rss := mc.getResourceSpans()
228+
rss := mcTraces.getResourceSpans()
246229
if got, want := len(rss), 2; got != want {
247230
t.Fatalf("resource span count: got %d, want %d\n", got, want)
248231
}
@@ -273,7 +256,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
273256
}
274257
}
275258

276-
metrics := mc.getMetrics()
259+
metrics := mcMetrics.getMetrics()
277260
assert.Len(t, metrics, len(instruments), "not enough metrics exported")
278261
seen := make(map[string]struct{}, len(instruments))
279262
for _, m := range metrics {
@@ -342,6 +325,28 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
342325
}
343326
}
344327

328+
func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
329+
mc := runMockColAtAddr(t, "localhost:56561")
330+
331+
defer func() {
332+
_ = mc.stop()
333+
}()
334+
335+
<-time.After(5 * time.Millisecond)
336+
337+
ctx := context.Background()
338+
exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...)
339+
defer func() {
340+
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
341+
defer cancel()
342+
if err := exp.Shutdown(ctx); err != nil {
343+
panic(err)
344+
}
345+
}()
346+
347+
runEndToEndTest(t, ctx, exp, mc, mc)
348+
}
349+
345350
func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
346351
mc := runMockCol(t)
347352
defer func() {
@@ -761,3 +766,44 @@ func TestFailedMetricTransform(t *testing.T) {
761766

762767
assert.Error(t, exp.Export(ctx, failCheckpointSet{}))
763768
}
769+
770+
func TestMultiConnectionDriver(t *testing.T) {
771+
mcTraces := runMockCol(t)
772+
mcMetrics := runMockCol(t)
773+
774+
defer func() {
775+
_ = mcTraces.stop()
776+
_ = mcMetrics.stop()
777+
}()
778+
779+
<-time.After(5 * time.Millisecond)
780+
781+
commonOpts := []otlp.GRPCConnectionOption{
782+
otlp.WithInsecure(),
783+
otlp.WithReconnectionPeriod(50 * time.Millisecond),
784+
otlp.WithGRPCDialOption(grpc.WithBlock()),
785+
}
786+
optsTraces := append([]otlp.GRPCConnectionOption{
787+
otlp.WithAddress(mcTraces.address),
788+
}, commonOpts...)
789+
optsMetrics := append([]otlp.GRPCConnectionOption{
790+
otlp.WithAddress(mcMetrics.address),
791+
}, commonOpts...)
792+
793+
tracesDriver := otlp.NewGRPCDriver(optsTraces...)
794+
metricsDriver := otlp.NewGRPCDriver(optsMetrics...)
795+
splitCfg := otlp.SplitConfig{
796+
ForMetrics: metricsDriver,
797+
ForTraces: tracesDriver,
798+
}
799+
driver := otlp.NewSplitDriver(splitCfg)
800+
ctx := context.Background()
801+
exp, err := otlp.NewExporter(ctx, driver)
802+
if err != nil {
803+
t.Fatalf("failed to create a new collector exporter: %v", err)
804+
}
805+
defer func() {
806+
assert.NoError(t, exp.Shutdown(ctx))
807+
}()
808+
runEndToEndTest(t, ctx, exp, mcTraces, mcMetrics)
809+
}

exporters/otlp/otlp_metric_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package otlp
15+
package otlp_test
1616

1717
import (
1818
"context"
@@ -23,6 +23,7 @@ import (
2323
"github.com/stretchr/testify/assert"
2424
"github.com/stretchr/testify/require"
2525

26+
"go.opentelemetry.io/otel/exporters/otlp"
2627
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
2728
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
2829
resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1"
@@ -692,8 +693,8 @@ func TestStatelessExportKind(t *testing.T) {
692693
t.Run(k.name, func(t *testing.T) {
693694
runMetricExportTests(
694695
t,
695-
[]ExporterOption{
696-
WithMetricExportKindSelector(
696+
[]otlp.ExporterOption{
697+
otlp.WithMetricExportKindSelector(
697698
metricsdk.StatelessExportKindSelector(),
698699
),
699700
},
@@ -740,7 +741,7 @@ func TestStatelessExportKind(t *testing.T) {
740741
}
741742
}
742743

743-
func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
744+
func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
744745
exp, driver := newExporter(t, opts...)
745746

746747
recs := map[label.Distinct][]metricsdk.Record{}

exporters/otlp/otlp_span_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package otlp
15+
package otlp_test
1616

1717
import (
1818
"context"

0 commit comments

Comments
 (0)