diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 75553059d16..4cfb7568cc5 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -195,4 +195,6 @@ func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceive // We maintain this map because the Factory is asked trace and metric receivers separately // when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not // create separate objects, they must use one otlpReceiver object per configuration. +// When the receiver is shutdown it should be removed from this map so the same configuration +// can be recreated successfully. var receivers = map[*Config]*otlpReceiver{} diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index 4ab6fd675b1..e88fda13cf2 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -51,6 +51,7 @@ type otlpReceiver struct { stopOnce sync.Once startServerOnce sync.Once + shutdownWG sync.WaitGroup logger *zap.Logger } @@ -96,8 +97,11 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host if err != nil { return err } + r.shutdownWG.Add(1) go func() { - if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { + defer r.shutdownWG.Done() + + if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil && errGrpc != grpc.ErrServerStopped { host.ReportFatalError(errGrpc) } }() @@ -111,8 +115,11 @@ func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host if err != nil { return err } + r.shutdownWG.Add(1) go func() { - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { + defer r.shutdownWG.Done() + + if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && errHTTP != http.ErrServerClosed { host.ReportFatalError(errHTTP) } }() @@ -180,6 +187,13 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error { if r.serverGRPC != nil { r.serverGRPC.GracefulStop() } + + r.shutdownWG.Wait() + + // delete the receiver from the map so it doesn't leak and it becomes possible to create + // another instance with the same configuration that functions properly. Notice that an + // OTLP object can only be started and shutdown once. + delete(receivers, r.cfg) }) return err } diff --git a/service/defaultcomponents/default_receivers_test.go b/service/defaultcomponents/default_receivers_test.go index deb9de7e5da..b1001c7b782 100644 --- a/service/defaultcomponents/default_receivers_test.go +++ b/service/defaultcomponents/default_receivers_test.go @@ -60,8 +60,7 @@ func TestDefaultReceivers(t *testing.T) { skipLifecyle: true, // TODO: Usage of CMux doesn't allow proper shutdown. }, { - receiver: "otlp", - skipLifecyle: true, // TODO: Upcoming PR to fix zipkin lifecycle. + receiver: "otlp", }, { receiver: "prometheus",