Skip to content

Commit de4901c

Browse files
committed
Improve connection state logging for Jaeger exporter
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
1 parent ecb27f4 commit de4901c

File tree

4 files changed

+205
-2
lines changed

4 files changed

+205
-2
lines changed

exporter/jaegerexporter/exporter.go

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,17 @@ package jaegerexporter
1717
import (
1818
"context"
1919
"fmt"
20+
"sync"
21+
"time"
2022

2123
jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
24+
"go.opencensus.io/stats"
25+
"go.opencensus.io/stats/view"
26+
"go.opencensus.io/tag"
2227
"go.uber.org/zap"
2328
"google.golang.org/grpc"
29+
"google.golang.org/grpc/backoff"
30+
"google.golang.org/grpc/connectivity"
2431
"google.golang.org/grpc/metadata"
2532

2633
"go.opentelemetry.io/collector/component"
@@ -40,21 +47,37 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
4047
return nil, err
4148
}
4249

43-
client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
50+
opts = append(opts, grpc.WithConnectParams(
51+
grpc.ConnectParams{
52+
Backoff: backoff.DefaultConfig,
53+
MinConnectTimeout: 10 * time.Second,
54+
},
55+
))
56+
57+
conn, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
4458
if err != nil {
4559
return nil, err
4660
}
4761

48-
collectorServiceClient := jaegerproto.NewCollectorServiceClient(client)
62+
collectorServiceClient := jaegerproto.NewCollectorServiceClient(conn)
4963
s := &protoGRPCSender{
64+
name: cfg.NameVal,
5065
logger: logger,
5166
client: collectorServiceClient,
5267
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
5368
waitForReady: cfg.WaitForReady,
69+
70+
conn: conn,
71+
connStateReporterInterval: time.Second,
72+
73+
stopCh: make(chan (struct{})),
5474
}
75+
s.AddStateChangeCallback(s.onStateChange)
5576

