Skip to content

Commit 36f80e5

Browse files
fix(redis): go-redis v9 regression missing metrics and reconnect hook (#13415) (#15275) (#17023)
* fix(redis): go-redis v9 regression missing metrics and reconnect hook * fix: golangci lint return values not checked in tests * chore: move dnsError var locally into func --------- Signed-off-by: phanama <[email protected]> Co-authored-by: Yudi A Phanama <[email protected]>
1 parent 2b45cc8 commit 36f80e5

File tree

5 files changed

+146
-62
lines changed

5 files changed

+146
-62
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ require (
217217
github.com/pjbgf/sha1cd v0.3.0 // indirect
218218
github.com/pkg/errors v0.9.1 // indirect
219219
github.com/pmezard/go-difflib v1.0.0 // indirect
220-
github.com/prometheus/client_model v0.3.0 // indirect
220+
github.com/prometheus/client_model v0.3.0
221221
github.com/prometheus/common v0.42.0 // indirect
222222
github.com/prometheus/procfs v0.10.1 // indirect
223223
github.com/rivo/uniseg v0.4.4 // indirect

util/cache/redis.go

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"encoding/json"
88
"fmt"
99
"io"
10+
"net"
1011
"time"
1112

1213
ioutil "github.com/argoproj/argo-cd/v2/util/io"
@@ -155,41 +156,27 @@ type MetricsRegistry interface {
155156
ObserveRedisRequestDuration(duration time.Duration)
156157
}
157158

158-
var metricStartTimeKey = struct{}{}
159-
160159
type redisHook struct {
161160
registry MetricsRegistry
162161
}
163162

164-
func (rh *redisHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
165-
return context.WithValue(ctx, metricStartTimeKey, time.Now()), nil
166-
}
167-
168-
func (rh *redisHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
169-
cmdErr := cmd.Err()
170-
rh.registry.IncRedisRequest(cmdErr != nil && cmdErr != redis.Nil)
171-
172-
startTime := ctx.Value(metricStartTimeKey).(time.Time)
173-
duration := time.Since(startTime)
174-
rh.registry.ObserveRedisRequestDuration(duration)
175-
176-
return nil
177-
}
178-
179-
func (redisHook) BeforeProcessPipeline(ctx context.Context, _ []redis.Cmder) (context.Context, error) {
180-
return ctx, nil
163+
func (rh *redisHook) DialHook(next redis.DialHook) redis.DialHook {
164+
return func(ctx context.Context, network, addr string) (net.Conn, error) {
165+
conn, err := next(ctx, network, addr)
166+
return conn, err
167+
}
181168
}
182169

183-
func (redisHook) AfterProcessPipeline(_ context.Context, _ []redis.Cmder) error {
184-
return nil
185-
}
170+
func (rh *redisHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
171+
return func(ctx context.Context, cmd redis.Cmder) error {
172+
startTime := time.Now()
186173

187-
func (redisHook) DialHook(next redis.DialHook) redis.DialHook {
188-
return nil
189-
}
174+
err := next(ctx, cmd)
175+
rh.registry.IncRedisRequest(err != nil && err != redis.Nil)
176+
rh.registry.ObserveRedisRequestDuration(time.Since(startTime))
190177

191-
func (redisHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
192-
return nil
178+
return err
179+
}
193180
}
194181

195182
func (redisHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {

util/cache/redis_hook.go

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ package cache
22

33
import (
44
"context"
5-
"strings"
5+
"errors"
6+
"net"
67

78
"github.com/redis/go-redis/v9"
89
log "github.com/sirupsen/logrus"
910
)
1011

11-
const NoSuchHostErr = "no such host"
12-
1312
type argoRedisHooks struct {
1413
reconnectCallback func()
1514
}
@@ -18,32 +17,23 @@ func NewArgoRedisHook(reconnectCallback func()) *argoRedisHooks {
1817
return &argoRedisHooks{reconnectCallback: reconnectCallback}
1918
}
2019

21-
func (hook *argoRedisHooks) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
22-
return ctx, nil
23-
}
24-
25-
func (hook *argoRedisHooks) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
26-
if cmd.Err() != nil && strings.Contains(cmd.Err().Error(), NoSuchHostErr) {
27-
log.Warnf("Reconnect to redis because error: \"%v\"", cmd.Err())
28-
hook.reconnectCallback()
29-
}
30-
return nil
31-
}
32-
33-
func (hook *argoRedisHooks) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
34-
return ctx, nil
35-
}
36-
37-
func (hook *argoRedisHooks) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
38-
return nil
39-
}
40-
4120
func (hook *argoRedisHooks) DialHook(next redis.DialHook) redis.DialHook {
42-
return nil
21+
return func(ctx context.Context, network, addr string) (net.Conn, error) {
22+
conn, err := next(ctx, network, addr)
23+
return conn, err
24+
}
4325
}
4426

4527
func (hook *argoRedisHooks) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
46-
return nil
28+
return func(ctx context.Context, cmd redis.Cmder) error {
29+
var dnsError *net.DNSError
30+
err := next(ctx, cmd)
31+
if err != nil && errors.As(err, &dnsError) {
32+
log.Warnf("Reconnect to redis because error: \"%v\"", err)
33+
hook.reconnectCallback()
34+
}
35+
return err
36+
}
4737
}
4838

4939
func (hook *argoRedisHooks) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {

util/cache/redis_hook_test.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,53 @@
11
package cache
22

33
import (
4-
"context"
5-
"errors"
64
"testing"
5+
"time"
76

7+
"github.com/alicebob/miniredis/v2"
88
"github.com/stretchr/testify/assert"
99

1010
"github.com/redis/go-redis/v9"
1111
)
1212

1313
func Test_ReconnectCallbackHookCalled(t *testing.T) {
14+
mr, err := miniredis.Run()
15+
if err != nil {
16+
panic(err)
17+
}
18+
defer mr.Close()
19+
1420
called := false
1521
hook := NewArgoRedisHook(func() {
1622
called = true
1723
})
1824

19-
cmd := &redis.StringCmd{}
20-
cmd.SetErr(errors.New("Failed to resync revoked tokens. retrying again in 1 minute: dial tcp: lookup argocd-redis on 10.179.0.10:53: no such host"))
21-
22-
_ = hook.AfterProcess(context.Background(), cmd)
25+
faultyDNSRedisClient := redis.NewClient(&redis.Options{Addr: "invalidredishost.invalid:12345"})
26+
faultyDNSRedisClient.AddHook(hook)
2327

28+
faultyDNSClient := NewRedisCache(faultyDNSRedisClient, 60*time.Second, RedisCompressionNone)
29+
err = faultyDNSClient.Set(&Item{Key: "baz", Object: "foo"})
2430
assert.Equal(t, called, true)
31+
assert.Error(t, err)
2532
}
2633

2734
func Test_ReconnectCallbackHookNotCalled(t *testing.T) {
35+
mr, err := miniredis.Run()
36+
if err != nil {
37+
panic(err)
38+
}
39+
defer mr.Close()
40+
2841
called := false
2942
hook := NewArgoRedisHook(func() {
3043
called = true
3144
})
32-
cmd := &redis.StringCmd{}
33-
cmd.SetErr(errors.New("Something wrong"))
3445

35-
_ = hook.AfterProcess(context.Background(), cmd)
46+
redisClient := redis.NewClient(&redis.Options{Addr: mr.Addr()})
47+
redisClient.AddHook(hook)
48+
client := NewRedisCache(redisClient, 60*time.Second, RedisCompressionNone)
3649

50+
err = client.Set(&Item{Key: "foo", Object: "bar"})
3751
assert.Equal(t, called, false)
52+
assert.NoError(t, err)
3853
}

util/cache/redis_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,59 @@ package cache
22

33
import (
44
"context"
5+
"strconv"
56
"testing"
67
"time"
78

9+
promcm "github.com/prometheus/client_model/go"
10+
811
"github.com/alicebob/miniredis/v2"
12+
"github.com/prometheus/client_golang/prometheus"
913
"github.com/redis/go-redis/v9"
1014
"github.com/stretchr/testify/assert"
1115
)
1216

17+
var (
18+
redisRequestCounter = prometheus.NewCounterVec(
19+
prometheus.CounterOpts{
20+
Name: "argocd_redis_request_total",
21+
},
22+
[]string{"initiator", "failed"},
23+
)
24+
redisRequestHistogram = prometheus.NewHistogramVec(
25+
prometheus.HistogramOpts{
26+
Name: "argocd_redis_request_duration",
27+
Buckets: []float64{0.1, 0.25, .5, 1, 2},
28+
},
29+
[]string{"initiator"},
30+
)
31+
)
32+
33+
type MockMetricsServer struct {
34+
registry *prometheus.Registry
35+
redisRequestCounter *prometheus.CounterVec
36+
redisRequestHistogram *prometheus.HistogramVec
37+
}
38+
39+
func NewMockMetricsServer() *MockMetricsServer {
40+
registry := prometheus.NewRegistry()
41+
registry.MustRegister(redisRequestCounter)
42+
registry.MustRegister(redisRequestHistogram)
43+
return &MockMetricsServer{
44+
registry: registry,
45+
redisRequestCounter: redisRequestCounter,
46+
redisRequestHistogram: redisRequestHistogram,
47+
}
48+
}
49+
50+
func (m *MockMetricsServer) IncRedisRequest(failed bool) {
51+
m.redisRequestCounter.WithLabelValues("mock", strconv.FormatBool(failed)).Inc()
52+
}
53+
54+
func (m *MockMetricsServer) ObserveRedisRequestDuration(duration time.Duration) {
55+
m.redisRequestHistogram.WithLabelValues("mock").Observe(duration.Seconds())
56+
}
57+
1358
func TestRedisSetCache(t *testing.T) {
1459
mr, err := miniredis.Run()
1560
if err != nil {
@@ -70,3 +115,50 @@ func TestRedisSetCacheCompressed(t *testing.T) {
70115

71116
assert.Equal(t, testValue, result)
72117
}
118+
119+
func TestRedisMetrics(t *testing.T) {
120+
mr, err := miniredis.Run()
121+
if err != nil {
122+
panic(err)
123+
}
124+
defer mr.Close()
125+
126+
metric := &promcm.Metric{}
127+
ms := NewMockMetricsServer()
128+
redisClient := redis.NewClient(&redis.Options{Addr: mr.Addr()})
129+
faultyRedisClient := redis.NewClient(&redis.Options{Addr: "invalidredishost.invalid:12345"})
130+
CollectMetrics(redisClient, ms)
131+
CollectMetrics(faultyRedisClient, ms)
132+
133+
client := NewRedisCache(redisClient, 60*time.Second, RedisCompressionNone)
134+
faultyClient := NewRedisCache(faultyRedisClient, 60*time.Second, RedisCompressionNone)
135+
var res string
136+
137+
//client successful request
138+
err = client.Set(&Item{Key: "foo", Object: "bar"})
139+
assert.NoError(t, err)
140+
err = client.Get("foo", &res)
141+
assert.NoError(t, err)
142+
143+
c, err := ms.redisRequestCounter.GetMetricWithLabelValues("mock", "false")
144+
assert.NoError(t, err)
145+
err = c.Write(metric)
146+
assert.NoError(t, err)
147+
assert.Equal(t, metric.Counter.GetValue(), float64(2))
148+
149+
//faulty client failed request
150+
err = faultyClient.Get("foo", &res)
151+
assert.Error(t, err)
152+
c, err = ms.redisRequestCounter.GetMetricWithLabelValues("mock", "true")
153+
assert.NoError(t, err)
154+
err = c.Write(metric)
155+
assert.NoError(t, err)
156+
assert.Equal(t, metric.Counter.GetValue(), float64(1))
157+
158+
//both clients histogram count
159+
o, err := ms.redisRequestHistogram.GetMetricWithLabelValues("mock")
160+
assert.NoError(t, err)
161+
err = o.(prometheus.Metric).Write(metric)
162+
assert.NoError(t, err)
163+
assert.Equal(t, int(metric.Histogram.GetSampleCount()), 3)
164+
}

0 commit comments

Comments
 (0)