-
Notifications
You must be signed in to change notification settings - Fork 2.6k
fix: connection closure metrics and enable all metric groups by default #3735
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
9f26be2
fix: connection closure metrics and enable all metric groups by default
ofekshenawa 92c94e3
fix: upgrade OTel SDK to v1.40.0 to address GHSA-9h8m-3fm2-qjrq
ofekshenawa 226e454
fix: add reason parameter to CloseConn for accurate connection closur…
ofekshenawa 2e5d0a9
fix: add state change callback to CloseConn and update test mocks
ofekshenawa ccc4cf0
fix: add fromState parameter to CloseConn for explicit state tracking
ofekshenawa af37801
Merge branch 'master' of https://github.com/redis/go-redis into fix/m…
ofekshenawa 9c3a0a3
Use consts for connection close metric
ofekshenawa a2217ab
fix: add context.Context parameter to CloseConn for trace-to-metric c…
ofekshenawa dfbc5d7
Merge branch 'master' into fix/metrics-improvements
ofekshenawa File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| package redisotel | ||
|
|
||
| import "testing" | ||
|
|
||
| // Expected metric names per OTel semantic conventions. | ||
| // Reference: https://opentelemetry.io/docs/specs/semconv/database/database-metrics/ | ||
| const ( | ||
| semconvOperationDuration = "db.client.operation.duration" | ||
| semconvConnectionCount = "db.client.connection.count" | ||
| semconvConnectionCreateTime = "db.client.connection.create_time" | ||
| semconvConnectionWaitTime = "db.client.connection.wait_time" | ||
| semconvConnectionPending = "db.client.connection.pending_requests" | ||
| ) | ||
|
|
||
| // TestMetricDefinitionsMatchSemconv verifies metric names match OTel semantic conventions. | ||
| func TestMetricDefinitionsMatchSemconv(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| got string | ||
| expected string | ||
| }{ | ||
| {"db.client.operation.duration", MetricOperationDuration, semconvOperationDuration}, | ||
| {"db.client.connection.count", MetricConnectionCount, semconvConnectionCount}, | ||
| {"db.client.connection.create_time", MetricConnectionCreateTime, semconvConnectionCreateTime}, | ||
| {"db.client.connection.wait_time", MetricConnectionWaitTime, semconvConnectionWaitTime}, | ||
| {"db.client.connection.pending_requests", MetricConnectionPendingReqs, semconvConnectionPending}, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| if tt.got != tt.expected { | ||
| t.Errorf("got %q, want %q", tt.got, tt.expected) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // TestSemconvMetricTypes documents expected metric instrument types. | ||
| // Semconv specifies UpDownCounter for connection.count and pending_requests, | ||
| // but this implementation uses ObservableGauge (known deviation, see issue #3730). | ||
| func TestSemconvMetricTypes(t *testing.T) { | ||
| t.Run("connection.count uses Gauge (semconv specifies UpDownCounter)", func(t *testing.T) { | ||
| // Known deviation: using ObservableGauge instead of UpDownCounter | ||
| }) | ||
|
|
||
| t.Run("pending_requests uses Gauge (semconv specifies UpDownCounter)", func(t *testing.T) { | ||
| // Known deviation: using ObservableGauge instead of UpDownCounter | ||
| }) | ||
|
|
||
| t.Run("operation.duration uses Histogram (correct)", func(t *testing.T) { | ||
| // Matches semconv: Float64Histogram | ||
| }) | ||
|
|
||
| t.Run("connection.create_time uses Histogram (correct)", func(t *testing.T) { | ||
| // Matches semconv: Float64Histogram | ||
| }) | ||
|
|
||
| t.Run("connection.wait_time uses Histogram (correct)", func(t *testing.T) { | ||
| // Matches semconv: Float64Histogram | ||
| }) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,220 @@ | ||
| package redisotel | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "math/rand" | ||
| "sync" | ||
| "sync/atomic" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/redis/go-redis/v9" | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/sdk/metric" | ||
| "go.opentelemetry.io/otel/sdk/metric/metricdata" | ||
| ) | ||
|
|
||
| const ( | ||
| stressTestDuration = 30 * time.Second | ||
| stressTestConcurrency = 50 | ||
| stressTestMinDelay = 10 * time.Millisecond | ||
| stressTestMaxDelay = 100 * time.Millisecond | ||
| stressTestStatusInterval = 5 * time.Second | ||
| ) | ||
|
|
||
| // TestMetricsUnderStress validates metrics recording under concurrent load. | ||
| func TestMetricsUnderStress(t *testing.T) { | ||
| ctx := context.Background() | ||
| testClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) | ||
| if err := testClient.Ping(ctx).Err(); err != nil { | ||
| testClient.Close() | ||
| t.Skip("Redis not available at localhost:6379") | ||
| } | ||
| testClient.Close() | ||
|
|
||
| reader := metric.NewManualReader() | ||
| meterProvider := metric.NewMeterProvider(metric.WithReader(reader)) | ||
| defer func() { | ||
| _ = meterProvider.Shutdown(ctx) | ||
| }() | ||
|
|
||
| otel.SetMeterProvider(meterProvider) | ||
| resetObservabilityForTest() | ||
|
|
||
| otelInstance := GetObservabilityInstance() | ||
| config := NewConfig(). | ||
| WithEnabled(true). | ||
| WithMeterProvider(meterProvider). | ||
| WithMetricGroups(MetricGroupAll) | ||
|
|
||
| if err := otelInstance.Init(config); err != nil { | ||
| t.Fatalf("Failed to initialize OTel: %v", err) | ||
| } | ||
| defer otelInstance.Shutdown() | ||
|
|
||
| rdb := redis.NewClient(&redis.Options{ | ||
| Addr: "localhost:6379", | ||
| PoolSize: stressTestConcurrency, | ||
| MinIdleConns: 10, | ||
| }) | ||
| defer rdb.Close() | ||
|
|
||
| var opsCompleted atomic.Int64 | ||
| var opsErrors atomic.Int64 | ||
| startTime := time.Now() | ||
| deadline := startTime.Add(stressTestDuration) | ||
|
|
||
| statusTicker := time.NewTicker(stressTestStatusInterval) | ||
| defer statusTicker.Stop() | ||
| done := make(chan struct{}) | ||
|
|
||
| go func() { | ||
| for { | ||
| select { | ||
| case <-statusTicker.C: | ||
| elapsed := time.Since(startTime).Seconds() | ||
| ops := opsCompleted.Load() | ||
| t.Logf("[%.0fs] %d ops, %.1f ops/sec", elapsed, ops, float64(ops)/elapsed) | ||
| case <-done: | ||
| return | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| var wg sync.WaitGroup | ||
| for i := 0; i < stressTestConcurrency; i++ { | ||
| wg.Add(1) | ||
| go func(workerID int) { | ||
| defer wg.Done() | ||
| for time.Now().Before(deadline) { | ||
| key := fmt.Sprintf("stress_test_key_%d_%d", workerID, rand.Int63()) | ||
| value := fmt.Sprintf("value_%d", time.Now().UnixNano()) | ||
|
|
||
| if err := rdb.Set(ctx, key, value, time.Minute).Err(); err != nil { | ||
| opsErrors.Add(1) | ||
| } else { | ||
| opsCompleted.Add(1) | ||
| } | ||
|
|
||
| if _, err := rdb.Get(ctx, key).Result(); err != nil && err != redis.Nil { | ||
| opsErrors.Add(1) | ||
| } else { | ||
| opsCompleted.Add(1) | ||
| } | ||
|
|
||
| delay := stressTestMinDelay + time.Duration(rand.Int63n(int64(stressTestMaxDelay-stressTestMinDelay))) | ||
| time.Sleep(delay) | ||
| } | ||
| }(i) | ||
| } | ||
|
|
||
| wg.Wait() | ||
| close(done) | ||
|
|
||
| totalOps := opsCompleted.Load() | ||
| totalErrors := opsErrors.Load() | ||
| elapsed := time.Since(startTime) | ||
| t.Logf("Completed: %d ops in %v (%.1f ops/sec), %d errors", | ||
| totalOps, elapsed.Round(time.Second), float64(totalOps)/elapsed.Seconds(), totalErrors) | ||
|
|
||
| var rm metricdata.ResourceMetrics | ||
| if err := reader.Collect(ctx, &rm); err != nil { | ||
| t.Fatalf("Failed to collect metrics: %v", err) | ||
| } | ||
|
|
||
| validateMetrics(t, rm) | ||
| } | ||
|
|
||
| func validateMetrics(t *testing.T, rm metricdata.ResourceMetrics) { | ||
| metricsFound := make(map[string]bool) | ||
| for _, sm := range rm.ScopeMetrics { | ||
| for _, m := range sm.Metrics { | ||
| metricsFound[m.Name] = true | ||
| } | ||
| } | ||
|
|
||
| required := []string{ | ||
| MetricConnectionCount, | ||
| MetricConnectionCreateTime, | ||
| MetricOperationDuration, | ||
| } | ||
|
|
||
| for _, name := range required { | ||
| if !metricsFound[name] { | ||
| t.Errorf("Required metric not found: %s", name) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func resetObservabilityForTest() { | ||
| observabilityInstanceOnce = sync.Once{} | ||
| observabilityInstance = nil | ||
| } | ||
|
|
||
| // TestTracingAndMetricsCompatibility verifies that redisotel (tracing) and | ||
| // redisotel-native (metrics) can be used together without conflicts. | ||
| // Tracing uses AddHook (per-client), metrics uses SetOTelRecorder (global). | ||
| func TestTracingAndMetricsCompatibility(t *testing.T) { | ||
| ctx := context.Background() | ||
| testClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) | ||
| if err := testClient.Ping(ctx).Err(); err != nil { | ||
| testClient.Close() | ||
| t.Skip("Redis not available at localhost:6379") | ||
| } | ||
| testClient.Close() | ||
|
|
||
| reader := metric.NewManualReader() | ||
| meterProvider := metric.NewMeterProvider(metric.WithReader(reader)) | ||
| defer meterProvider.Shutdown(ctx) | ||
|
|
||
| otel.SetMeterProvider(meterProvider) | ||
| resetObservabilityForTest() | ||
|
|
||
| otelInstance := GetObservabilityInstance() | ||
| config := NewConfig(). | ||
| WithEnabled(true). | ||
| WithMeterProvider(meterProvider). | ||
| WithMetricGroups(MetricGroupAll) | ||
|
|
||
| if err := otelInstance.Init(config); err != nil { | ||
| t.Fatalf("Failed to initialize OTel metrics: %v", err) | ||
| } | ||
| defer otelInstance.Shutdown() | ||
|
|
||
| rdb := redis.NewClient(&redis.Options{ | ||
| Addr: "localhost:6379", | ||
| PoolSize: 5, | ||
| }) | ||
| defer rdb.Close() | ||
|
|
||
| // In production, also call: redisotel.InstrumentTracing(rdb) | ||
|
|
||
| for i := 0; i < 10; i++ { | ||
| key := fmt.Sprintf("compat-test-%d", i) | ||
| if err := rdb.Set(ctx, key, "value", time.Minute).Err(); err != nil { | ||
| t.Fatalf("SET failed: %v", err) | ||
| } | ||
| if _, err := rdb.Get(ctx, key).Result(); err != nil { | ||
| t.Fatalf("GET failed: %v", err) | ||
| } | ||
| } | ||
|
|
||
| var rm metricdata.ResourceMetrics | ||
| if err := reader.Collect(ctx, &rm); err != nil { | ||
| t.Fatalf("Failed to collect metrics: %v", err) | ||
| } | ||
|
|
||
| found := false | ||
| for _, sm := range rm.ScopeMetrics { | ||
| for _, m := range sm.Metrics { | ||
| if m.Name == MetricOperationDuration { | ||
| found = true | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if !found { | ||
| t.Error("Expected to find db.client.operation.duration metric") | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.