Skip to content

Commit 1632b15

Browse files
committed
Improve connection state logging for Jaeger exporter
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
1 parent 96b226a commit 1632b15

File tree

5 files changed

+259
-12
lines changed

5 files changed

+259
-12
lines changed

exporter/jaegerexporter/exporter.go

Lines changed: 102 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@ package jaegerexporter
1717
import (
1818
"context"
1919
"fmt"
20+
"sync"
21+
"time"
2022

2123
jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
24+
"go.opencensus.io/stats"
25+
"go.opencensus.io/tag"
2226
"go.uber.org/zap"
2327
"google.golang.org/grpc"
28+
"google.golang.org/grpc/connectivity"
2429
"google.golang.org/grpc/metadata"
2530

2631
"go.opentelemetry.io/collector/component"
@@ -40,21 +45,23 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
4045
return nil, err
4146
}
4247

43-
client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
48+
conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
4449
if err != nil {
4550
return nil, err
4651
}
4752

48-
collectorServiceClient := jaegerproto.NewCollectorServiceClient(client)
49-
s := &protoGRPCSender{
50-
logger: logger,
51-
client: collectorServiceClient,
52-
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
53-
waitForReady: cfg.WaitForReady,
54-
}
55-
53+
collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn)
54+
s := newProtoGRPCSender(logger,
55+
cfg.NameVal,
56+
collectorServiceClient,
57+
metadata.New(cfg.GRPCClientSettings.Headers),
58+
cfg.WaitForReady,
59+
conn,
60+
)
5661
exp, err := exporterhelper.NewTraceExporter(
5762
cfg, logger, s.pushTraceData,
63+
exporterhelper.WithStart(s.start),
64+
exporterhelper.WithShutdown(s.shutdown),
5865
exporterhelper.WithTimeout(cfg.TimeoutSettings),
5966
exporterhelper.WithRetry(cfg.RetrySettings),
6067
exporterhelper.WithQueue(cfg.QueueSettings),
@@ -66,10 +73,40 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
6673
// protoGRPCSender forwards spans encoded in the jaeger proto
6774
// format, to a grpc server.
6875
type protoGRPCSender struct {
76+
name string
6977
logger *zap.Logger
7078
client jaegerproto.CollectorServiceClient
7179
metadata metadata.MD
7280
waitForReady bool
81+
82+
conn stateReporter
83+
connStateReporterInterval time.Duration
84+
stateChangeCallbacks []func(connectivity.State)
85+
86+
stopCh chan (struct{})
87+
stopped bool
88+
stopLock sync.Mutex
89+
}
90+
91+
func newProtoGRPCSender(logger *zap.Logger, name string, cl jaegerproto.CollectorServiceClient, md metadata.MD, waitForReady bool, conn stateReporter) *protoGRPCSender {
92+
s := &protoGRPCSender{
93+
name: name,
94+
logger: logger,
95+
client: cl,
96+
metadata: md,
97+
waitForReady: waitForReady,
98+
99+
conn: conn,
100+
connStateReporterInterval: time.Second,
101+
102+
stopCh: make(chan (struct{})),
103+
}
104+
s.AddStateChangeCallback(s.onStateChange)
105+
return s
106+
}
107+
108+
type stateReporter interface {
109+
GetState() connectivity.State
73110
}
74111

75112
func (s *protoGRPCSender) pushTraceData(
@@ -100,3 +137,59 @@ func (s *protoGRPCSender) pushTraceData(
100137

101138
return 0, nil
102139
}
140+
141+
func (s *protoGRPCSender) shutdown(context.Context) error {
142+
s.stopLock.Lock()
143+
s.stopped = true
144+
s.stopLock.Unlock()
145+
close(s.stopCh)
146+
return nil
147+
}
148+
149+
func (s *protoGRPCSender) start(context.Context, component.Host) error {
150+
go s.startConnectionStatusReporter()
151+
return nil
152+
}
153+
154+
func (s *protoGRPCSender) startConnectionStatusReporter() {
155+
connState := s.conn.GetState()
156+
s.propagateStateChange(connState)
157+
158+
ticker := time.NewTicker(s.connStateReporterInterval)
159+
for {
160+
select {
161+
case <-ticker.C:
162+
s.stopLock.Lock()
163+
if s.stopped {
164+
s.stopLock.Unlock()
165+
return
166+
}
167+
168+
st := s.conn.GetState()
169+
if connState != st {
170+
// state has changed, report it
171+
connState = st
172+
s.propagateStateChange(st)
173+
}
174+
s.stopLock.Unlock()
175+
case <-s.stopCh:
176+
return
177+
}
178+
}
179+
}
180+
181+
func (s *protoGRPCSender) propagateStateChange(st connectivity.State) {
182+
for _, callback := range s.stateChangeCallbacks {
183+
callback(st)
184+
}
185+
}
186+
187+
func (s *protoGRPCSender) onStateChange(st connectivity.State) {
188+
mCtx, _ := tag.New(context.Background(), tag.Upsert(tag.MustNewKey("exporter_name"), s.name))
189+
stats.Record(mCtx, mLastConnectionState.M(int64(st)))
190+
s.logger.Info("State of the connection with the Jaeger Collector backend", zap.Stringer("state", st))
191+
}
192+
193+
func (s *protoGRPCSender) AddStateChangeCallback(f func(connectivity.State)) {
194+
s.stateChangeCallbacks = append(s.stateChangeCallbacks, f)
195+
}

exporter/jaegerexporter/exporter_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@ import (
2020
"path"
2121
"sync"
2222
"testing"
23+
"time"
2324

2425
"github.com/jaegertracing/jaeger/model"
2526
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
2627
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
2829
"go.uber.org/zap"
2930
"google.golang.org/grpc"
31+
"google.golang.org/grpc/connectivity"
3032
"google.golang.org/grpc/credentials"
3133

3234
"go.opentelemetry.io/collector/component"
35+
"go.opentelemetry.io/collector/component/componenttest"
3336
"go.opentelemetry.io/collector/config/configgrpc"
3437
"go.opentelemetry.io/collector/config/configtls"
3538
"go.opentelemetry.io/collector/consumer/pdata"
@@ -248,6 +251,83 @@ func TestMutualTLS(t *testing.T) {
248251
assert.Equal(t, jTraceID, requestes[0].GetBatch().Spans[0].TraceID)
249252
}
250253

254+
func TestConnectionStateChange(t *testing.T) {
255+
var state connectivity.State
256+
257+
wg := sync.WaitGroup{}
258+
sr := &mockStateReporter{
259+
state: connectivity.Connecting,
260+
}
261+
sender := &protoGRPCSender{
262+
logger: zap.NewNop(),
263+
stopCh: make(chan (struct{})),
264+
conn: sr,
265+
connStateReporterInterval: 10 * time.Millisecond,
266+
}
267+
268+
wg.Add(1)
269+
sender.AddStateChangeCallback(func(c connectivity.State) {
270+
state = c
271+
wg.Done()
272+
})
273+
274+
sender.start(context.Background(), componenttest.NewNopHost())
275+
defer sender.shutdown(context.Background())
276+
wg.Wait() // wait for the initial state to be propagated
277+
278+
// test
279+
wg.Add(1)
280+
sr.SetState(connectivity.Ready)
281+
282+
// verify
283+
wg.Wait() // wait until we get the state change
284+
assert.Equal(t, connectivity.Ready, state)
285+
}
286+
287+
func TestConnectionReporterEndsOnStopped(t *testing.T) {
288+
sr := &mockStateReporter{
289+
state: connectivity.Connecting,
290+
}
291+
292+
sender := &protoGRPCSender{
293+
logger: zap.NewNop(),
294+
stopCh: make(chan (struct{})),
295+
conn: sr,
296+
connStateReporterInterval: 10 * time.Millisecond,
297+
}
298+
299+
wg := sync.WaitGroup{}
300+
wg.Add(1)
301+
go func() {
302+
sender.startConnectionStatusReporter()
303+
wg.Done()
304+
}()
305+
306+
sender.stopLock.Lock()
307+
sender.stopped = true
308+
sender.stopLock.Unlock()
309+
310+
// if the test finishes, we are good... if it gets blocked, the conn status reporter didn't return when the sender was marked as stopped
311+
wg.Wait()
312+
}
313+
314+
type mockStateReporter struct {
315+
state connectivity.State
316+
mu sync.RWMutex
317+
}
318+
319+
func (m *mockStateReporter) GetState() connectivity.State {
320+
m.mu.RLock()
321+
st := m.state
322+
m.mu.RUnlock()
323+
return st
324+
}
325+
func (m *mockStateReporter) SetState(st connectivity.State) {
326+
m.mu.Lock()
327+
m.state = st
328+
m.mu.Unlock()
329+
}
330+
251331
func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) {
252332
server := grpc.NewServer(opts...)
253333
lis, err := net.Listen("tcp", "localhost:0")

exporter/jaegerexporter/metrics.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package jaegerexporter
16+
17+
import (
18+
"go.opencensus.io/stats"
19+
"go.opencensus.io/stats/view"
20+
"go.opencensus.io/tag"
21+
)
22+
23+
var (
24+
mLastConnectionState = stats.Int64("jaegerexporter_conn_state", "Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown", stats.UnitDimensionless)
25+
vLastConnectionState = &view.View{
26+
Name: mLastConnectionState.Name(),
27+
Measure: mLastConnectionState,
28+
Description: mLastConnectionState.Description(),
29+
Aggregation: view.LastValue(),
30+
TagKeys: []tag.Key{
31+
tag.MustNewKey("exporter_name"),
32+
},
33+
}
34+
)
35+
36+
// MetricViews return the metrics views according to given telemetry level.
37+
func MetricViews() []*view.View {
38+
return []*view.View{vLastConnectionState}
39+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package jaegerexporter
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
)
22+
23+
func TestProcessorMetrics(t *testing.T) {
24+
expectedViewNames := []string{
25+
"jaegerexporter_conn_state",
26+
}
27+
28+
views := MetricViews()
29+
for i, viewName := range expectedViewNames {
30+
assert.Equal(t, viewName, views[i].Name)
31+
}
32+
}

service/telemetry.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"go.uber.org/zap"
2626

2727
"go.opentelemetry.io/collector/config/configtelemetry"
28+
"go.opentelemetry.io/collector/exporter/jaegerexporter"
2829
"go.opentelemetry.io/collector/internal/collector/telemetry"
2930
"go.opentelemetry.io/collector/obsreport"
3031
"go.opentelemetry.io/collector/processor"
@@ -62,12 +63,14 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
6263
}
6364

6465
var views []*view.View
65-
views = append(views, obsreport.Configure(level)...)
66-
views = append(views, processor.MetricViews()...)
6766
views = append(views, batchprocessor.MetricViews()...)
67+
views = append(views, fluentobserv.MetricViews()...)
68+
views = append(views, jaegerexporter.MetricViews()...)
6869
views = append(views, kafkareceiver.MetricViews()...)
70+
views = append(views, obsreport.Configure(level)...)
6971
views = append(views, processMetricsViews.Views()...)
70-
views = append(views, fluentobserv.MetricViews()...)
72+
views = append(views, processor.MetricViews()...)
73+
7174
tel.views = views
7275
if err = view.Register(views...); err != nil {
7376
return err

0 commit comments

Comments
 (0)