Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{})

// CreateTracesReceiver creates a trace receiver based on provided config.
func createTraceReceiver(
ctx context.Context,
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TracesConsumer,
Expand All @@ -130,15 +130,15 @@ func createTraceReceiver(
if err != nil {
return nil, err
}
if err = r.registerTraceConsumer(ctx, nextConsumer); err != nil {
if err = r.registerTraceConsumer(nextConsumer); err != nil {
return nil, err
}
return r, nil
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
ctx context.Context,
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumer,
Expand All @@ -147,15 +147,15 @@ func createMetricsReceiver(
if err != nil {
return nil, err
}
if err = r.registerMetricsConsumer(ctx, consumer); err != nil {
if err = r.registerMetricsConsumer(consumer); err != nil {
return nil, err
}
return r, nil
}

// CreateLogReceiver creates a log receiver based on provided config.
func createLogReceiver(
ctx context.Context,
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.LogsConsumer,
Expand All @@ -164,7 +164,7 @@ func createLogReceiver(
if err != nil {
return nil, err
}
if err = r.registerLogsConsumer(ctx, consumer); err != nil {
if err = r.registerLogsConsumer(consumer); err != nil {
return nil, err
}
return r, nil
Expand Down
59 changes: 49 additions & 10 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type otlpReceiver struct {

stopOnce sync.Once
startServerOnce sync.Once
shutdownWG sync.WaitGroup

logger *zap.Logger
}
Expand Down Expand Up @@ -96,8 +97,11 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host
if err != nil {
return err
}
r.shutdownWG.Add(1)
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
defer r.shutdownWG.Done()

if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil && errGrpc != grpc.ErrServerStopped {
host.ReportFatalError(errGrpc)
}
}()
Expand All @@ -111,15 +115,36 @@ func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host
if err != nil {
return err
}
r.shutdownWG.Add(1)
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
defer r.shutdownWG.Done()

if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && errHTTP != http.ErrServerClosed {
host.ReportFatalError(errHTTP)
}
}()
return nil
}

func (r *otlpReceiver) startProtocolServers(host component.Host) error {
func (r *otlpReceiver) startProtocolServers(ctx context.Context, host component.Host) error {
if r.traceReceiver != nil {
if err := r.registerTraceServers(ctx); err != nil {
return err
}
}

if r.metricsReceiver != nil {
if err := r.registerMetricsServers(ctx); err != nil {
return err
}
}

if r.logReceiver != nil {
if err := r.registerLogsServers(ctx); err != nil {
return err
}
}

var err error
if r.cfg.GRPC != nil {
err = r.startGRPCServer(r.cfg.GRPC, host)
Expand Down Expand Up @@ -155,14 +180,14 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {

// Start runs the trace receiver on the gRPC server. Currently
// it also enables the metrics receiver too.
func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
func (r *otlpReceiver) Start(ctx context.Context, host component.Host) error {
if r.traceReceiver == nil && r.metricsReceiver == nil && r.logReceiver == nil {
return errors.New("cannot start receiver: no consumers were specified")
}

var err error
r.startServerOnce.Do(func() {
err = r.startProtocolServers(host)
err = r.startProtocolServers(ctx, host)
})
return err
}
Expand All @@ -180,15 +205,21 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error {
if r.serverGRPC != nil {
r.serverGRPC.GracefulStop()
}

r.shutdownWG.Wait()
})
return err
}

func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.TracesConsumer) error {
func (r *otlpReceiver) registerTraceConsumer(tc consumer.TracesConsumer) error {
if tc == nil {
return componenterror.ErrNilNextConsumer
}
r.traceReceiver = trace.New(r.cfg.Name(), tc)
return nil
}

func (r *otlpReceiver) registerTraceServers(ctx context.Context) error {
if r.serverGRPC != nil {
collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver)
}
Expand All @@ -203,11 +234,15 @@ func (r *otlpReceiver) registerTraceConsumer(ctx context.Context, tc consumer.Tr
return nil
}

func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer.MetricsConsumer) error {
func (r *otlpReceiver) registerMetricsConsumer(mc consumer.MetricsConsumer) error {
if mc == nil {
return componenterror.ErrNilNextConsumer
}
r.metricsReceiver = metrics.New(r.cfg.Name(), mc)
return nil
}

func (r *otlpReceiver) registerMetricsServers(ctx context.Context) error {
if r.serverGRPC != nil {
collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver)
}
Expand All @@ -217,11 +252,15 @@ func (r *otlpReceiver) registerMetricsConsumer(ctx context.Context, mc consumer.
return nil
}

func (r *otlpReceiver) registerLogsConsumer(ctx context.Context, tc consumer.LogsConsumer) error {
if tc == nil {
func (r *otlpReceiver) registerLogsConsumer(lc consumer.LogsConsumer) error {
if lc == nil {
return componenterror.ErrNilNextConsumer
}
r.logReceiver = logs.New(r.cfg.Name(), tc)
r.logReceiver = logs.New(r.cfg.Name(), lc)
return nil
}

func (r *otlpReceiver) registerLogsServers(ctx context.Context) error {
if r.serverGRPC != nil {
collectorlog.RegisterLogsServiceServer(r.serverGRPC, r.logReceiver)
}
Expand Down
3 changes: 1 addition & 2 deletions service/defaultcomponents/default_receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func TestDefaultReceivers(t *testing.T) {
skipLifecyle: true, // TODO: Usage of CMux doesn't allow proper shutdown.
},
{
receiver: "otlp",
skipLifecyle: true, // TODO: Upcoming PR to fix zipkin lifecycle.
receiver: "otlp",
},
{
receiver: "prometheus",
Expand Down