Skip to content

Commit edf0f1d

Browse files
authored
[chore] Add tests for network code propagation with consumererror (#13424)
This adds some E2E tests that show how network codes propagate from the OTLP(/HTTP) exporter to the OTLP receiver (both serving over HTTP and gRPC). The goal is that as we add the new `consumererror.Error` type and any related types to the pipeline, we can use these tests to understand the net effect of the errors. Most of the tests won't require changes after we add the new error type, but note the comment in `TestHTTPToHTTP` that shows where the error type will add functionality. These tests don't cover a few things right now: * Any transformation of the errors as they travel back up the pipeline by internal parts of the the pipeline graph (fanouts, obsreporter, etc.). These could impact what the OTLP receiver returns, so we may want to test them later as we touch those components. We may want these direct tests regardless since these components have a contract with each other that likely should exist outside our service implementation. * Retry handling. This hasn't been fully added to the errors yet, so I'll wait until that's closer to add tests.
1 parent 1dcce58 commit edf0f1d

File tree

2 files changed

+335
-1
lines changed

2 files changed

+335
-1
lines changed
Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package e2e
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"io"
10+
"net"
11+
"net/http"
12+
"net/http/httptest"
13+
"strconv"
14+
"testing"
15+
16+
"github.com/stretchr/testify/assert"
17+
"github.com/stretchr/testify/require"
18+
"google.golang.org/grpc"
19+
"google.golang.org/grpc/codes"
20+
"google.golang.org/grpc/credentials/insecure"
21+
"google.golang.org/grpc/status"
22+
23+
"go.opentelemetry.io/collector/component"
24+
"go.opentelemetry.io/collector/component/componenttest"
25+
"go.opentelemetry.io/collector/config/configgrpc"
26+
"go.opentelemetry.io/collector/config/confighttp"
27+
"go.opentelemetry.io/collector/config/confignet"
28+
"go.opentelemetry.io/collector/config/configoptional"
29+
"go.opentelemetry.io/collector/config/configtls"
30+
"go.opentelemetry.io/collector/consumer"
31+
"go.opentelemetry.io/collector/exporter/exportertest"
32+
"go.opentelemetry.io/collector/exporter/otlpexporter"
33+
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
34+
"go.opentelemetry.io/collector/internal/testutil"
35+
"go.opentelemetry.io/collector/pdata/plog"
36+
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
37+
"go.opentelemetry.io/collector/pdata/testdata"
38+
"go.opentelemetry.io/collector/receiver"
39+
"go.opentelemetry.io/collector/receiver/otlpreceiver"
40+
)
41+
42+
var _ plogotlp.GRPCServer = &logsServer{}
43+
44+
type logsServer struct {
45+
plogotlp.UnimplementedGRPCServer
46+
47+
exportError error
48+
}
49+
50+
func (r *logsServer) Export(_ context.Context, _ plogotlp.ExportRequest) (plogotlp.ExportResponse, error) {
51+
return plogotlp.NewExportResponse(), r.exportError
52+
}
53+
54+
func TestGRPCToGRPC(t *testing.T) {
55+
// gRPC supports 17 different status codes.
56+
// Source: https://github.com/grpc/grpc/blob/41788c90bc66caf29f28ef808d066db806389792/doc/statuscodes.md
57+
for i := range uint32(16) {
58+
s := status.New(codes.Code(i), "Testing error")
59+
t.Run("Code "+s.Code().String(), func(t *testing.T) {
60+
e := createGRPCExporter(t, s)
61+
assertOnGRPCCode(t, e, s)
62+
})
63+
}
64+
}
65+
66+
func TestHTTPToGRPC(t *testing.T) {
67+
testCases := []struct {
68+
grpc codes.Code
69+
http int
70+
}{
71+
{codes.OK, http.StatusOK},
72+
{codes.Canceled, http.StatusServiceUnavailable},
73+
{codes.DeadlineExceeded, http.StatusServiceUnavailable},
74+
{codes.Aborted, http.StatusServiceUnavailable},
75+
{codes.OutOfRange, http.StatusServiceUnavailable},
76+
{codes.Unavailable, http.StatusServiceUnavailable},
77+
{codes.DataLoss, http.StatusServiceUnavailable},
78+
{codes.ResourceExhausted, http.StatusTooManyRequests},
79+
{codes.InvalidArgument, http.StatusBadRequest},
80+
{codes.Unauthenticated, http.StatusUnauthorized},
81+
{codes.PermissionDenied, http.StatusForbidden},
82+
{codes.Unimplemented, http.StatusNotFound},
83+
}
84+
85+
for _, tt := range testCases {
86+
s := status.New(tt.grpc, "Testing error")
87+
t.Run("Code "+s.Code().String(), func(t *testing.T) {
88+
e := createGRPCExporter(t, s)
89+
assertOnHTTPCode(t, e, tt.http)
90+
})
91+
}
92+
}
93+
94+
func TestGRPCToHTTP(t *testing.T) {
95+
testCases := []struct {
96+
http int
97+
grpc codes.Code
98+
}{
99+
{http.StatusOK, codes.OK},
100+
{http.StatusBadRequest, codes.InvalidArgument},
101+
{http.StatusUnauthorized, codes.Unauthenticated},
102+
{http.StatusForbidden, codes.PermissionDenied},
103+
{http.StatusNotFound, codes.Unimplemented},
104+
{http.StatusTooManyRequests, codes.ResourceExhausted},
105+
{http.StatusBadGateway, codes.Unavailable},
106+
{http.StatusServiceUnavailable, codes.Unavailable},
107+
{http.StatusGatewayTimeout, codes.Unavailable},
108+
}
109+
110+
for _, tt := range testCases {
111+
s := status.New(tt.grpc, "Testing error")
112+
t.Run("Code "+s.Code().String(), func(t *testing.T) {
113+
e := createHTTPExporter(t, tt.http)
114+
assertOnGRPCCode(t, e, s)
115+
})
116+
}
117+
}
118+
119+
func TestHTTPToHTTP(t *testing.T) {
120+
testCases := []struct {
121+
code int
122+
mapping int
123+
}{
124+
{code: http.StatusOK},
125+
{code: http.StatusServiceUnavailable},
126+
{code: http.StatusTooManyRequests},
127+
{code: http.StatusBadRequest},
128+
{code: http.StatusUnauthorized},
129+
{code: http.StatusForbidden},
130+
{code: http.StatusNotFound},
131+
{code: http.StatusInternalServerError},
132+
// Mappings won't be necessary once the OTLP/HTTP Exporter returns consumererror.Error types.
133+
{code: http.StatusBadGateway, mapping: http.StatusServiceUnavailable},
134+
{code: http.StatusGatewayTimeout, mapping: http.StatusServiceUnavailable},
135+
{code: http.StatusTeapot, mapping: http.StatusInternalServerError},
136+
{code: http.StatusConflict, mapping: http.StatusInternalServerError},
137+
}
138+
139+
for _, tt := range testCases {
140+
t.Run("Code "+strconv.Itoa(tt.code), func(t *testing.T) {
141+
e := createHTTPExporter(t, tt.code)
142+
code := tt.code
143+
if tt.mapping != 0 {
144+
code = tt.mapping
145+
}
146+
assertOnHTTPCode(t, e, code)
147+
})
148+
}
149+
}
150+
151+
func createGRPCExporter(t *testing.T, s *status.Status) consumer.Logs {
152+
t.Helper()
153+
154+
ln, err := net.Listen("tcp", "localhost:")
155+
require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err)
156+
157+
srv := grpc.NewServer()
158+
rcv := &logsServer{
159+
exportError: s.Err(),
160+
}
161+
162+
plogotlp.RegisterGRPCServer(srv, rcv)
163+
go func() {
164+
assert.NoError(t, srv.Serve(ln))
165+
}()
166+
t.Cleanup(func() {
167+
srv.Stop()
168+
})
169+
170+
f := otlpexporter.NewFactory()
171+
cfg := f.CreateDefaultConfig().(*otlpexporter.Config)
172+
cfg.QueueConfig.Enabled = false
173+
cfg.RetryConfig.Enabled = false
174+
cfg.ClientConfig = configgrpc.ClientConfig{
175+
Endpoint: ln.Addr().String(),
176+
TLS: configtls.ClientConfig{
177+
Insecure: true,
178+
},
179+
}
180+
e, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(component.MustNewType("otlp")), cfg)
181+
require.NoError(t, err)
182+
err = e.Start(context.Background(), componenttest.NewNopHost())
183+
require.NoError(t, err)
184+
t.Cleanup(func() {
185+
require.NoError(t, e.Shutdown(context.Background()))
186+
})
187+
188+
return e
189+
}
190+
191+
func createHTTPExporter(t *testing.T, code int) consumer.Logs {
192+
t.Helper()
193+
194+
mux := http.NewServeMux()
195+
mux.HandleFunc("/v1/logs", func(writer http.ResponseWriter, _ *http.Request) {
196+
writer.WriteHeader(code)
197+
})
198+
199+
srv := httptest.NewServer(mux)
200+
t.Cleanup(func() {
201+
srv.Close()
202+
})
203+
204+
f := otlphttpexporter.NewFactory()
205+
cfg := f.CreateDefaultConfig().(*otlphttpexporter.Config)
206+
cfg.QueueConfig.Enabled = false
207+
cfg.RetryConfig.Enabled = false
208+
cfg.Encoding = otlphttpexporter.EncodingProto
209+
cfg.LogsEndpoint = srv.URL + "/v1/logs"
210+
e, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(component.MustNewType("otlphttp")), cfg)
211+
require.NoError(t, err)
212+
err = e.Start(context.Background(), componenttest.NewNopHost())
213+
require.NoError(t, err)
214+
t.Cleanup(func() {
215+
require.NoError(t, e.Shutdown(context.Background()))
216+
})
217+
218+
return e
219+
}
220+
221+
func assertOnGRPCCode(t *testing.T, l consumer.Logs, s *status.Status) {
222+
t.Helper()
223+
224+
rf := otlpreceiver.NewFactory()
225+
rcfg := rf.CreateDefaultConfig().(*otlpreceiver.Config)
226+
rcfg.GRPC = configoptional.Some(
227+
configgrpc.ServerConfig{
228+
NetAddr: confignet.AddrConfig{
229+
Endpoint: testutil.GetAvailableLocalAddress(t),
230+
Transport: confignet.TransportTypeTCP,
231+
},
232+
},
233+
)
234+
r, err := rf.CreateLogs(context.Background(), receiver.Settings{
235+
ID: component.MustNewID("otlp"),
236+
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
237+
}, rcfg, l)
238+
require.NoError(t, err)
239+
err = r.Start(context.Background(), componenttest.NewNopHost())
240+
require.NoError(t, err)
241+
t.Cleanup(func() {
242+
require.NoError(t, r.Shutdown(context.Background()))
243+
})
244+
245+
conn, err := grpc.NewClient(rcfg.GRPC.Get().NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
246+
require.NoError(t, err)
247+
t.Cleanup(func() {
248+
require.NoError(t, conn.Close())
249+
})
250+
251+
ld := testdata.GenerateLogs(2)
252+
253+
acc := plogotlp.NewGRPCClient(conn)
254+
req := plogotlp.NewExportRequestFromLogs(ld)
255+
res, err := acc.Export(context.Background(), req)
256+
257+
if s.Code() == codes.OK {
258+
require.NoError(t, err)
259+
} else {
260+
got := status.Convert(err).Code()
261+
require.Equal(t, s.Code(), got, "Expected code %s but got %s", s.Code().String(), got.String())
262+
}
263+
require.NotNil(t, res)
264+
}
265+
266+
func assertOnHTTPCode(t *testing.T, l consumer.Logs, code int) {
267+
t.Helper()
268+
269+
ld := testdata.GenerateLogs(2)
270+
protoMarshaler := &plog.ProtoMarshaler{}
271+
logProto, err := protoMarshaler.MarshalLogs(ld)
272+
require.NoError(t, err)
273+
274+
rf := otlpreceiver.NewFactory()
275+
rcfg := rf.CreateDefaultConfig().(*otlpreceiver.Config)
276+
rcfg.HTTP = configoptional.Some(
277+
otlpreceiver.HTTPConfig{
278+
ServerConfig: confighttp.ServerConfig{
279+
Endpoint: testutil.GetAvailableLocalAddress(t),
280+
},
281+
LogsURLPath: "/v1/logs",
282+
},
283+
)
284+
r, err := rf.CreateLogs(context.Background(), receiver.Settings{
285+
ID: component.MustNewID("otlp"),
286+
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
287+
}, rcfg, l)
288+
require.NoError(t, err)
289+
err = r.Start(context.Background(), componenttest.NewNopHost())
290+
require.NoError(t, err)
291+
t.Cleanup(func() {
292+
require.NoError(t, r.Shutdown(context.Background()))
293+
})
294+
295+
doHTTPRequest(t, rcfg.HTTP.Get().ServerConfig.Endpoint+"/v1/logs", logProto, code)
296+
}
297+
298+
func doHTTPRequest(
299+
t *testing.T,
300+
url string,
301+
data []byte,
302+
expectStatusCode int,
303+
) []byte {
304+
req := createHTTPRequest(t, url, data)
305+
resp, err := http.DefaultClient.Do(req)
306+
require.NoError(t, err)
307+
308+
respBytes, err := io.ReadAll(resp.Body)
309+
require.NoError(t, err)
310+
311+
require.NoError(t, resp.Body.Close())
312+
313+
if expectStatusCode == 0 {
314+
require.Equal(t, http.StatusOK, resp.StatusCode)
315+
} else {
316+
require.Equal(t, expectStatusCode, resp.StatusCode)
317+
}
318+
319+
return respBytes
320+
}
321+
322+
func createHTTPRequest(
323+
t *testing.T,
324+
url string,
325+
data []byte,
326+
) *http.Request {
327+
buf := bytes.NewBuffer(data)
328+
329+
req, err := http.NewRequest(http.MethodPost, "http://"+url, buf)
330+
require.NoError(t, err)
331+
req.Header.Set("Content-Type", "application/x-protobuf")
332+
333+
return req
334+
}

internal/e2e/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ require (
3737
go.opentelemetry.io/collector/service v0.130.0
3838
go.uber.org/goleak v1.3.0
3939
go.uber.org/zap v1.27.0
40+
google.golang.org/grpc v1.73.0
4041
)
4142

4243
require (
@@ -143,7 +144,6 @@ require (
143144
gonum.org/v1/gonum v0.16.0 // indirect
144145
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect
145146
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
146-
google.golang.org/grpc v1.73.0 // indirect
147147
google.golang.org/protobuf v1.36.6 // indirect
148148
gopkg.in/yaml.v3 v3.0.1 // indirect
149149
sigs.k8s.io/yaml v1.5.0 // indirect

0 commit comments

Comments
 (0)