diff --git a/balancer/rls/cache_test.go b/balancer/rls/cache_test.go
index 4c064e971ac6..52239deb77d6 100644
--- a/balancer/rls/cache_test.go
+++ b/balancer/rls/cache_test.go
@@ -242,3 +242,61 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
 		t.Fatalf("unexpected diff in backoffState for cache entry after dataCache.resetBackoffState(): %s", diff)
 	}
 }
+
+func (s) TestDataCache_Metrics(t *testing.T) {
+	cacheEntriesMetricsTests := []*cacheEntry{
+		{size: 1},
+		{size: 2},
+		{size: 3},
+		{size: 4},
+		{size: 5},
+	}
+	tmr := stats.NewTestMetricsRecorder()
+	dc := newDataCache(50, nil, tmr, "")
+
+	dc.updateRLSServerTarget("rls-server-target")
+	for i, k := range cacheKeys {
+		dc.addEntry(k, cacheEntriesMetricsTests[i])
+	}
+
+	const cacheEntriesKey = "grpc.lb.rls.cache_entries"
+	const cacheSizeKey = "grpc.lb.rls.cache_size"
+	// 5 total entries which add up to a size of 15; the metrics should record that.
+	if got, _ := tmr.Metric(cacheEntriesKey); got != 5 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 5)
+	}
+	if got, _ := tmr.Metric(cacheSizeKey); got != 15 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 15)
+	}
+
+	// Resize the cache down to 2 entries (deterministic, as it is based on LRU).
+	dc.resize(9)
+	if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
+	}
+	if got, _ := tmr.Metric(cacheSizeKey); got != 9 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 9)
+	}
+
+	// Update an entry to have size 6. This should be reflected in the size
+	// metric, which will increase by 1 to 10, while the number of cache entries
+	// should stay the same. This write is deterministic and writes to the last one.
+	dc.updateEntrySize(cacheEntriesMetricsTests[4], 6)
+
+	if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
+	}
+	if got, _ := tmr.Metric(cacheSizeKey); got != 10 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 10)
+	}
+
+	// Delete this scaled-up cache entry. This should scale the cache down to 1
+	// entry, and remove its size of 6, so the cache size should be 4.
+	dc.deleteAndCleanup(cacheKeys[4], cacheEntriesMetricsTests[4])
+	if got, _ := tmr.Metric(cacheEntriesKey); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 1)
+	}
+	if got, _ := tmr.Metric(cacheSizeKey); got != 4 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 4)
+	}
+}
diff --git a/balancer/rls/picker_test.go b/balancer/rls/picker_test.go
index d9ce0c2432cb..7c69f2c0fc89 100644
--- a/balancer/rls/picker_test.go
+++ b/balancer/rls/picker_test.go
@@ -26,11 +26,13 @@ import (
 	"time"
 
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/stubserver"
 	rlstest "google.golang.org/grpc/internal/testutils/rls"
+	"google.golang.org/grpc/internal/testutils/stats"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/types/known/durationpb"
@@ -246,6 +248,133 @@ func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
 	}
 }
 