5677
exp, err := exporterhelper.NewTraceExporter(
5778
cfg, logger, s.pushTraceData,
79+
exporterhelper.WithStart(s.start),
80+
exporterhelper.WithShutdown(s.shutdown),
5881
exporterhelper.WithTimeout(cfg.TimeoutSettings),
5982
exporterhelper.WithRetry(cfg.RetrySettings),
6083
exporterhelper.WithQueue(cfg.QueueSettings),
@@ -66,10 +89,22 @@ func newTraceExporter(cfg *Config, logger *zap.Logger) (component.TracesExporter
6689
// protoGRPCSender forwards spans encoded in the jaeger proto
6790
// format, to a grpc server.
6891
type protoGRPCSender struct {
92+
name string
6993
logger *zap.Logger
7094
client jaegerproto.CollectorServiceClient
7195
metadata metadata.MD
7296
waitForReady bool
97+
98+
conn StateReporter
99+
connStateReporterInterval time.Duration
100+
stateChangeCallbacks []func(connectivity.State)
101+
102+
stopCh chan (struct{})
103+
stopWg sync.WaitGroup
104+
}
105+
106+
type StateReporter interface {
107+
GetState() connectivity.State
73108
}
74109

75110
func (s *protoGRPCSender) pushTraceData(
@@ -100,3 +135,54 @@ func (s *protoGRPCSender) pushTraceData(
100135

101136
return 0, nil
102137
}
138+
139+
func (s *protoGRPCSender) shutdown(context.Context) error {
140+
close(s.stopCh)
141+
s.stopWg.Wait()
142+
view.Unregister(MetricViews()...)
143+
return nil
144+
}
145+
146+
func (s *protoGRPCSender) start(context.Context, component.Host) error {
147+
view.Register(MetricViews()...)
148+
go s.startConnectionStatusReporter()
149+
return nil
150+
}
151+
152+
func (s *protoGRPCSender) startConnectionStatusReporter() {
153+
connState := s.conn.GetState()
154+
s.propagateStateChange(connState)
155+
156+
ticker := time.NewTicker(s.connStateReporterInterval)
157+
for {
158+
select {
159+
case <-ticker.C:
160+
s.stopWg.Add(1)
161+
st := s.conn.GetState()
162+
if connState != st {
163+
// state has changed, report it
164+
connState = st
165+
s.propagateStateChange(st)
166+
}
167+
s.stopWg.Done()
168+
case <-s.stopCh:
169+
return
170+
}
171+
}
172+
}
173+
174+
func (s *protoGRPCSender) propagateStateChange(st connectivity.State) {
175+
for _, callback := range s.stateChangeCallbacks {
176+
callback(st)
177+
}
178+
}
179+
180+
func (s *protoGRPCSender) onStateChange(st connectivity.State) {
181+
mCtx, _ := tag.New(context.Background(), tag.Upsert(tag.MustNewKey("exporter_name"), s.name))
182+
stats.Record(mCtx, mLastConnectionState.M(int64(st)))
183+
s.logger.Info("State of the connection with the Jaeger Collector backend", zap.Stringer("state", st))
184+
}
185+
186+
func (s *protoGRPCSender) AddStateChangeCallback(f func(connectivity.State)) {
187+
s.stateChangeCallbacks = append(s.stateChangeCallbacks, f)
188+
}

exporter/jaegerexporter/exporter_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,24 @@ package jaegerexporter
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"net"
2021
"path"
2122
"sync"
2223
"testing"
24+
"time"
2325

2426
"github.com/jaegertracing/jaeger/model"
2527
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
2628
"github.com/stretchr/testify/assert"
2729
"github.com/stretchr/testify/require"
2830
"go.uber.org/zap"
2931
"google.golang.org/grpc"
32+
"google.golang.org/grpc/connectivity"
3033
"google.golang.org/grpc/credentials"
3134

3235
"go.opentelemetry.io/collector/component"
36+
"go.opentelemetry.io/collector/component/componenttest"
3337
"go.opentelemetry.io/collector/config/configgrpc"
3438
"go.opentelemetry.io/collector/config/configtls"
3539
"go.opentelemetry.io/collector/consumer/pdata"
@@ -248,6 +252,48 @@ func TestMutualTLS(t *testing.T) {
248252
assert.Equal(t, jTraceID, requestes[0].GetBatch().Spans[0].TraceID)
249253
}
250254

255+
func TestConnectionStateChange(t *testing.T) {
256+
var state connectivity.State
257+
258+
wg := sync.WaitGroup{}
259+
sr := &mockStateReporter{
260+
state: connectivity.Connecting,
261+
}
262+
sender := &protoGRPCSender{
263+
logger: zap.NewNop(),
264+
stopCh: make(chan (struct{})),
265+
conn: sr,
266+
connStateReporterInterval: 10 * time.Millisecond,
267+
}
268+
269+
wg.Add(1)
270+
sender.AddStateChangeCallback(func(c connectivity.State) {
271+
state = c
272+
wg.Done()
273+
})
274+
275+
sender.start(context.Background(), componenttest.NewNopHost())
276+
defer sender.shutdown(context.Background())
277+
wg.Wait() // wait for the initial state to be propagated
278+
279+
// test
280+
wg.Add(1)
281+
sr.state = connectivity.Ready
282+
283+
// verify
284+
wg.Wait() // wait until we get the state change
285+
assert.Equal(t, connectivity.Ready, state)
286+
}
287+
288+
type mockStateReporter struct {
289+
state connectivity.State
290+
}
291+
292+
func (m *mockStateReporter) GetState() connectivity.State {
293+
fmt.Printf("returning state: %d\n", m.state)
294+
return m.state
295+
}
296+
251297
func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server), opts ...grpc.ServerOption) (*grpc.Server, net.Addr) {
252298
server := grpc.NewServer(opts...)
253299
lis, err := net.Listen("tcp", "localhost:0")

exporter/jaegerexporter/metrics.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package jaegerexporter
16+
17+
import (
18+
"go.opencensus.io/stats"
19+
"go.opencensus.io/stats/view"
20+
"go.opencensus.io/tag"
21+
)
22+
23+
var (
24+
mLastConnectionState = stats.Int64("jaegerexporter_conn_state", "Last connection state: 0 = Idle, 1 = Connecting, 2 = Ready, 3 = TransientFailure, 4 = Shutdown", stats.UnitDimensionless)
25+
vLastConnectionState = &view.View{
26+
Name: mLastConnectionState.Name(),
27+
Measure: mLastConnectionState,
28+
Description: mLastConnectionState.Description(),
29+
Aggregation: view.LastValue(),
30+
TagKeys: []tag.Key{
31+
tag.MustNewKey("exporter_name"),
32+
},
33+
}
34+
)
35+
36+
// MetricViews return the metrics views according to given telemetry level.
37+
func MetricViews() []*view.View {
38+
return []*view.View{vLastConnectionState}
39+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package jaegerexporter
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
)
22+
23+
func TestProcessorMetrics(t *testing.T) {
24+
expectedViewNames := []string{
25+
"jaegerexporter_conn_state",
26+
}
27+
28+
views := MetricViews()
29+
for i, viewName := range expectedViewNames {
30+
assert.Equal(t, viewName, views[i].Name)
31+
}
32+
}

0 commit comments

Comments
 (0)