Skip to content

Commit db73e4c

Browse files
committed
Wait jaeger receiver server goroutines exit on shutdown
1 parent d10b842 commit db73e4c

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

receiver/jaegerreceiver/trace_receiver.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ type jReceiver struct {
9595
agentProcessors []processors.Processor
9696
agentServer *http.Server
9797

98+
goroutines sync.WaitGroup
99+
98100
logger *zap.Logger
99101
}
100102

@@ -213,7 +215,6 @@ func (jr *jReceiver) Shutdown(context.Context) error {
213215
if aerr := jr.agentServer.Close(); aerr != nil {
214216
errs = append(errs, aerr)
215217
}
216-
jr.agentServer = nil
217218
}
218219
for _, processor := range jr.agentProcessors {
219220
processor.Stop()
@@ -223,12 +224,12 @@ func (jr *jReceiver) Shutdown(context.Context) error {
223224
if cerr := jr.collectorServer.Close(); cerr != nil {
224225
errs = append(errs, cerr)
225226
}
226-
jr.collectorServer = nil
227227
}
228228
if jr.grpc != nil {
229229
jr.grpc.Stop()
230-
jr.grpc = nil
231230
}
231+
232+
jr.goroutines.Wait()
232233
err = consumererror.Combine(errs)
233234
})
234235

@@ -303,7 +304,7 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
303304
return &api_v2.PostSpansResponse{}, nil
304305
}
305306

306-
func (jr *jReceiver) startAgent(_ component.Host) error {
307+
func (jr *jReceiver) startAgent(host component.Host) error {
307308
if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() && !jr.agentHTTPEnabled() {
308309
return nil
309310
}
@@ -334,8 +335,12 @@ func (jr *jReceiver) startAgent(_ component.Host) error {
334335
jr.agentProcessors = append(jr.agentProcessors, processor)
335336
}
336337

338+
jr.goroutines.Add(len(jr.agentProcessors))
337339
for _, processor := range jr.agentProcessors {
338-
go processor.Serve()
340+
go func(p processors.Processor) {
341+
defer jr.goroutines.Done()
342+
p.Serve()
343+
}(processor)
339344
}
340345

341346
// Start upstream grpc client before serving sampling endpoints over HTTP
@@ -357,9 +362,11 @@ func (jr *jReceiver) startAgent(_ component.Host) error {
357362
if jr.agentHTTPEnabled() {
358363
jr.agentServer = httpserver.NewHTTPServer(jr.agentHTTPAddr(), jr, metrics.NullFactory)
359364

365+
jr.goroutines.Add(1)
360366
go func() {
367+
defer jr.goroutines.Done()
361368
if err := jr.agentServer.ListenAndServe(); err != http.ErrServerClosed {
362-
jr.logger.Error("http server failure", zap.Error(err))
369+
host.ReportFatalError(fmt.Errorf("jaeger agent server error: %w", err))
363370
}
364371
}()
365372
}
@@ -465,8 +472,12 @@ func (jr *jReceiver) startCollector(host component.Host) error {
465472
nr := mux.NewRouter()
466473
nr.HandleFunc("/api/traces", jr.HandleThriftHTTPBatch).Methods(http.MethodPost)
467474
jr.collectorServer = &http.Server{Handler: nr}
475+
jr.goroutines.Add(1)
468476
go func() {
469-
_ = jr.collectorServer.Serve(cln)
477+
defer jr.goroutines.Done()
478+
if err := jr.collectorServer.Serve(cln); err != http.ErrServerClosed {
479+
host.ReportFatalError(err)
480+
}
470481
}()
471482
}
472483

@@ -489,8 +500,10 @@ func (jr *jReceiver) startCollector(host component.Host) error {
489500
}
490501
api_v2.RegisterSamplingManagerServer(jr.grpc, collectorSampling.NewGRPCHandler(ss))
491502

503+
jr.goroutines.Add(1)
492504
go func() {
493-
if err := jr.grpc.Serve(gln); err != nil {
505+
defer jr.goroutines.Done()
506+
if err := jr.grpc.Serve(gln); err != nil && err != grpc.ErrServerStopped {
494507
host.ReportFatalError(err)
495508
}
496509
}()

0 commit comments

Comments
 (0)