
Commit 37e724c

Add recording point for endpoint weight not yet usable and add metrics tests

Parent: 3eb0145
23 files changed: +542 / -48 lines

balancer/weightedroundrobin/balancer.go
Lines changed: 17 additions & 7 deletions

@@ -415,7 +415,7 @@ func (p *picker) inc() uint32 {
 }
 
 func (p *picker) regenerateScheduler() {
-    s := p.newScheduler()
+    s := p.newScheduler(true)
     atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
 }
 
@@ -558,14 +558,17 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
        w.SubConn.Connect()
    case connectivity.Ready:
        // If we transition back to READY state, reset nonEmptySince so that we
-       // apply the blackout period after we start receiving load data. Note
-       // that we cannot guarantee that we will never receive lingering
-       // callbacks for backend metric reports from the previous connection
-       // after the new connection has been established, but they should be
-       // masked by new backend metric reports from the new connection by the
-       // time the blackout period ends.
+       // apply the blackout period after we start receiving load data. Also
+       // reset lastUpdated to trigger endpoint weight not yet usable in the
+       // case endpoint gets asked what weight it is before receiving a new
+       // load report. Note that we cannot guarantee that we will never receive
+       // lingering callbacks for backend metric reports from the previous
+       // connection after the new connection has been established, but they
+       // should be masked by new backend metric reports from the new
+       // connection by the time the blackout period ends.
        w.mu.Lock()
        w.nonEmptySince = time.Time{}
+       w.lastUpdated = time.Time{}
        w.mu.Unlock()
    case connectivity.Shutdown:
        if w.stopORCAListener != nil {
@@ -603,6 +606,13 @@ func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackout
        }()
    }
 
+   // The SubConn has not received a load report (i.e. just turned READY with
+   // no load report).
+   if w.lastUpdated == (time.Time{}) {
+       endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
+       return 0
+   }
+
    // If the most recent update was longer ago than the expiration period,
    // reset nonEmptySince so that we apply the blackout period again if we
    // start getting data again in the future, and return 0.
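The early return added to weight() leans on the zero value of time.Time as a sentinel for "no load report received since the last READY transition". Below is a minimal standalone sketch of that pattern (illustrative only, not the code from this commit; the type and field names are invented):

package main

import (
	"fmt"
	"time"
)

// loadState is a toy stand-in for the timestamps the change above manipulates.
type loadState struct {
	lastUpdated   time.Time // time of the most recent load report
	nonEmptySince time.Time // start of the blackout period
}

// onReady mirrors the READY transition: both timestamps are zeroed so the
// blackout period re-applies and the "weight not yet usable" path can fire.
func (s *loadState) onReady() {
	s.nonEmptySince = time.Time{}
	s.lastUpdated = time.Time{}
}

// weightUsable reports whether a weight can be computed: a zero lastUpdated
// means no load report has arrived since the connection became READY.
func (s *loadState) weightUsable() bool {
	return !s.lastUpdated.IsZero() // inverse of the lastUpdated == (time.Time{}) check
}

func main() {
	var s loadState
	s.onReady()
	fmt.Println(s.weightUsable()) // false: record endpoint_weight_not_yet_usable, use weight 0
	s.lastUpdated = time.Now()    // a load report arrives
	fmt.Println(s.weightUsable()) // true: normal weight computation proceeds
}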

balancer/weightedroundrobin/balancer_test.go
Lines changed: 64 additions & 3 deletions

@@ -32,6 +32,7 @@ import (
    "google.golang.org/grpc/internal/grpctest"
    "google.golang.org/grpc/internal/stubserver"
    "google.golang.org/grpc/internal/testutils/roundrobin"
+   "google.golang.org/grpc/internal/testutils/stats"
    "google.golang.org/grpc/orca"
    "google.golang.org/grpc/peer"
    "google.golang.org/grpc/resolver"
@@ -81,6 +82,14 @@ var (
        WeightUpdatePeriod:      stringp(".050s"),
        ErrorUtilizationPenalty: float64p(0),
    }
+   testMetricsConfig = iwrr.LBConfig{
+       EnableOOBLoadReport:     boolp(false),
+       OOBReportingPeriod:      stringp("0.005s"),
+       BlackoutPeriod:          stringp("0s"),
+       WeightExpirationPeriod:  stringp("60s"),
+       WeightUpdatePeriod:      stringp(".050s"),
+       ErrorUtilizationPenalty: float64p(0),
+   }
 )
 
 type testServer struct {
@@ -196,6 +205,43 @@ func (s) TestBalancer_OneAddress(t *testing.T) {
    }
 }
 
+// TestWRRMetricsBasic tests metrics emitted from the WRR balancer. It
+// configures a weighted round robin balancer as the top level balancer of a
+// ClientConn, and configures a fake stats handler on the ClientConn to receive
+// metrics. It verifies stats emitted from the Weighted Round Robin Balancer on
+// balancer startup case which triggers the first picker and scheduler update
+// before any load reports are received.
+//
+// Note that this test and others, metrics emission asssertions are a snapshot
+// of the most recently emitted metrics. This is due to the nondeterminism of
+// scheduler updates with respect to test bodies, so the assertions made are
+// from the most recently synced state of the system (picker/scheduler) from the
+// test body.
+func (s) TestWRRMetricsBasic(t *testing.T) {
+   ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+   defer cancel()
+
+   srv := startServer(t, reportCall)
+   sc := svcConfig(t, testMetricsConfig)
+
+   mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
+   if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
+       t.Fatalf("Error starting client: %v", err)
+   }
+   srv.callMetrics.SetQPS(float64(1))
+
+   if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+       t.Fatalf("Error from EmptyCall: %v", err)
+   }
+
+   mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)           // Falls back because only one SubConn.
+   mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted).
+   mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
+   // Unusable, so no endpoint weight. Due to only one SubConn, this will never
+   // update the weight. Thus, this will stay 0.
+   mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
+}
+
 // Tests two addresses with ORCA reporting disabled (should fall back to pure
 // RR).
 func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) {
@@ -509,7 +555,8 @@ func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) {
        cfg := oobConfig
        cfg.BlackoutPeriod = tc.blackoutPeriodCfg
        sc := svcConfig(t, cfg)
-       if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
+       mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
+       if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
            t.Fatalf("Error starting client: %v", err)
        }
        addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
@@ -536,12 +583,20 @@
        // Wait for the weight update period to allow the new weights to be processed.
        time.Sleep(weightUpdatePeriod)
        checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
+
+       mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
+       mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0)
+       mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
+       // Either 10 or 100, dependent on whatever ordering SubConns are
+       // processed, which is nondeterministic.
+       mr.AssertEitherDataForMetric("grpc.lb.wrr.endpoint_weights", 10, 100)
    }
 }
 
 // Tests that the weight expiration period causes backends to use 0 as their
 // weight (meaning to use the average weight) once the expiration period
-// elapses.
+// elapses. After the weight expires, the expected metrics to be emitted from
+// WRR are also configured.
 func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    defer cancel()
@@ -577,7 +632,8 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
    cfg := oobConfig
    cfg.OOBReportingPeriod = stringp("60s")
    sc := svcConfig(t, cfg)
-   if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
+   mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
+   if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
        t.Fatalf("Error starting client: %v", err)
    }
    addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
@@ -605,6 +661,11 @@
    // Wait for the weight expiration period so the weights have expired.
    time.Sleep(weightUpdatePeriod)
    checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
+
+   mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
+   mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1)
+   mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
+   mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
 }
 
 // Tests logic surrounding subchannel management.
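The tests above rely on TestMetricsRecorder doubling as a stats handler, so it can be attached to a client with grpc.WithStatsHandler. A rough sketch of that wiring outside the stubserver helpers follows (illustrative; the helper name and target are invented, and the stats package is internal to the grpc module):

package wrrmetrics_test

import (
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/testutils/stats"
)

// clientWithRecorder dials target with the given service config and returns a
// recorder that captures the most recent value of each listed WRR metric.
func clientWithRecorder(t *testing.T, target, svcCfg string) (*grpc.ClientConn, *stats.TestMetricsRecorder) {
	mr := stats.NewTestMetricsRecorder(t, []string{
		"grpc.lb.wrr.rr_fallback",
		"grpc.lb.wrr.endpoint_weight_not_yet_usable",
		"grpc.lb.wrr.endpoint_weight_stale",
		"grpc.lb.wrr.endpoint_weights",
	})
	cc, err := grpc.NewClient(target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(svcCfg),
		grpc.WithStatsHandler(mr), // routes metric records into the recorder's data map
	)
	if err != nil {
		t.Fatalf("grpc.NewClient(%q) failed: %v", target, err)
	}
	t.Cleanup(func() { cc.Close() })
	return cc, mr
}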

balancer/weightedroundrobin/scheduler.go
Lines changed: 8 additions & 4 deletions

@@ -31,14 +31,16 @@ type scheduler interface {
 // len(scWeights)-1 are zero or there is only a single subconn, otherwise it
 // will return an Earliest Deadline First (EDF) scheduler implementation that
 // selects the subchannels according to their weights.
-func (p *picker) newScheduler() scheduler {
-   scWeights := p.scWeights(true)
+func (p *picker) newScheduler(recordMetrics bool) scheduler {
+   scWeights := p.scWeights(recordMetrics)
    n := len(scWeights)
    if n == 0 {
        return nil
    }
    if n == 1 {
-       rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
+       if recordMetrics {
+           rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
+       }
        return &rrScheduler{numSCs: 1, inc: p.inc}
    }
    sum := float64(0)
@@ -55,7 +57,9 @@ func (p *picker) newScheduler() scheduler {
    }
 
    if numZero >= n-1 {
-       rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
+       if recordMetrics {
+           rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
+       }
        return &rrScheduler{numSCs: uint32(n), inc: p.inc}
    }
    unscaledMean := sum / float64(n-numZero)
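The two guarded Record calls above correspond to the two round-robin fallback cases in newScheduler: a single subchannel, or all but at most one weight being zero. A standalone sketch of that decision follows (illustrative, not the picker code itself):

package main

import "fmt"

// usesRRFallback mirrors the condition under which newScheduler returns an
// rrScheduler (and, when recordMetrics is true, records rr_fallback): there is
// a single subchannel, or at most one subchannel has a non-zero weight.
func usesRRFallback(scWeights []float64) bool {
	n := len(scWeights)
	if n <= 1 {
		return true
	}
	numZero := 0
	for _, w := range scWeights {
		if w == 0 {
			numZero++
		}
	}
	return numZero >= n-1
}

func main() {
	fmt.Println(usesRRFallback([]float64{3}))       // true: single subchannel
	fmt.Println(usesRRFallback([]float64{0, 0, 5})) // true: only one usable weight
	fmt.Println(usesRRFallback([]float64{2, 5}))    // false: EDF scheduler is used
}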

internal/testutils/stats/test_metrics_recorder.go
Lines changed: 75 additions & 6 deletions

@@ -21,7 +21,9 @@ package stats
 
 import (
    "context"
+   "sync"
    "testing"
+   "time"
 
    "github.com/google/go-cmp/cmp"
    estats "google.golang.org/grpc/experimental/stats"
@@ -41,18 +43,65 @@
    intHistoCh   *testutils.Channel
    floatHistoCh *testutils.Channel
    intGaugeCh   *testutils.Channel
+
+   // The most recent update for each metric name.
+   mu   sync.Mutex
+   data map[estats.Metric]float64
 }
 
 func NewTestMetricsRecorder(t *testing.T, metrics []string) *TestMetricsRecorder {
-   return &TestMetricsRecorder{
+   tmr := &TestMetricsRecorder{
        t: t,
 
-       intCountCh:   testutils.NewChannelWithSize(10),
-       floatCountCh: testutils.NewChannelWithSize(10),
-       intHistoCh:   testutils.NewChannelWithSize(10),
-       floatHistoCh: testutils.NewChannelWithSize(10),
-       intGaugeCh:   testutils.NewChannelWithSize(10),
+       intCountCh:   testutils.NewChannelWithSize(1000),
+       floatCountCh: testutils.NewChannelWithSize(1000),
+       intHistoCh:   testutils.NewChannelWithSize(1000),
+       floatHistoCh: testutils.NewChannelWithSize(1000),
+       intGaugeCh:   testutils.NewChannelWithSize(1000),
+
+       data: make(map[estats.Metric]float64),
+   }
+
+   for _, metric := range metrics {
+       tmr.data[estats.Metric(metric)] = 0
+   }
+
+   return tmr
+}
+
+// AssertDataForMetric asserts data is present for metric. The zero value in the
+// check is equivalent to unset.
+func (r *TestMetricsRecorder) AssertDataForMetric(metricName string, wantVal float64) {
+   r.mu.Lock()
+   defer r.mu.Unlock()
+   if r.data[estats.Metric(metricName)] != wantVal {
+       r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v", metricName, r.data[estats.Metric(metricName)], wantVal)
+   }
+}
+
+// AssertEitherDataForMetric asserts either data point is present for metric.
+// The zero value in the check is equivalent to unset.
+
+func (r *TestMetricsRecorder) AssertEitherDataForMetric(metricName string, wantVal1 float64, wantVal2 float64) {
+   r.mu.Lock()
+   defer r.mu.Unlock()
+   if r.data[estats.Metric(metricName)] != wantVal1 && r.data[estats.Metric(metricName)] != wantVal2 {
+       r.t.Fatalf("Unexpected data for metric %v, got: %v, want: %v or %v", metricName, r.data[estats.Metric(metricName)], wantVal1, wantVal2)
+   }
+}
93+
// PollForDataForMetric polls the metric data for the want. Fails if context
94+
// provided expires before data for metric is found.
95+
func (r *TestMetricsRecorder) PollForDataForMetric(ctx context.Context, metricName string, wantVal float64) {
96+
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
97+
r.mu.Lock()
98+
if r.data[estats.Metric(metricName)] == wantVal {
99+
r.mu.Unlock()
100+
break
101+
}
102+
r.mu.Unlock()
55103
}
104+
r.t.Fatalf("Timeout waiting for data %v for metric %v", wantVal, metricName)
56105
}
57106

58107
type MetricsData struct {
@@ -85,6 +134,10 @@ func (r *TestMetricsRecorder) RecordInt64Count(handle *estats.Int64CountHandle,
        LabelKeys: append(handle.Labels, handle.OptionalLabels...),
        LabelVals: labels,
    })
+
+   r.mu.Lock()
+   defer r.mu.Unlock()
+   r.data[handle.Name] = float64(incr)
 }
 
 func (r *TestMetricsRecorder) WaitForFloat64Count(ctx context.Context, metricsDataWant MetricsData) {
@@ -105,6 +158,10 @@ func (r *TestMetricsRecorder) RecordFloat64Count(handle *estats.Float64CountHand
        LabelKeys: append(handle.Labels, handle.OptionalLabels...),
        LabelVals: labels,
    })
+
+   r.mu.Lock()
+   defer r.mu.Unlock()
+   r.data[handle.Name] = incr
 }
 
 func (r *TestMetricsRecorder) WaitForInt64Histo(ctx context.Context, metricsDataWant MetricsData) {
@@ -125,6 +182,10 @@ func (r *TestMetricsRecorder) RecordInt64Histo(handle *estats.Int64HistoHandle,
        LabelKeys: append(handle.Labels, handle.OptionalLabels...),
        LabelVals: labels,
    })
+
+   r.mu.Lock()
+   defer r.mu.Unlock()
+   r.data[handle.Name] = float64(incr)
 }
 
 func (r *TestMetricsRecorder) WaitForFloat64Histo(ctx context.Context, metricsDataWant MetricsData) {
@@ -145,6 +206,10 @@ func (r *TestMetricsRecorder) RecordFloat64Histo(handle *estats.Float64HistoHand
        LabelKeys: append(handle.Labels, handle.OptionalLabels...),
        LabelVals: labels,
    })
+
+   r.mu.Lock()
+   defer r.mu.Unlock()
+   r.data[handle.Name] = incr
 }
 
 func (r *TestMetricsRecorder) WaitForInt64Gauge(ctx context.Context, metricsDataWant MetricsData) {
@@ -165,6 +230,10 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
        LabelKeys: append(handle.Labels, handle.OptionalLabels...),
        LabelVals: labels,
    })
+
+   r.mu.Lock()
+   defer r.mu.Unlock()
+   r.data[handle.Name] = float64(incr)
 }
 
 // To implement a stats.Handler, which allows it to be set as a dial option:
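
A short, hypothetical usage sketch of the new assertion and polling helpers (the metric names and values are illustrative, and the test below only exercises the recorder itself rather than a real client):

package stats_test

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc/internal/testutils/stats"
)

func TestRecorderHelpers(t *testing.T) {
	// Track two metrics; both start at the zero value, which also means "unset".
	mr := stats.NewTestMetricsRecorder(t, []string{
		"grpc.lb.wrr.rr_fallback",
		"grpc.lb.wrr.endpoint_weights",
	})

	// Snapshot assertion on the most recently recorded value.
	mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 0)

	// Accept either of two values when ordering is nondeterministic.
	mr.AssertEitherDataForMetric("grpc.lb.wrr.endpoint_weights", 0, 10)

	// Poll until the most recent value matches, failing if ctx expires first.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	mr.PollForDataForMetric(ctx, "grpc.lb.wrr.endpoint_weights", 0)
}
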
Lines changed: 61 additions & 0 deletions (new file)

@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package setup
+
+import (
+   "testing"
+
+   "github.com/google/uuid"
+   "google.golang.org/grpc/internal"
+   "google.golang.org/grpc/internal/testutils/xds/e2e"
+   "google.golang.org/grpc/resolver"
+
+   _ "google.golang.org/grpc/xds" // Register the xds_resolver.
+)
+
+// ManagementServerAndResolver sets up an xDS management server, creates
+// bootstrap configuration pointing to that server and creates an xDS resolver
+// using that configuration.
+//
+// Registers a cleanup function on t to stop the management server.
+//
+// Returns the following:
+// - the xDS management server
+// - the node ID to use when talking to this management server
+// - bootstrap configuration to use (if creating an xDS-enabled gRPC server)
+// - xDS resolver builder (if creating an xDS-enabled gRPC client)
+func ManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder) {
+   // Start an xDS management server.
+   xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
+
+   // Create bootstrap configuration pointing to the above management server.
+   nodeID := uuid.New().String()
+   bc := e2e.DefaultBootstrapContents(t, nodeID, xdsServer.Address)
+
+   // Create an xDS resolver with the above bootstrap configuration.
+   var r resolver.Builder
+   var err error
+   if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
+       r, err = newResolver.(func([]byte) (resolver.Builder, error))(bc)
+       if err != nil {
+           t.Fatalf("Failed to create xDS resolver for testing: %v", err)
+       }
+   }
+
+   return xdsServer, nodeID, bc, r
+}
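A rough sketch of how an xDS end-to-end test could consume the helper above (the helper's import path is not shown in this extract, so the returned values are passed in as parameters; the resource contents and target URI are illustrative):

package xdssetup_test

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/testutils/xds/e2e"
	"google.golang.org/grpc/resolver"
)

// runWithManagementServer shows the shape of a test using the values returned
// by ManagementServerAndResolver.
func runWithManagementServer(t *testing.T, mgmtServer *e2e.ManagementServer, nodeID string, xdsResolver resolver.Builder) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Push xDS resources keyed by the node ID; a real test would populate
	// Listeners, Routes, Clusters, and Endpoints via the e2e helpers.
	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
		t.Fatalf("Failed to update management server: %v", err)
	}

	// Dial an xds:/// target through the resolver built from the bootstrap.
	cc, err := grpc.NewClient("xds:///my-test-service",
		grpc.WithResolvers(xdsResolver),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient() failed: %v", err)
	}
	defer cc.Close()
}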
