Skip to content

Commit f33f064

Browse files
committed
Improve connection state logging for Jaeger exporter
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
1 parent 2962d95 commit f33f064

File tree

5 files changed

+208
-6
lines changed

5 files changed

+208
-6
lines changed

exporter/jaegerexporter/exporter.go

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@ package jaegerexporter
1717
import (
1818
"context"
1919
"fmt"
20+
"sync"
21+
"time"
2022

2123
jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
24+
"go.opencensus.io/stats"
25+
"go.opencensus.io/tag"
2226
"go.uber.org/zap"
2327
"google.golang.org/grpc"
28+
"google.golang.org/grpc/connectivity"
2429
"google.golang.org/grpc/metadata"
2530

2631
"go.opentelemetry.io/collector/component"
@@ -40,21 +45,30 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
4045
return nil, err
4146
}
4247

43-
client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
48+
conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
4449
if err != nil {
4550
return nil, err
4651
}
4752

48-
collectorServiceClient := jaegerproto.NewCollectorServiceClient(client)
53+
collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn)
4954
s := &protoGRPCSender{
55+
name: cfg.NameVal,
5056
logger: logger,
5157
client: collectorServiceClient,
5258
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
5359
waitForReady: cfg.WaitForReady,
60+
61+
conn: conn,
62+
connStateReporterInterval: time.Second,
63+
64+
stopCh: make(chan (struct{})),
5465
}
66+
s.AddStateChangeCallback(s.onStateChange)
5567