+// Test_RLSDefaultTargetPicksMetric tests the default target picks metric. It
+// configures an RLS Balancer which specifies to route to the default target in
+// the RLS Configuration, and makes an RPC on a Channel containing this RLS
+// Balancer. This test then asserts that a default target picks metric is
+// emitted, and that the target picks and failed picks metrics are not.
+func (s) Test_RLSDefaultTargetPicksMetric(t *testing.T) {
+	// Start an RLS server and set the throttler to always throttle requests.
+	rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
+	overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
+
+	// Build RLS service config with a default target.
+	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
+	defBackendCh, defBackendAddress := startBackend(t)
+	rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
+
+	// Register a manual resolver and push the RLS service config through it.
+	r := startManualResolverWithConfig(t, rlsConfig)
+
+	tmr := stats.NewTestMetricsRecorder()
+	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	defer cc.Close()
+
+	// Make an RPC and ensure it gets routed to the default target.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
+
+	if got, _ := tmr.Metric("grpc.lb.rls.default_target_picks"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.default_target_picks", got, 1)
+	}
+	if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
+		t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
+	}
+	if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
+		t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
+	}
+}
+
+// Test_RLSTargetPicksMetric tests the target picks metric. It configures an RLS
+// Balancer which specifies to route to a target through a RouteLookupResponse,
+// and makes an RPC on a Channel containing this RLS Balancer. This test then
+// asserts that a target picks metric is emitted, and that the default target
+// picks and failed picks metrics are not emitted.
+func (s) Test_RLSTargetPicksMetric(t *testing.T) {
+	// Start an RLS server and set the throttler to never throttle requests.
+	rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
+	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
+
+	// Build the RLS config without a default target.
+	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
+
+	// Start a test backend, and setup the fake RLS server to return this as a
+	// target in the RLS response.
+	testBackendCh, testBackendAddress := startBackend(t)
+	rlsServer.SetResponseCallback(func(context.Context, *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
+		return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
+	})
+
+	// Register a manual resolver and push the RLS service config through it.
+	r := startManualResolverWithConfig(t, rlsConfig)
+
+	tmr := stats.NewTestMetricsRecorder()
+	// Dial the backend.
+	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	defer cc.Close()
+
+	// Make an RPC and ensure it gets routed to the test backend.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
+	if got, _ := tmr.Metric("grpc.lb.rls.target_picks"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.target_picks", got, 1)
+	}
+	if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
+		t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
+	}
+	if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
+		t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
+	}
+}
+
+// Test_RLSFailedPicksMetric tests the failed picks metric. It configures an RLS
+// Balancer to fail a pick with unavailable, and makes an RPC on a Channel
+// containing this RLS Balancer. It then asserts that a failed picks metric is
+// emitted, and that the default target picks and target picks metrics are not.
+func (s) Test_RLSFailedPicksMetric(t *testing.T) {
+	// Start an RLS server and set the throttler to never throttle requests.
+	rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
+	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
+
+	// Build an RLS config without a default target.
+	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
+
+	// Register a manual resolver and push the RLS service config through it.
+	r := startManualResolverWithConfig(t, rlsConfig)
+
+	tmr := stats.NewTestMetricsRecorder()
+	// Dial the backend.
+	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	defer cc.Close()
+
+	// Make an RPC and expect it to fail with an unavailable error. We use a
+	// smaller timeout to ensure that the test doesn't run very long.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+	defer cancel()
+	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))
+
+	if got, _ := tmr.Metric("grpc.lb.rls.failed_picks"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.failed_picks", got, 1)
+	}
+	if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
+		t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
+	}
+	if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
+		t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
+	}
+}
+
 // Test verifies the scenario where there is a matching entry in the data cache
 // which is valid and there is no pending request. The pick is expected to be
 // delegated to the child policy.
@@ -256,7 +385,6 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {
 
 	// Build the RLS config without a default target.
 	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
-
 	// Start a test backend, and setup the fake RLS server to return this as a
 	// target in the RLS response.
 	testBackendCh, testBackendAddress := startBackend(t)
@@ -881,3 +1009,41 @@ func TestIsFullMethodNameValid(t *testing.T) {
 		})
 	}
 }
