Skip to content

Commit 218811e

Browse files
authored
balancer/rls: Add picker and cache unit tests for RLS Metrics (#7614)
1 parent a9ff62d commit 218811e

File tree

6 files changed

+348
-100
lines changed

6 files changed

+348
-100
lines changed

balancer/rls/cache_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,3 +242,61 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
242242
t.Fatalf("unexpected diff in backoffState for cache entry after dataCache.resetBackoffState(): %s", diff)
243243
}
244244
}
245+
246+
// TestDataCache_Metrics verifies the values recorded for the
// grpc.lb.rls.cache_entries and grpc.lb.rls.cache_size metrics as entries are
// added to the data cache, evicted by a resize, updated in size, and deleted.
func (s) TestDataCache_Metrics(t *testing.T) {
247+
cacheEntriesMetricsTests := []*cacheEntry{
248+
{size: 1},
249+
{size: 2},
250+
{size: 3},
251+
{size: 4},
252+
{size: 5},
253+
}
254+
tmr := stats.NewTestMetricsRecorder()
255+
dc := newDataCache(50, nil, tmr, "")
256+
257+
dc.updateRLSServerTarget("rls-server-target")
258+
for i, k := range cacheKeys {
259+
dc.addEntry(k, cacheEntriesMetricsTests[i])
260+
}
261+
262+
const cacheEntriesKey = "grpc.lb.rls.cache_entries"
263+
const cacheSizeKey = "grpc.lb.rls.cache_size"
264+
// 5 total entries which add up to 15 size, so should record that.
265+
if got, _ := tmr.Metric(cacheEntriesKey); got != 5 {
266+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 5)
267+
}
268+
if got, _ := tmr.Metric(cacheSizeKey); got != 15 {
269+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 15)
270+
}
271+
272+
// Resize down the cache to 2 entries (deterministic as it is based on LRU).
273+
dc.resize(9)
274+
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
275+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
276+
}
277+
if got, _ := tmr.Metric(cacheSizeKey); got != 9 {
278+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 9)
279+
}
280+
281+
// Update an entry to have size 6. This should reflect in the size metrics,
282+
// which will increase by 1 to 10, while the number of cache entries should
283+
// stay the same. This write is deterministic and writes to the last one.
284+
dc.updateEntrySize(cacheEntriesMetricsTests[4], 6)
285+
286+
if got, _ := tmr.Metric(cacheEntriesKey); got != 2 {
287+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 2)
288+
}
289+
if got, _ := tmr.Metric(cacheSizeKey); got != 10 {
290+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 10)
291+
}
292+
293+
// Delete this scaled up cache key. This should scale down the cache to 1
294+
// entry, and remove 6 size so cache size should be 4.
295+
dc.deleteAndCleanup(cacheKeys[4], cacheEntriesMetricsTests[4])
296+
if got, _ := tmr.Metric(cacheEntriesKey); got != 1 {
297+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheEntriesKey, got, 1)
298+
}
299+
if got, _ := tmr.Metric(cacheSizeKey); got != 4 {
300+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", cacheSizeKey, got, 4)
301+
}
302+
}

balancer/rls/picker_test.go

Lines changed: 167 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ import (
2626
"time"
2727

2828
"google.golang.org/grpc"
29+
"google.golang.org/grpc/balancer"
2930
"google.golang.org/grpc/codes"
3031
"google.golang.org/grpc/credentials/insecure"
3132
"google.golang.org/grpc/internal/grpcsync"
3233
"google.golang.org/grpc/internal/stubserver"
3334
rlstest "google.golang.org/grpc/internal/testutils/rls"
35+
"google.golang.org/grpc/internal/testutils/stats"
3436
"google.golang.org/grpc/metadata"
3537
"google.golang.org/grpc/status"
3638
"google.golang.org/protobuf/types/known/durationpb"
@@ -246,6 +248,133 @@ func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
246248
}
247249
}
248250

251+
// Test_RLSDefaultTargetPicksMetric tests the default target picks metric. It
252+
// configures an RLS Balancer which specifies to route to the default target in
253+
// the RLS Configuration, and makes an RPC on a Channel containing this RLS
254+
// Balancer. This test then asserts a default target picks metric is emitted,
255+
// and target pick or failed pick metric is not emitted.
256+
func (s) Test_RLSDefaultTargetPicksMetric(t *testing.T) {
257+
// Start an RLS server and set the throttler to always throttle requests.
258+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
259+
overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
260+
261+
// Build RLS service config with a default target.
262+
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
263+
defBackendCh, defBackendAddress := startBackend(t)
264+
rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
265+
266+
// Register a manual resolver and push the RLS service config through it.
267+
r := startManualResolverWithConfig(t, rlsConfig)
268+
269+
// Create the channel with a test metrics recorder installed through
// grpc.WithStatsHandler, so recorded metrics can be read back below.
tmr := stats.NewTestMetricsRecorder()
270+
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
271+
if err != nil {
272+
t.Fatalf("grpc.Dial() failed: %v", err)
273+
}
274+
defer cc.Close()
275+
276+
// Make an RPC and ensure it gets routed to the default target.
277+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
278+
defer cancel()
279+
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
280+
281+
// Expect a value of 1 for the default target picks metric, and no data at
// all for the target picks and failed picks metrics.
if got, _ := tmr.Metric("grpc.lb.rls.default_target_picks"); got != 1 {
282+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.default_target_picks", got, 1)
283+
}
284+
if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
285+
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
286+
}
287+
if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
288+
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
289+
}
290+
}
291+
292+
// Test_RLSTargetPicksMetric tests the target picks metric. It configures an RLS
293+
// Balancer which specifies to route to a target through a RouteLookupResponse,
294+
// and makes an RPC on a Channel containing this RLS Balancer. This test then
295+
// asserts a target picks metric is emitted, and default target pick or failed
296+
// pick metric is not emitted.
297+
func (s) Test_RLSTargetPicksMetric(t *testing.T) {
298+
// Start an RLS server and set the throttler to never throttle requests.
299+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
300+
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
301+
302+
// Build the RLS config without a default target.
303+
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
304+
305+
// Start a test backend, and setup the fake RLS server to return this as a
306+
// target in the RLS response.
307+
testBackendCh, testBackendAddress := startBackend(t)
308+
rlsServer.SetResponseCallback(func(context.Context, *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
309+
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
310+
})
311+
312+
// Register a manual resolver and push the RLS service config through it.
313+
r := startManualResolverWithConfig(t, rlsConfig)
314+
315+
tmr := stats.NewTestMetricsRecorder()
316+
// Dial using the manual resolver's scheme, with the test metrics recorder
// installed through grpc.WithStatsHandler.
317+
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
318+
if err != nil {
319+
t.Fatalf("grpc.Dial() failed: %v", err)
320+
}
321+
defer cc.Close()
322+
323+
// Make an RPC and ensure it gets routed to the test backend.
324+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
325+
defer cancel()
326+
makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
327+
// Expect a value of 1 for the target picks metric, and no data at all for
// the default target picks and failed picks metrics.
if got, _ := tmr.Metric("grpc.lb.rls.target_picks"); got != 1 {
328+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.target_picks", got, 1)
329+
}
330+
if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
331+
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
332+
}
333+
if _, ok := tmr.Metric("grpc.lb.rls.failed_picks"); ok {
334+
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.failed_picks")
335+
}
336+
}
337+
338+
// Test_RLSFailedPicksMetric tests the failed picks metric. It configures an RLS
339+
// Balancer to fail a pick with unavailable, and makes an RPC on a Channel
340+
// containing this RLS Balancer. This test then asserts a failed picks metric is
341+
// emitted, and default target pick or target pick metric is not emitted.
342+
func (s) Test_RLSFailedPicksMetric(t *testing.T) {
343+
// Start an RLS server and set the throttler to never throttle requests.
344+
rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
345+
overrideAdaptiveThrottler(t, neverThrottlingThrottler())
346+
347+
// Build an RLS config without a default target.
348+
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
349+
350+
// Register a manual resolver and push the RLS service config through it.
351+
r := startManualResolverWithConfig(t, rlsConfig)
352+
353+
tmr := stats.NewTestMetricsRecorder()
354+
// Dial using the manual resolver's scheme, with the test metrics recorder
// installed through grpc.WithStatsHandler.
355+
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
356+
if err != nil {
357+
t.Fatalf("grpc.Dial() failed: %v", err)
358+
}
359+
defer cc.Close()
360+
361+
// Make an RPC and expect it to fail with an Unavailable error reporting that
// the RLS response's target list has no entries. We use a smaller timeout
362+
// to ensure that the test doesn't run very long.
363+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
364+
defer cancel()
365+
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))
366+
367+
// Expect a value of 1 for the failed picks metric, and no data at all for
// the target picks and default target picks metrics.
if got, _ := tmr.Metric("grpc.lb.rls.failed_picks"); got != 1 {
368+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.rls.failed_picks", got, 1)
369+
}
370+
if _, ok := tmr.Metric("grpc.lb.rls.target_picks"); ok {
371+
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.target_picks")
372+
}
373+
if _, ok := tmr.Metric("grpc.lb.rls.default_target_picks"); ok {
374+
t.Fatalf("Data is present for metric %v", "grpc.lb.rls.default_target_picks")
375+
}
376+
}
377+
249378
// Test verifies the scenario where there is a matching entry in the data cache
250379
// which is valid and there is no pending request. The pick is expected to be
251380
// delegated to the child policy.
@@ -256,7 +385,6 @@ func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {
256385

257386
// Build the RLS config without a default target.
258387
rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
259-
260388
// Start a test backend, and setup the fake RLS server to return this as a
261389
// target in the RLS response.
262390
testBackendCh, testBackendAddress := startBackend(t)
@@ -881,3 +1009,41 @@ func TestIsFullMethodNameValid(t *testing.T) {
8811009
})
8821010
}
8831011
}
1012+
1013+
// TestChildPickResultError tests the conversion of the child picker's error to
// the pick result attribute string returned by errToPickResult.
1014+
func (s) TestChildPickResultError(t *testing.T) {
1015+
// Each case pairs an input error with the string errToPickResult is
// expected to return for it.
tests := []struct {
1016+
name string
1017+
err error
1018+
want string
1019+
}{
1020+
{
1021+
name: "nil",
1022+
err: nil,
1023+
want: "complete",
1024+
},
1025+
{
1026+
name: "errNoSubConnAvailable",
1027+
err: balancer.ErrNoSubConnAvailable,
1028+
want: "queue",
1029+
},
1030+
{
1031+
name: "status error",
1032+
err: status.Error(codes.Unimplemented, "unimplemented"),
1033+
want: "drop",
1034+
},
1035+
{
1036+
name: "other error",
1037+
err: errors.New("some error"),
1038+
want: "fail",
1039+
},
1040+
}
1041+
1042+
// Run every case as its own subtest, named after the case.
for _, test := range tests {
1043+
t.Run(test.name, func(t *testing.T) {
1044+
if got := errToPickResult(test.err); got != test.want {
1045+
t.Fatalf("errToPickResult(%q) = %v, want %v", test.err, got, test.want)
1046+
}
1047+
})
1048+
}
1049+
}

balancer/weightedroundrobin/balancer_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ var (
8787
OOBReportingPeriod: stringp("0.005s"),
8888
BlackoutPeriod: stringp("0s"),
8989
WeightExpirationPeriod: stringp("60s"),
90-
WeightUpdatePeriod: stringp(".050s"),
90+
WeightUpdatePeriod: stringp("30s"),
9191
ErrorUtilizationPenalty: float64p(0),
9292
}
9393
)
@@ -224,8 +224,8 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
224224
srv := startServer(t, reportCall)
225225
sc := svcConfig(t, testMetricsConfig)
226226

227-
mr := stats.NewTestMetricsRecorder(t)
228-
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
227+
tmr := stats.NewTestMetricsRecorder()
228+
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(tmr)); err != nil {
229229
t.Fatalf("Error starting client: %v", err)
230230
}
231231
srv.callMetrics.SetQPS(float64(1))
@@ -234,12 +234,20 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
234234
t.Fatalf("Error from EmptyCall: %v", err)
235235
}
236236

237-
mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) // Falls back because only one SubConn.
238-
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted).
239-
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
237+
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
238+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
239+
}
240+
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != 0 {
241+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
242+
}
243+
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != 1 {
244+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, 1)
245+
}
240246
// Unusable, so no endpoint weight. Due to only one SubConn, this will never
241247
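// update the weight. Thus, this will stay 0. NOTE(review): the assertion that follows checks "grpc.lb.wrr.endpoint_weight_stale" again, whereas the line it replaces checked "grpc.lb.wrr.endpoint_weights" - confirm which metric is intended.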
242-
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
248+
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != 0 {
249+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, 0)
250+
}
243251
}
244252

245253
// Tests two addresses with ORCA reporting disabled (should fall back to pure

balancer/weightedroundrobin/metrics_test.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
108108

109109
for _, test := range tests {
110110
t.Run(test.name, func(t *testing.T) {
111-
tmr := stats.NewTestMetricsRecorder(t)
111+
tmr := stats.NewTestMetricsRecorder()
112112
wsc := &weightedSubConn{
113113
metricsRecorder: tmr,
114114
weightVal: 3,
@@ -117,9 +117,15 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
117117
}
118118
wsc.weight(test.nowTime, test.weightExpirationPeriod, test.blackoutPeriod, true)
119119

120-
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", test.endpointWeightStaleWant)
121-
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", test.endpointWeightNotYetUsableWant)
122-
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", test.endpointWeightWant)
120+
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
121+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
122+
}
123+
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_not_yet_usable"); got != test.endpointWeightNotYetUsableWant {
124+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_not_yet_usable", got, test.endpointWeightNotYetUsableWant)
125+
}
126+
if got, _ := tmr.Metric("grpc.lb.wrr.endpoint_weight_stale"); got != test.endpointWeightStaleWant {
127+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.endpoint_weight_stale", got, test.endpointWeightStaleWant)
128+
}
123129
})
124130
}
125131

@@ -130,7 +136,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
130136
// with no weights. Both of these should emit a count metric for round robin
131137
// fallback.
132138
func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
133-
tmr := stats.NewTestMetricsRecorder(t)
139+
tmr := stats.NewTestMetricsRecorder()
134140
wsc := &weightedSubConn{
135141
metricsRecorder: tmr,
136142
weightVal: 0,
@@ -147,7 +153,9 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
147153
// There is only one SubConn, so no matter if the SubConn has a weight or
148154
// not will fallback to round robin.
149155
p.regenerateScheduler()
150-
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
156+
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
157+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
158+
}
151159
tmr.ClearMetrics()
152160

153161
// With two SubConns, if neither of them have weights, it will also fallback
@@ -159,5 +167,7 @@ func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
159167
}
160168
p.subConns = append(p.subConns, wsc2)
161169
p.regenerateScheduler()
162-
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
170+
if got, _ := tmr.Metric("grpc.lb.wrr.rr_fallback"); got != 1 {
171+
t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.wrr.rr_fallback", got, 1)
172+
}
163173
}

0 commit comments

Comments
 (0)