diff --git a/internal/e2e/error_propagation_test.go b/internal/e2e/error_propagation_test.go new file mode 100644 index 00000000000..bb2180416d2 --- /dev/null +++ b/internal/e2e/error_propagation_test.go @@ -0,0 +1,334 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package e2e + +import ( + "bytes" + "context" + "io" + "net" + "net/http" + "net/http/httptest" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/config/configoptional" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/otlpexporter" + "go.opentelemetry.io/collector/exporter/otlphttpexporter" + "go.opentelemetry.io/collector/internal/testutil" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/otlpreceiver" +) + +var _ plogotlp.GRPCServer = &logsServer{} + +type logsServer struct { + plogotlp.UnimplementedGRPCServer + + exportError error +} + +func (r *logsServer) Export(_ context.Context, _ plogotlp.ExportRequest) (plogotlp.ExportResponse, error) { + return plogotlp.NewExportResponse(), r.exportError +} + +func TestGRPCToGRPC(t *testing.T) { + // gRPC supports 17 different status codes. + // Source: https://github.com/grpc/grpc/blob/41788c90bc66caf29f28ef808d066db806389792/doc/statuscodes.md + for i := range uint32(16) { + s := status.New(codes.Code(i), "Testing error") + t.Run("Code "+s.Code().String(), func(t *testing.T) { + e := createGRPCExporter(t, s) + assertOnGRPCCode(t, e, s) + }) + } +} + +func TestHTTPToGRPC(t *testing.T) { + testCases := []struct { + grpc codes.Code + http int + }{ + {codes.OK, http.StatusOK}, + {codes.Canceled, http.StatusServiceUnavailable}, + {codes.DeadlineExceeded, http.StatusServiceUnavailable}, + {codes.Aborted, http.StatusServiceUnavailable}, + {codes.OutOfRange, http.StatusServiceUnavailable}, + {codes.Unavailable, http.StatusServiceUnavailable}, + {codes.DataLoss, http.StatusServiceUnavailable}, + {codes.ResourceExhausted, http.StatusTooManyRequests}, + {codes.InvalidArgument, http.StatusBadRequest}, + {codes.Unauthenticated, http.StatusUnauthorized}, + {codes.PermissionDenied, http.StatusForbidden}, + {codes.Unimplemented, http.StatusNotFound}, + } + + for _, tt := range testCases { + s := status.New(tt.grpc, "Testing error") + t.Run("Code "+s.Code().String(), func(t *testing.T) { + e := createGRPCExporter(t, s) + assertOnHTTPCode(t, e, tt.http) + }) + } +} + +func TestGRPCToHTTP(t *testing.T) { + testCases := []struct { + http int + grpc codes.Code + }{ + {http.StatusOK, codes.OK}, + {http.StatusBadRequest, codes.InvalidArgument}, + {http.StatusUnauthorized, codes.Unauthenticated}, + {http.StatusForbidden, codes.PermissionDenied}, + {http.StatusNotFound, codes.Unimplemented}, + {http.StatusTooManyRequests, codes.ResourceExhausted}, + {http.StatusBadGateway, codes.Unavailable}, + {http.StatusServiceUnavailable, codes.Unavailable}, + {http.StatusGatewayTimeout, codes.Unavailable}, + } + + for _, tt := range testCases { + s := status.New(tt.grpc, "Testing error") + t.Run("Code "+s.Code().String(), func(t *testing.T) { + e := createHTTPExporter(t, tt.http) + assertOnGRPCCode(t, e, s) + }) + } +} + +func TestHTTPToHTTP(t *testing.T) { + testCases := []struct { + code int + mapping int + }{ + {code: http.StatusOK}, + {code: http.StatusServiceUnavailable}, + {code: http.StatusTooManyRequests}, + {code: http.StatusBadRequest}, + {code: http.StatusUnauthorized}, + {code: http.StatusForbidden}, + {code: http.StatusNotFound}, + {code: http.StatusInternalServerError}, + // Mappings won't be necessary once the OTLP/HTTP Exporter returns consumererror.Error types. + {code: http.StatusBadGateway, mapping: http.StatusServiceUnavailable}, + {code: http.StatusGatewayTimeout, mapping: http.StatusServiceUnavailable}, + {code: http.StatusTeapot, mapping: http.StatusInternalServerError}, + {code: http.StatusConflict, mapping: http.StatusInternalServerError}, + } + + for _, tt := range testCases { + t.Run("Code "+strconv.Itoa(tt.code), func(t *testing.T) { + e := createHTTPExporter(t, tt.code) + code := tt.code + if tt.mapping != 0 { + code = tt.mapping + } + assertOnHTTPCode(t, e, code) + }) + } +} + +func createGRPCExporter(t *testing.T, s *status.Status) consumer.Logs { + t.Helper() + + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) + + srv := grpc.NewServer() + rcv := &logsServer{ + exportError: s.Err(), + } + + plogotlp.RegisterGRPCServer(srv, rcv) + go func() { + assert.NoError(t, srv.Serve(ln)) + }() + t.Cleanup(func() { + srv.Stop() + }) + + f := otlpexporter.NewFactory() + cfg := f.CreateDefaultConfig().(*otlpexporter.Config) + cfg.QueueConfig.Enabled = false + cfg.RetryConfig.Enabled = false + cfg.ClientConfig = configgrpc.ClientConfig{ + Endpoint: ln.Addr().String(), + TLS: configtls.ClientConfig{ + Insecure: true, + }, + } + e, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(component.MustNewType("otlp")), cfg) + require.NoError(t, err) + err = e.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, e.Shutdown(context.Background())) + }) + + return e +} + +func createHTTPExporter(t *testing.T, code int) consumer.Logs { + t.Helper() + + mux := http.NewServeMux() + mux.HandleFunc("/v1/logs", func(writer http.ResponseWriter, _ *http.Request) { + writer.WriteHeader(code) + }) + + srv := httptest.NewServer(mux) + t.Cleanup(func() { + srv.Close() + }) + + f := otlphttpexporter.NewFactory() + cfg := f.CreateDefaultConfig().(*otlphttpexporter.Config) + cfg.QueueConfig.Enabled = false + cfg.RetryConfig.Enabled = false + cfg.Encoding = otlphttpexporter.EncodingProto + cfg.LogsEndpoint = srv.URL + "/v1/logs" + e, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(component.MustNewType("otlphttp")), cfg) + require.NoError(t, err) + err = e.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, e.Shutdown(context.Background())) + }) + + return e +} + +func assertOnGRPCCode(t *testing.T, l consumer.Logs, s *status.Status) { + t.Helper() + + rf := otlpreceiver.NewFactory() + rcfg := rf.CreateDefaultConfig().(*otlpreceiver.Config) + rcfg.GRPC = configoptional.Some( + configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + Transport: confignet.TransportTypeTCP, + }, + }, + ) + r, err := rf.CreateLogs(context.Background(), receiver.Settings{ + ID: component.MustNewID("otlp"), + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, rcfg, l) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, r.Shutdown(context.Background())) + }) + + conn, err := grpc.NewClient(rcfg.GRPC.Get().NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn.Close()) + }) + + ld := testdata.GenerateLogs(2) + + acc := plogotlp.NewGRPCClient(conn) + req := plogotlp.NewExportRequestFromLogs(ld) + res, err := acc.Export(context.Background(), req) + + if s.Code() == codes.OK { + require.NoError(t, err) + } else { + got := status.Convert(err).Code() + require.Equal(t, s.Code(), got, "Expected code %s but got %s", s.Code().String(), got.String()) + } + require.NotNil(t, res) +} + +func assertOnHTTPCode(t *testing.T, l consumer.Logs, code int) { + t.Helper() + + ld := testdata.GenerateLogs(2) + protoMarshaler := &plog.ProtoMarshaler{} + logProto, err := protoMarshaler.MarshalLogs(ld) + require.NoError(t, err) + + rf := otlpreceiver.NewFactory() + rcfg := rf.CreateDefaultConfig().(*otlpreceiver.Config) + rcfg.HTTP = configoptional.Some( + otlpreceiver.HTTPConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: testutil.GetAvailableLocalAddress(t), + }, + LogsURLPath: "/v1/logs", + }, + ) + r, err := rf.CreateLogs(context.Background(), receiver.Settings{ + ID: component.MustNewID("otlp"), + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, rcfg, l) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, r.Shutdown(context.Background())) + }) + + doHTTPRequest(t, rcfg.HTTP.Get().ServerConfig.Endpoint+"/v1/logs", logProto, code) +} + +func doHTTPRequest( + t *testing.T, + url string, + data []byte, + expectStatusCode int, +) []byte { + req := createHTTPRequest(t, url, data) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + respBytes, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + require.NoError(t, resp.Body.Close()) + + if expectStatusCode == 0 { + require.Equal(t, http.StatusOK, resp.StatusCode) + } else { + require.Equal(t, expectStatusCode, resp.StatusCode) + } + + return respBytes +} + +func createHTTPRequest( + t *testing.T, + url string, + data []byte, +) *http.Request { + buf := bytes.NewBuffer(data) + + req, err := http.NewRequest(http.MethodPost, "http://"+url, buf) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/x-protobuf") + + return req +} diff --git a/internal/e2e/go.mod b/internal/e2e/go.mod index 43dc4b794e1..50892af5c17 100644 --- a/internal/e2e/go.mod +++ b/internal/e2e/go.mod @@ -37,6 +37,7 @@ require ( go.opentelemetry.io/collector/service v0.130.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + google.golang.org/grpc v1.73.0 ) require ( @@ -143,7 +144,6 @@ require ( gonum.org/v1/gonum v0.16.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect - google.golang.org/grpc v1.73.0 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.5.0 // indirect