5668
exp, err := exporterhelper.NewTraceExporter(
5769
cfg, logger, s.pushTraceData,
70+
exporterhelper.WithStart(s.start),
71+
exporterhelper.WithShutdown(s.shutdown),
5872
exporterhelper.WithTimeout(cfg.TimeoutSettings),
5973
exporterhelper.WithRetry(cfg.RetrySettings),
6074
exporterhelper.WithQueue(cfg.QueueSettings),
@@ -66,10 +80,22 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
6680
// protoGRPCSender forwards spans encoded in the jaeger proto
6781
// format, to a grpc server.
6882
type protoGRPCSender struct {
83+
name string
6984
logger *zap.Logger
7085
client jaegerproto.CollectorServiceClient
7186
metadata metadata.MD
7287
waitForReady bool
88+
89+
conn stateReporter
90+
connStateReporterInterval time.Duration
91+
stateChangeCallbacks []func(connectivity.State)
92+
93+
stopCh chan (struct{})
94+
stopWg sync.WaitGroup
95+
}
96+
97+
type stateReporter interface {
98+
GetState() connectivity.State
7399
}
74100

75101
func (s *protoGRPCSender) pushTraceData(
@@ -100,3 +126,52 @@ func (s *protoGRPCSender) pushTraceData(
100126

101127
return 0, nil
102128
}
129+
130+
func (s *protoGRPCSender) shutdown(context.Context) error {
131+
close(s.stopCh)
132+
s.stopWg.Wait()
133+
return nil
134+
}
135+
136+
func (s *protoGRPCSender) start(context.Context, component.Host) error {
137+
go s.startConnectionStatusReporter()
138+
return nil
139+
}
140+
141+
func (s *protoGRPCSender) startConnectionStatusReporter() {
142+
connState := s.conn.GetState()
143+
s.propagateStateChange(connState)
144+
145+
ticker := time.NewTicker(s.connStateReporterInterval)
146+
for {
147+
select {
148+
case <-ticker.C:
149+
s.stopWg.Add(1)
150+
st := s.conn.GetState()
151+
if connState != st {
152+
// state has changed, report it
153+
connState = st
154+
s.propagateStateChange(st)
155+
}
156+
s.stopWg.Done()
157+
case <-s.stopCh:
158+
return
159+
}
160+
}
161+
}
162+
163+
func (s *protoGRPCSender) propagateStateChange(st connectivity.State) {
164+
for _, callback := range s.stateChangeCallbacks {
165+
callback(st)
166+
}
167+
}
168+
169+
func (s *protoGRPCSender) onStateChange(st connectivity.State) {
170+
mCtx, _ := tag.New(context.Background(), tag.Upsert(tag.MustNewKey("exporter_name"), s.name))
171+
stats.Record(mCtx, mLastConnectionState.M(int64(st)))
172+
s.logger.Info("State of the connection with the Jaeger Collector backend", zap.Stringer("state", st))
173+
}
174+
175+
func (s *protoGRPCSender) AddStateChangeCallback(f func(connectivity.State)) {
176+
s.stateChangeCallbacks = append(s.stateChangeCallbacks, f)
177+
}

exporter/jaegerexporter/exporter_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@ import (
2020
"path"
2121
"sync"
2222
"testing"
23+
"time"
2324

2425
"github.com/jaegertracing/jaeger/model"
2526
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
2627
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
2829
"go.uber.org/zap"
2930
"google.golang.org/grpc"
31+
"google.golang.org/grpc/connectivity"
3032
"google.golang.org/grpc/credentials"
3133

3234
"go.opentelemetry.io/collector/component"
35+
"go.opentelemetry.io/collector/component/componenttest"
3336
"go.opentelemetry.io/collector/config/configgrpc"
3437
"go.opentelemetry.io/collector/config/configtls"
3538
"go.opentelemetry.io/collector/consumer/pdata"
@@ -248,6 +251,56 @@ func TestMutualTLS(t *testing.T) {
248251
assert.Equal(t, jTraceID, requestes[0].GetBatch().Spans[0].TraceID)
249252
}
250253

254+
func TestConnectionStateChange(t *testing.T) {
255+
var state connectivity.State
256+
257+
wg := sync.WaitGroup{}
258+
sr := &mockStateReporter{
259+
state: connectivity.Connecting,
260+
}
261+
sender := &protoGRPCSender{
262+
logger: zap.NewNop(),
263+
stopCh: make(chan (struct{})),
264+
conn: sr,
265+
connStateReporterInterval: 10 * time.Millisecond,
266+
}
267+
268+
wg.Add(1)
269+
sender.AddStateChangeCallback(func(c connectivity.State) {
270+
state = c
271+
wg.Done()
272+
})
273+
274+
sender.start(context.Background(), componenttest.NewNopHost())
275+
defer sender.shutdown(context.Background())
276+
wg.Wait() // wait for the initial state to be propagated
277+
278+
// test
279+
wg.Add(1)
280+
sr.SetState(connectivity.Ready)
281+
282+
// verify
283+
wg.Wait() // wait until we get the state change
284+
assert.Equal(t, connectivity.Ready, state)
285+
}
286+
287+
type mockStateReporter struct {
288+
state connectivity.State
289+
mu sync.RWMutex
290+
}
291+
292+
func (m *mockStateReporter) GetState() connectivity.State {
293+
m.mu.RLock()
294+
st := m.state
295+
m.mu.RUnlock()
296+
return st
297+
}
298+
func (m *mockStateReporter) SetState(st connectivity.State) {
299+
m.mu.Lock()
300+
m.state = st
301+
m.mu.Unlock()
302+
}
303+
251304
func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) {
252305
server := grpc.NewServer(opts...)
253306
lis, err := net.Listen("tcp", "localhost:0")

exporter/jaegerexporter/metrics.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package jaegerexporter
16+
17+
import (
18+
"go.opencensus.io/stats"
19+
"go.opencensus.io/stats/view"
20+
"go.opencensus.io/tag"
21+
)
22+
23+
var (
24+
mLastConnectionState = stats.Int64("jaegerexporter_conn_state", "Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown", stats.UnitDimensionless)
25+
vLastConnectionState = &view.View{
26+
Name: mLastConnectionState.Name(),
27+
Measure: mLastConnectionState,
28+
Description: mLastConnectionState.Description(),
29+
Aggregation: view.LastValue(),
30+
TagKeys: []tag.Key{
31+
tag.MustNewKey("exporter_name"),
32+
},
33+
}
34+
)
35+
36+
// MetricViews return the metrics views according to given telemetry level.
37+
func MetricViews() []*view.View {
38+
return []*view.View{vLastConnectionState}
39+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package jaegerexporter
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
)
22+
23+
func TestProcessorMetrics(t *testing.T) {
24+
expectedViewNames := []string{
25+
"jaegerexporter_conn_state",
26+
}
27+
28+
views := MetricViews()
29+
for i, viewName := range expectedViewNames {
30+
assert.Equal(t, viewName, views[i].Name)
31+
}
32+
}

service/telemetry.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"go.uber.org/zap"
2626

2727
"go.opentelemetry.io/collector/config/configtelemetry"
28+
"go.opentelemetry.io/collector/exporter/jaegerexporter"
2829
"go.opentelemetry.io/collector/internal/collector/telemetry"
2930
"go.opentelemetry.io/collector/obsreport"
3031
"go.opentelemetry.io/collector/processor"
@@ -63,13 +64,15 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
6364
}
6465

6566
var views []*view.View
66-
views = append(views, obsreport.Configure(level)...)
67-
views = append(views, processor.MetricViews()...)
68-
views = append(views, queuedprocessor.MetricViews()...)
6967
views = append(views, batchprocessor.MetricViews()...)
68+
views = append(views, fluentobserv.MetricViews()...)
69+
views = append(views, jaegerexporter.MetricViews()...)
7070
views = append(views, kafkareceiver.MetricViews()...)
71+
views = append(views, obsreport.Configure(level)...)
7172
views = append(views, processMetricsViews.Views()...)
72-
views = append(views, fluentobserv.MetricViews()...)
73+
views = append(views, processor.MetricViews()...)
74+
views = append(views, queuedprocessor.MetricViews()...)
75+
7376
tel.views = views
7477
if err = view.Register(views...); err != nil {
7578
return err

0 commit comments

Comments
 (0)