+
+// Tests the conversion of the child picker's error to the pick result attribute.
+func (s) TestChildPickResultError(t *testing.T) {
+	tests := []struct {
+		name string
+		err  error
+		want string
+	}{
+		{
+			name: "nil",
+			err:  nil,
+			want: "complete",
+		},
+		{
+			name: "errNoSubConnAvailable",
+			err:  balancer.ErrNoSubConnAvailable,
+			want: "queue",
+		},
+		{
+			name: "status error",
+			err:  status.Error(codes.Unimplemented, "unimplemented"),
+			want: "drop",
+		},
+		{
+			name: "other error",
+			err:  errors.New("some error"),
+			want: "fail",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			if got := errToPickResult(test.err); got != test.want {
+				t.Fatalf("errToPickResult(%q) = %v, want %v", test.err, got, test.want)
+			}
+		})
+	}
+}
diff --git a/balancer/weightedroundrobin/balancer_test.go b/balancer/weightedroundrobin/balancer_test.go
index 9df66a8d5f17..68d2d5a5c5c8 100644
--- a/balancer/weightedroundrobin/balancer_test.go
+++ b/balancer/weightedroundrobin/balancer_test.go
@@ -87,7 +87,7 @@ var (
 		OOBReportingPeriod:      stringp("0.005s"),
 		BlackoutPeriod:          stringp("0s"),
 		WeightExpirationPeriod:  stringp("60s"),
-		WeightUpdatePeriod:      stringp(".050s"),
+		WeightUpdatePeriod:      stringp("30s"),
 		ErrorUtilizationPenalty: float64p(0),
 	}
 )
@@ -224,8 +224,8 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
 	srv := startServer(t, reportCall)
 	sc := svcConfig(t, testMetricsConfig)
 
-	mr := stats.NewTestMetricsRecorder(t)
-	if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
+	tmr := stats.NewTestMetricsRecorder()
+	if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(tmr)); err != nil {
 		t.Fatalf("Error starting client: %v", err)
 	}
 	srv.callMetrics.SetQPS(float64(1))
@@ -234,12 +234,20 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
 		t.Fatalf("Error from EmptyCall: %v", err)
 	}
 
-	mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)           // Falls back because only one SubConn.
-	mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted).
-	mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
+	if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
+	}
+	if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != 0 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
+	}
+	if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, 1)
+	}
 	// Unusable, so no endpoint weight. Due to only one SubConn, this will never
 	// update the weight. Thus, this will stay 0.
-	mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
+	if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weights"); got != 0 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weights", got, 0)
+	}
 }
 
 // Tests two addresses with ORCA reporting disabled (should fall back to pure
diff --git a/balancer/weightedroundrobin/metrics_test.go b/balancer/weightedroundrobin/metrics_test.go
index 0ff43dc26e52..9794a65e044f 100644
--- a/balancer/weightedroundrobin/metrics_test.go
+++ b/balancer/weightedroundrobin/metrics_test.go
@@ -108,7 +108,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			tmr := stats.NewTestMetricsRecorder(t)
+			tmr := stats.NewTestMetricsRecorder()
 			wsc := &weightedSubConn{
 				metricsRecorder: tmr,
 				weightVal:       3,
@@ -117,9 +117,15 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
 			}
 
 			wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)
-			tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant)
-			tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant)
-			tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant)
+			if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
+				t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
+			}
+			if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != test.endpointWeightNotYetUsableWant {
+				t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, test.endpointWeightNotYetUsableWant)
+			}
+			if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weights"); got != test.endpointWeightWant {
+				t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weights", got, test.endpointWeightWant)
+			}
 		})
 	}
 
@@ -130,7 +136,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
 // with no weights. Both of these should emit a count metric for round robin
 // fallback.
 func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
-	tmr := stats.NewTestMetricsRecorder(t)
+	tmr := stats.NewTestMetricsRecorder()
 	wsc := &weightedSubConn{
 		metricsRecorder: tmr,
 		weightVal:       0,
@@ -147,7 +153,9 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
 	// There is only one SubConn, so no matter if the SubConn has a weight or
 	// not will fallback to round robin.
 	p.regenerateScheduler()
-	tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
+	if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
+	}
 	tmr.ClearMetrics()
 
 	// With two SubConns, if neither of them have weights, it will also fallback
@@ -159,5 +167,7 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
 	}
 	p.subConns = append(p.subConns, wsc2)
 	p.regenerateScheduler()
-	tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
+	if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
+		t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
+	}
 }
