Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 334 additions & 0 deletions internal/e2e/error_propagation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package e2e

import (
"bytes"
"context"
"io"
"net"
"net/http"
"net/http/httptest"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/exporter/otlphttpexporter"
"go.opentelemetry.io/collector/internal/testutil"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/testdata"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
)

var _ plogotlp.GRPCServer = &logsServer{}

type logsServer struct {
plogotlp.UnimplementedGRPCServer

exportError error
}

func (r *logsServer) Export(_ context.Context, _ plogotlp.ExportRequest) (plogotlp.ExportResponse, error) {
return plogotlp.NewExportResponse(), r.exportError
}

func TestGRPCToGRPC(t *testing.T) {
// gRPC supports 17 different status codes.
// Source: https://github.com/grpc/grpc/blob/41788c90bc66caf29f28ef808d066db806389792/doc/statuscodes.md
for i := range uint32(16) {
s := status.New(codes.Code(i), "Testing error")
t.Run("Code "+s.Code().String(), func(t *testing.T) {
e := createGRPCExporter(t, s)
assertOnGRPCCode(t, e, s)
})
}
}

func TestHTTPToGRPC(t *testing.T) {
testCases := []struct {
grpc codes.Code
http int
}{
{codes.OK, http.StatusOK},
{codes.Canceled, http.StatusServiceUnavailable},
{codes.DeadlineExceeded, http.StatusServiceUnavailable},
{codes.Aborted, http.StatusServiceUnavailable},
{codes.OutOfRange, http.StatusServiceUnavailable},
{codes.Unavailable, http.StatusServiceUnavailable},
{codes.DataLoss, http.StatusServiceUnavailable},
{codes.ResourceExhausted, http.StatusTooManyRequests},
{codes.InvalidArgument, http.StatusBadRequest},
{codes.Unauthenticated, http.StatusUnauthorized},
{codes.PermissionDenied, http.StatusForbidden},
{codes.Unimplemented, http.StatusNotFound},
}

for _, tt := range testCases {
s := status.New(tt.grpc, "Testing error")
t.Run("Code "+s.Code().String(), func(t *testing.T) {
e := createGRPCExporter(t, s)
assertOnHTTPCode(t, e, tt.http)
})
}
}

func TestGRPCToHTTP(t *testing.T) {
testCases := []struct {
http int
grpc codes.Code
}{
{http.StatusOK, codes.OK},
{http.StatusBadRequest, codes.InvalidArgument},
{http.StatusUnauthorized, codes.Unauthenticated},
{http.StatusForbidden, codes.PermissionDenied},
{http.StatusNotFound, codes.Unimplemented},
{http.StatusTooManyRequests, codes.ResourceExhausted},
{http.StatusBadGateway, codes.Unavailable},
{http.StatusServiceUnavailable, codes.Unavailable},
{http.StatusGatewayTimeout, codes.Unavailable},
}

for _, tt := range testCases {
s := status.New(tt.grpc, "Testing error")
t.Run("Code "+s.Code().String(), func(t *testing.T) {
e := createHTTPExporter(t, tt.http)
assertOnGRPCCode(t, e, s)
})
}
}

func TestHTTPToHTTP(t *testing.T) {
testCases := []struct {
code int
mapping int
}{
{code: http.StatusOK},
{code: http.StatusServiceUnavailable},
{code: http.StatusTooManyRequests},
{code: http.StatusBadRequest},
{code: http.StatusUnauthorized},
{code: http.StatusForbidden},
{code: http.StatusNotFound},
{code: http.StatusInternalServerError},
// Mappings won't be necessary once the OTLP/HTTP Exporter returns consumererror.Error types.
{code: http.StatusBadGateway, mapping: http.StatusServiceUnavailable},
{code: http.StatusGatewayTimeout, mapping: http.StatusServiceUnavailable},
{code: http.StatusTeapot, mapping: http.StatusInternalServerError},
{code: http.StatusConflict, mapping: http.StatusInternalServerError},
}

for _, tt := range testCases {
t.Run("Code "+strconv.Itoa(tt.code), func(t *testing.T) {
e := createHTTPExporter(t, tt.code)
code := tt.code
if tt.mapping != 0 {
code = tt.mapping
}
assertOnHTTPCode(t, e, code)
})
}
}

func createGRPCExporter(t *testing.T, s *status.Status) consumer.Logs {
t.Helper()

ln, err := net.Listen("tcp", "localhost:")
require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err)

srv := grpc.NewServer()
rcv := &logsServer{
exportError: s.Err(),
}

plogotlp.RegisterGRPCServer(srv, rcv)
go func() {
assert.NoError(t, srv.Serve(ln))
}()
t.Cleanup(func() {
srv.Stop()
})

