diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index a0d9ee5c954..dde546fa9bc 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -95,6 +95,8 @@ type jReceiver struct { agentProcessors []processors.Processor agentServer *http.Server + goroutines sync.WaitGroup + logger *zap.Logger } @@ -213,7 +215,6 @@ func (jr *jReceiver) Shutdown(context.Context) error { if aerr := jr.agentServer.Close(); aerr != nil { errs = append(errs, aerr) } - jr.agentServer = nil } for _, processor := range jr.agentProcessors { processor.Stop() @@ -223,12 +224,12 @@ func (jr *jReceiver) Shutdown(context.Context) error { if cerr := jr.collectorServer.Close(); cerr != nil { errs = append(errs, cerr) } - jr.collectorServer = nil } if jr.grpc != nil { jr.grpc.Stop() - jr.grpc = nil } + + jr.goroutines.Wait() err = consumererror.Combine(errs) }) @@ -303,7 +304,7 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) return &api_v2.PostSpansResponse{}, nil } -func (jr *jReceiver) startAgent(_ component.Host) error { +func (jr *jReceiver) startAgent(host component.Host) error { if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() && !jr.agentHTTPEnabled() { return nil } @@ -334,8 +335,12 @@ func (jr *jReceiver) startAgent(_ component.Host) error { jr.agentProcessors = append(jr.agentProcessors, processor) } + jr.goroutines.Add(len(jr.agentProcessors)) for _, processor := range jr.agentProcessors { - go processor.Serve() + go func(p processors.Processor) { + defer jr.goroutines.Done() + p.Serve() + }(processor) } // Start upstream grpc client before serving sampling endpoints over HTTP @@ -357,9 +362,11 @@ func (jr *jReceiver) startAgent(_ component.Host) error { if jr.agentHTTPEnabled() { jr.agentServer = httpserver.NewHTTPServer(jr.agentHTTPAddr(), jr, metrics.NullFactory) + jr.goroutines.Add(1) go func() { + defer jr.goroutines.Done() if err := jr.agentServer.ListenAndServe(); err != http.ErrServerClosed { - jr.logger.Error("http server failure", zap.Error(err)) + host.ReportFatalError(fmt.Errorf("jaeger agent server error: %w", err)) } }() } @@ -465,8 +472,12 @@ func (jr *jReceiver) startCollector(host component.Host) error { nr := mux.NewRouter() nr.HandleFunc("/api/traces", jr.HandleThriftHTTPBatch).Methods(http.MethodPost) jr.collectorServer = &http.Server{Handler: nr} + jr.goroutines.Add(1) go func() { - _ = jr.collectorServer.Serve(cln) + defer jr.goroutines.Done() + if err := jr.collectorServer.Serve(cln); err != http.ErrServerClosed { + host.ReportFatalError(err) + } }() } @@ -489,8 +500,10 @@ func (jr *jReceiver) startCollector(host component.Host) error { } api_v2.RegisterSamplingManagerServer(jr.grpc, collectorSampling.NewGRPCHandler(ss)) + jr.goroutines.Add(1) go func() { - if err := jr.grpc.Serve(gln); err != nil { + defer jr.goroutines.Done() + if err := jr.grpc.Serve(gln); err != nil && err != grpc.ErrServerStopped { host.ReportFatalError(err) } }()