Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,6 @@ func createReceiver(cfg configmodels.Receiver, logger *zap.Logger) (*otlpReceive
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = map[*Config]*otlpReceiver{}
18 changes: 16 additions & 2 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type otlpReceiver struct {

stopOnce sync.Once
startServerOnce sync.Once
shutdownWG sync.WaitGroup

logger *zap.Logger
}
Expand Down Expand Up @@ -96,8 +97,11 @@ func (r *otlpReceiver) startGRPCServer(cfg *configgrpc.GRPCServerSettings, host
if err != nil {
return err
}
r.shutdownWG.Add(1)
go func() {
if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil {
defer r.shutdownWG.Done()

if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil && errGrpc != grpc.ErrServerStopped {
host.ReportFatalError(errGrpc)
}
}()
Expand All @@ -111,8 +115,11 @@ func (r *otlpReceiver) startHTTPServer(cfg *confighttp.HTTPServerSettings, host
if err != nil {
return err
}
r.shutdownWG.Add(1)
go func() {
if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil {
defer r.shutdownWG.Done()

if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && errHTTP != http.ErrServerClosed {
host.ReportFatalError(errHTTP)
}
}()
Expand Down Expand Up @@ -180,6 +187,13 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error {
if r.serverGRPC != nil {
r.serverGRPC.GracefulStop()
}

r.shutdownWG.Wait()

// delete the receiver from the map so it doesn't leak and it becomes possible to create
// another instance with the same configuration that functions properly. Notice that an
// OTLP object can only be started and shutdown once.
delete(receivers, r.cfg)
})
return err
}
Expand Down
3 changes: 1 addition & 2 deletions service/defaultcomponents/default_receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func TestDefaultReceivers(t *testing.T) {
skipLifecyle: true, // TODO: Usage of CMux doesn't allow proper shutdown.
},
{
receiver: "otlp",
skipLifecyle: true, // TODO: Upcoming PR to fix zipkin lifecycle.
receiver: "otlp",
},
{
receiver: "prometheus",
Expand Down