f := otlpexporter.NewFactory()
cfg := f.CreateDefaultConfig().(*otlpexporter.Config)
cfg.QueueConfig.Enabled = false
cfg.RetryConfig.Enabled = false
cfg.ClientConfig = configgrpc.ClientConfig{
Endpoint: ln.Addr().String(),
TLS: configtls.ClientConfig{
Insecure: true,
},
}
e, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(component.MustNewType("otlp")), cfg)
require.NoError(t, err)
err = e.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, e.Shutdown(context.Background()))
})

return e
}

func createHTTPExporter(t *testing.T, code int) consumer.Logs {
t.Helper()

mux := http.NewServeMux()
mux.HandleFunc("/v1/logs", func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(code)
})

srv := httptest.NewServer(mux)
t.Cleanup(func() {
srv.Close()
})

f := otlphttpexporter.NewFactory()
cfg := f.CreateDefaultConfig().(*otlphttpexporter.Config)
cfg.QueueConfig.Enabled = false
cfg.RetryConfig.Enabled = false
cfg.Encoding = otlphttpexporter.EncodingProto
cfg.LogsEndpoint = srv.URL + "/v1/logs"
e, err := f.CreateLogs(context.Background(), exportertest.NewNopSettings(component.MustNewType("otlphttp")), cfg)
require.NoError(t, err)
err = e.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, e.Shutdown(context.Background()))
})

return e
}

func assertOnGRPCCode(t *testing.T, l consumer.Logs, s *status.Status) {
t.Helper()

rf := otlpreceiver.NewFactory()
rcfg := rf.CreateDefaultConfig().(*otlpreceiver.Config)
rcfg.GRPC = configoptional.Some(
configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: testutil.GetAvailableLocalAddress(t),
Transport: confignet.TransportTypeTCP,
},
},
)
r, err := rf.CreateLogs(context.Background(), receiver.Settings{
ID: component.MustNewID("otlp"),
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
}, rcfg, l)
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, r.Shutdown(context.Background()))
})

conn, err := grpc.NewClient(rcfg.GRPC.Get().NetAddr.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, conn.Close())
})

ld := testdata.GenerateLogs(2)

acc := plogotlp.NewGRPCClient(conn)
req := plogotlp.NewExportRequestFromLogs(ld)
res, err := acc.Export(context.Background(), req)

if s.Code() == codes.OK {
require.NoError(t, err)
} else {
got := status.Convert(err).Code()
require.Equal(t, s.Code(), got, "Expected code %s but got %s", s.Code().String(), got.String())
}
require.NotNil(t, res)
}

func assertOnHTTPCode(t *testing.T, l consumer.Logs, code int) {
t.Helper()

ld := testdata.GenerateLogs(2)
protoMarshaler := &plog.ProtoMarshaler{}
logProto, err := protoMarshaler.MarshalLogs(ld)
require.NoError(t, err)

rf := otlpreceiver.NewFactory()
rcfg := rf.CreateDefaultConfig().(*otlpreceiver.Config)
rcfg.HTTP = configoptional.Some(
otlpreceiver.HTTPConfig{
ServerConfig: confighttp.ServerConfig{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
LogsURLPath: "/v1/logs",
},
)
r, err := rf.CreateLogs(context.Background(), receiver.Settings{
ID: component.MustNewID("otlp"),
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
}, rcfg, l)
require.NoError(t, err)
err = r.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, r.Shutdown(context.Background()))
})

doHTTPRequest(t, rcfg.HTTP.Get().ServerConfig.Endpoint+"/v1/logs", logProto, code)
}

func doHTTPRequest(
t *testing.T,
url string,
data []byte,
expectStatusCode int,
) []byte {
req := createHTTPRequest(t, url, data)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)

respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)

require.NoError(t, resp.Body.Close())

if expectStatusCode == 0 {
require.Equal(t, http.StatusOK, resp.StatusCode)
} else {
require.Equal(t, expectStatusCode, resp.StatusCode)
}

return respBytes
}

func createHTTPRequest(
t *testing.T,
url string,
data []byte,
) *http.Request {
buf := bytes.NewBuffer(data)

req, err := http.NewRequest(http.MethodPost, "http://"+url, buf)
require.NoError(t, err)
req.Header.Set("Content-Type", "application/x-protobuf")

return req
}
2 changes: 1 addition & 1 deletion internal/e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
go.opentelemetry.io/collector/service v0.130.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.73.0
)

require (
Expand Down Expand Up @@ -143,7 +144,6 @@ require (
gonum.org/v1/gonum v0.16.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/grpc v1.73.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.5.0 // indirect
Expand Down
Loading