diff --git a/internal/stats/metrics_recorder_list_test.go b/internal/stats/metrics_recorder_list_test.go
index 15139f128158..1bf09bd10b68 100644
--- a/internal/stats/metrics_recorder_list_test.go
+++ b/internal/stats/metrics_recorder_list_test.go
@@ -146,8 +146,8 @@ func (s) TestMetricsRecorderList(t *testing.T) {
 
 	// Create two stats.Handlers which also implement MetricsRecorder, configure
 	// one as a global dial option and one as a local dial option.
-	mr1 := stats.NewTestMetricsRecorder(t)
-	mr2 := stats.NewTestMetricsRecorder(t)
+	mr1 := stats.NewTestMetricsRecorder()
+	mr2 := stats.NewTestMetricsRecorder()
 
 	defer internal.ClearGlobalDialOptions()
 	internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(grpc.WithStatsHandler(mr1))
@@ -172,8 +172,12 @@ func (s) TestMetricsRecorderList(t *testing.T) {
 		LabelKeys: []string{"int counter label", "int counter optional label"},
 		LabelVals: []string{"int counter label val", "int counter optional label val"},
 	}
-	mr1.WaitForInt64Count(ctx, mdWant)
-	mr2.WaitForInt64Count(ctx, mdWant)
+	if err := mr1.WaitForInt64Count(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
+	if err := mr2.WaitForInt64Count(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
 
 	mdWant = stats.MetricsData{
 		Handle:    floatCountHandle.Descriptor(),
@@ -181,8 +185,12 @@ func (s) TestMetricsRecorderList(t *testing.T) {
 		LabelKeys: []string{"float counter label", "float counter optional label"},
 		LabelVals: []string{"float counter label val", "float counter optional label val"},
 	}
-	mr1.WaitForFloat64Count(ctx, mdWant)
-	mr2.WaitForFloat64Count(ctx, mdWant)
+	if err := mr1.WaitForFloat64Count(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
+	if err := mr2.WaitForFloat64Count(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
 
 	mdWant = stats.MetricsData{
 		Handle:    intHistoHandle.Descriptor(),
@@ -190,8 +198,12 @@ func (s) TestMetricsRecorderList(t *testing.T) {
 		LabelKeys: []string{"int histo label", "int histo optional label"},
 		LabelVals: []string{"int histo label val", "int histo optional label val"},
 	}
-	mr1.WaitForInt64Histo(ctx, mdWant)
-	mr2.WaitForInt64Histo(ctx, mdWant)
+	if err := mr1.WaitForInt64Histo(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
+	if err := mr2.WaitForInt64Histo(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
 
 	mdWant = stats.MetricsData{
 		Handle:    floatHistoHandle.Descriptor(),
@@ -199,16 +211,24 @@ func (s) TestMetricsRecorderList(t *testing.T) {
 		LabelKeys: []string{"float histo label", "float histo optional label"},
 		LabelVals: []string{"float histo label val", "float histo optional label val"},
 	}
-	mr1.WaitForFloat64Histo(ctx, mdWant)
-	mr2.WaitForFloat64Histo(ctx, mdWant)
+	if err := mr1.WaitForFloat64Histo(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
+	if err := mr2.WaitForFloat64Histo(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
 	mdWant = stats.MetricsData{
 		Handle:    intGaugeHandle.Descriptor(),
 		IntIncr:   5,
 		LabelKeys: []string{"int gauge label", "int gauge optional label"},
 		LabelVals: []string{"int gauge label val", "int gauge optional label val"},
 	}
-	mr1.WaitForInt64Gauge(ctx, mdWant)
-	mr2.WaitForInt64Gauge(ctx, mdWant)
+	if err := mr1.WaitForInt64Gauge(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
+	if err := mr2.WaitForInt64Gauge(ctx, mdWant); err != nil {
+		t.Fatal(err.Error())
+	}
 }
 
 // TestMetricRecorderListPanic tests that the metrics recorder list panics if
diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go
index e8805dd8e65a..72a20c1cbf44 100644
--- a/internal/testutils/stats/test_metrics_recorder.go
+++ b/internal/testutils/stats/test_metrics_recorder.go
@@ -21,9 +21,8 @@ package stats
 
 import (
 	"context"
+	"fmt"
 	"sync"
-	"testing"
-	"time"
 
 	"github.com/google/go-cmp/cmp"
 	estats "google.golang.org/grpc/experimental/stats"
@@ -36,24 +35,21 @@ import (
 // have taken place. It also persists metrics data keyed on the metrics
 // descriptor.
 type TestMetricsRecorder struct {
-	t *testing.T
-
 	intCountCh   *testutils.Channel
 	floatCountCh *testutils.Channel
 	intHistoCh   *testutils.Channel
 	floatHistoCh *testutils.Channel
 	intGaugeCh   *testutils.Channel
 
-	// The most recent update for each metric name.
-	mu   sync.Mutex
+	// mu protects data.
+	mu sync.Mutex
+	// data is the most recent update for each metric name.
 	data map[estats.Metric]float64
 }
 
-// NewTestMetricsRecorder returns new TestMetricsRecorder
-func NewTestMetricsRecorder(t *testing.T) *TestMetricsRecorder {
-	tmr := &TestMetricsRecorder{
-		t: t,
-
+// NewTestMetricsRecorder returns a new TestMetricsRecorder.
+func NewTestMetricsRecorder() *TestMetricsRecorder {
+	return &TestMetricsRecorder{
 		intCountCh:   testutils.NewChannelWithSize(10),
 		floatCountCh: testutils.NewChannelWithSize(10),
 		intHistoCh:   testutils.NewChannelWithSize(10),
@@ -62,36 +58,16 @@ func NewTestMetricsRecorder(t *testing.T) *TestMetricsRecorder {
 
 		data: make(map[estats.Metric]float64),
 	}
-
-	return tmr
 }
 
-// AssertDataForMetric asserts data is present for metric. The zero value in the
-// check is equivalent to unset.
-func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal float64) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	if r.data[estats.Metric(metricName)] != wantVal {
-		r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", metricName, r.data[estats.Metric(metricName)], wantVal)
-	}
+// Metric returns the most recent data for a metric, and whether this recorder
+// has received data for a metric.
+func (r *TestMetricsRecorder) Metric(name string) (float64, bool) {
+	data, ok := r.data[estats.Metric(name)]
+	return data, ok
 }
 
-// PollForDataForMetric polls the metric data for the want. Fails if context
-// provided expires before data for metric is found.
-func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) {
-	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
-		r.mu.Lock()
-		if r.data[estats.Metric(metricName)] == wantVal {
-			r.mu.Unlock()
-			return
-		}
-		r.mu.Unlock()
-	}
-	r.t.Fatalf("Timeout waiting for data %v for metric %v", wantVal, metricName)
-}
-
-// ClearMetrics clears the metrics data stores of the test metrics recorder by
-// setting all the data to 0.
+// ClearMetrics clears the metrics data store of the test metrics recorder.
 func (r *TestMetricsRecorder) ClearMetrics() {
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -111,23 +87,25 @@ type MetricsData struct {
 	LabelVals []string
 }
 
-// WaitForInt64Count waits for an int64 count metric to be recorded and
-// verifies that the recorded metrics data matches the expected
-// metricsDataWant.
-func (r *TestMetricsRecorder) WaitForInt64Count(ctx context.Context, metricsDataWant MetricsData) {
+// WaitForInt64Count waits for an int64 count metric to be recorded and verifies
+// that the recorded metrics data matches the expected metricsDataWant. Returns
+// an error if failed to wait or received wrong data.
+func (r *TestMetricsRecorder) WaitForInt64Count(ctx context.Context, metricsDataWant MetricsData) error {
 	got, err := r.intCountCh.Receive(ctx)
 	if err != nil {
-		r.t.Fatalf("timeout waiting for int64Count")
+		return fmt.Errorf("timeout waiting for int64Count")
 	}
 	metricsDataGot := got.(MetricsData)
 	if diff := cmp.Diff(metricsDataGot, metricsDataWant); diff != "" {
-		r.t.Fatalf("int64count metricsData received unexpected value (-got, +want): %v", diff)
+		return fmt.Errorf("int64count metricsData received unexpected value (-got, +want): %v", diff)
 	}
+	return nil
 }
 
-// RecordInt64Count sends the metrics data to the intCountCh channel
-// and updates the internal data map with the recorded value.
+// RecordInt64Count sends the metrics data to the intCountCh channel and updates
+// the internal data map with the recorded value.
 func (r *TestMetricsRecorder) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) {
+	r.intCountCh.ReceiveOrFail()
 	r.intCountCh.Send(MetricsData{
 		Handle:    handle.Descriptor(),
 		IntIncr:   incr,
@@ -141,22 +119,24 @@ func (r *TestMetricsRecorder) RecordInt64Count(handle *estats.Int64CountHandle,
 }
 
 // WaitForFloat64Count waits for a float count metric to be recorded and
-// verifies that the recorded metrics data matches the expected
-// metricsDataWant.
-func (r *TestMetricsRecorder) WaitForFloat64Count(ctx context.Context, metricsDataWant MetricsData) {
+// verifies that the recorded metrics data matches the expected metricsDataWant.
+// Returns an error if failed to wait or received wrong data.
+func (r *TestMetricsRecorder) WaitForFloat64Count(ctx context.Context, metricsDataWant MetricsData) error {
 	got, err := r.floatCountCh.Receive(ctx)
 	if err != nil {
-		r.t.Fatalf("timeout waiting for float64Count")
+		return fmt.Errorf("timeout waiting for float64Count")
 	}
 	metricsDataGot := got.(MetricsData)
 	if diff := cmp.Diff(metricsDataGot, metricsDataWant); diff != "" {
-		r.t.Fatalf("float64count metricsData received unexpected value (-got, +want): %v", diff)
+		return fmt.Errorf("float64count metricsData received unexpected value (-got, +want): %v", diff)
 	}
+	return nil
 }
 
-// RecordFloat64Count sends the metrics data to the floatCountCh channel
-// and updates the internal data map with the recorded value.
+// RecordFloat64Count sends the metrics data to the floatCountCh channel and
+// updates the internal data map with the recorded value.
 func (r *TestMetricsRecorder) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) {
+	r.floatCountCh.ReceiveOrFail()
 	r.floatCountCh.Send(MetricsData{
 		Handle:    handle.Descriptor(),
 		FloatIncr: incr,
@@ -170,21 +150,24 @@ func (r *TestMetricsRecorder) RecordFloat64Count(handle *estats.Float64CountHand
 }
 
 // WaitForInt64Histo waits for an int histo metric to be recorded and verifies
-// that the recorded metrics data matches the expected metricsDataWant.
-func (r *TestMetricsRecorder) WaitForInt64Histo(ctx context.Context, metricsDataWant MetricsData) {
+// that the recorded metrics data matches the expected metricsDataWant. Returns
+// an error if failed to wait or received wrong data.
+func (r *TestMetricsRecorder) WaitForInt64Histo(ctx context.Context, metricsDataWant MetricsData) error {
 	got, err := r.intHistoCh.Receive(ctx)
 	if err != nil {
-		r.t.Fatalf("timeout waiting for int64Histo")
+		return fmt.Errorf("timeout waiting for int64Histo")
 	}
 	metricsDataGot := got.(MetricsData)
 	if diff := cmp.Diff(metricsDataGot, metricsDataWant); diff != "" {
-		r.t.Fatalf("int64Histo metricsData received unexpected value (-got, +want): %v", diff)
+		return fmt.Errorf("int64Histo metricsData received unexpected value (-got, +want): %v", diff)
 	}
+	return nil
 }
 
-// RecordInt64Histo sends the metrics data to the intHistoCh channel
-// and updates the internal data map with the recorded value.
+// RecordInt64Histo sends the metrics data to the intHistoCh channel and updates
+// the internal data map with the recorded value.
 func (r *TestMetricsRecorder) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) {
+	r.intHistoCh.ReceiveOrFail()
 	r.intHistoCh.Send(MetricsData{
 		Handle:    handle.Descriptor(),
 		IntIncr:   incr,
@@ -198,22 +181,24 @@ func (r *TestMetricsRecorder) RecordInt64Histo(handle *estats.Int64HistoHandle,
 }
 
 // WaitForFloat64Histo waits for a float histo metric to be recorded and
-// verifies that the recorded metrics data matches the expected
-// metricsDataWant.
-func (r *TestMetricsRecorder) WaitForFloat64Histo(ctx context.Context, metricsDataWant MetricsData) {
+// verifies that the recorded metrics data matches the expected metricsDataWant.
+// Returns an error if failed to wait or received wrong data.
+func (r *TestMetricsRecorder) WaitForFloat64Histo(ctx context.Context, metricsDataWant MetricsData) error {
 	got, err := r.floatHistoCh.Receive(ctx)
 	if err != nil {
-		r.t.Fatalf("timeout waiting for float64Histo")
+		return fmt.Errorf("timeout waiting for float64Histo")
 	}
 	metricsDataGot := got.(MetricsData)
 	if diff := cmp.Diff(metricsDataGot, metricsDataWant); diff != "" {
-		r.t.Fatalf("float64Histo metricsData received unexpected value (-got, +want): %v", diff)
+		return fmt.Errorf("float64Histo metricsData received unexpected value (-got, +want): %v", diff)
 	}
+	return nil
 }
 
-// RecordFloat64Histo sends the metrics data to the floatHistoCh channel
-// and updates the internal data map with the recorded value.
+// RecordFloat64Histo sends the metrics data to the floatHistoCh channel and
+// updates the internal data map with the recorded value.
 func (r *TestMetricsRecorder) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) {
+	r.floatHistoCh.ReceiveOrFail()
 	r.floatHistoCh.Send(MetricsData{
 		Handle:    handle.Descriptor(),
 		FloatIncr: incr,
@@ -226,23 +211,24 @@ func (r *TestMetricsRecorder) RecordFloat64Histo(handle *estats.Float64HistoHand
 	r.data[handle.Name] = incr
 }
 
-// WaitForInt64Gauge waits for a int gauge metric to be recorded and
-// verifies that the recorded metrics data matches the expected
-// metricsDataWant.
-func (r *TestMetricsRecorder) WaitForInt64Gauge(ctx context.Context, metricsDataWant MetricsData) {
+// WaitForInt64Gauge waits for an int gauge metric to be recorded and verifies
+// that the recorded metrics data matches the expected metricsDataWant.
+func (r *TestMetricsRecorder) WaitForInt64Gauge(ctx context.Context, metricsDataWant MetricsData) error {
 	got, err := r.intGaugeCh.Receive(ctx)
 	if err != nil {
-		r.t.Fatalf("timeout waiting for int64Gauge")
+		return fmt.Errorf("timeout waiting for int64Gauge")
 	}
 	metricsDataGot := got.(MetricsData)
 	if diff := cmp.Diff(metricsDataGot, metricsDataWant); diff != "" {
-		r.t.Fatalf("int64Gauge metricsData received unexpected value (-got, +want): %v", diff)
+		return fmt.Errorf("int64Gauge metricsData received unexpected value (-got, +want): %v", diff)
 	}
+	return nil
 }
 
-// RecordInt64Gauge sends the metrics data to the intGaugeCh channel
-// and updates the internal data map with the recorded value.
+// RecordInt64Gauge sends the metrics data to the intGaugeCh channel and updates
+// the internal data map with the recorded value.
 func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) {
+	r.intGaugeCh.ReceiveOrFail()
 	r.intGaugeCh.Send(MetricsData{
 		Handle:    handle.Descriptor(),
 		IntIncr:   incr,