Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (p *picker) inc() uint32 {
}

func (p *picker) regenerateScheduler() {
s := p.newScheduler()
s := p.newScheduler(true)
atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
}

Expand Down Expand Up @@ -558,14 +558,17 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
w.SubConn.Connect()
case connectivity.Ready:
// If we transition back to READY state, reset nonEmptySince so that we
// apply the blackout period after we start receiving load data. Note
// that we cannot guarantee that we will never receive lingering
// callbacks for backend metric reports from the previous connection
// after the new connection has been established, but they should be
// masked by new backend metric reports from the new connection by the
// time the blackout period ends.
// apply the blackout period after we start receiving load data. Also
// reset lastUpdated to trigger endpoint weight not yet usable in the
// case endpoint gets asked what weight it is before receiving a new
// load report. Note that we cannot guarantee that we will never receive
// lingering callbacks for backend metric reports from the previous
// connection after the new connection has been established, but they
// should be masked by new backend metric reports from the new
// connection by the time the blackout period ends.
w.mu.Lock()
w.nonEmptySince = time.Time{}
w.lastUpdated = time.Time{}
w.mu.Unlock()
case connectivity.Shutdown:
if w.stopORCAListener != nil {
Expand All @@ -592,7 +595,7 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
// account the parameters. Returns 0 for blacked out or expired data, which
// will cause the backend weight to be treated as the mean of the weights of the
// other backends. If forScheduler is set to true, this function will emit
// metrics through the mtrics registry.
// metrics through the metrics registry.
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration, recordMetrics bool) (weight float64) {
w.mu.Lock()
defer w.mu.Unlock()
Expand All @@ -603,6 +606,13 @@ func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackout
}()
}

// The SubConn has not received a load report (i.e. just turned READY with
// no load report).
if w.lastUpdated == (time.Time{}) {
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
return 0
}

// If the most recent update was longer ago than the expiration period,
// reset nonEmptySince so that we apply the blackout period again if we
// start getting data again in the future, and return 0.
Expand Down
49 changes: 48 additions & 1 deletion balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -81,6 +82,14 @@ var (
WeightUpdatePeriod: stringp(".050s"),
ErrorUtilizationPenalty: float64p(0),
}
testMetricsConfig = iwrr.LBConfig{
EnableOOBLoadReport: boolp(false),
OOBReportingPeriod: stringp("0.005s"),
BlackoutPeriod: stringp("0s"),
WeightExpirationPeriod: stringp("60s"),
WeightUpdatePeriod: stringp(".050s"),
ErrorUtilizationPenalty: float64p(0),
}
)

type testServer struct {
Expand Down Expand Up @@ -196,6 +205,43 @@ func (s) TestBalancer_OneAddress(t *testing.T) {
}
}

// TestWRRMetricsBasic tests metrics emitted from the WRR balancer. It
// configures a weighted round robin balancer as the top level balancer of a
// ClientConn, and configures a fake stats handler on the ClientConn to receive
// metrics. It verifies stats emitted from the Weighted Round Robin Balancer on
// balancer startup case which triggers the first picker and scheduler update
// before any load reports are received.
//
// Note that this test and others, metrics emission asssertions are a snapshot
// of the most recently emitted metrics. This is due to the nondeterminism of
// scheduler updates with respect to test bodies, so the assertions made are
// from the most recently synced state of the system (picker/scheduler) from the
// test body.
func (s) TestWRRMetricsBasic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

srv := startServer(t, reportCall)
sc := svcConfig(t, testMetricsConfig)

mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
srv.callMetrics.SetQPS(float64(1))

if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("Error from EmptyCall: %v", err)
}

mr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1) // Falls back because only one SubConn.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0 (never emitted).
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight. Due to only one SubConn, this will never
// update the weight. Thus, this will stay 0.
mr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
}

// Tests two addresses with ORCA reporting disabled (should fall back to pure
// RR).
func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) {
Expand Down Expand Up @@ -577,7 +623,8 @@ func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
cfg := oobConfig
cfg.OOBReportingPeriod = stringp("60s")
sc := svcConfig(t, cfg)
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
mr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(mr)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
Expand Down
136 changes: 136 additions & 0 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package weightedroundrobin

import (
"testing"
"time"

"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils/stats"
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestWRR_Metrics_SubConnWeight tests different scenarios for the weight call
// on a weighted SubConn, and expects certain metrics for each of these
// scenarios.
func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})

wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 3,
}

// The weighted SubConn's lastUpdated field hasn't been set, so this
// SubConn's weight is not yet usable. Thus, should emit that endpoint
// weight is not yet usable, and 0 for weight.
wsc.weight(time.Now(), time.Second, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0) // The endpoint weight has not expired so this is 0.
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()

// Setup a scenario where the SubConn's weight expires. Thus, should emit
// that endpoint weight is stale, and 0 for weight.
wsc.lastUpdated = time.Now()
wsc.weight(time.Now().Add(100*time.Second), 2*time.Second, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()

// Setup a scenario where the SubConn's weight is in the blackout period.
// Thus, should emit that endpoint weight is not yet usable, and 0 for
// weight.
wsc.weight(time.Now(), time.Minute, 10*time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 1)
// Unusable, so no endpoint weight (i.e. 0).
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()

// Setup a scenario where SubConn's weight is what is persists in weight
// field. This is triggered by last update being past blackout period and
// before weight update period. Should thus emit that endpoint weight is 3,
// and no other metrics.
wsc.nonEmptySince = time.Now()
wsc.weight(time.Now().Add(10*time.Second), time.Minute, time.Second, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 3)
tmr.ClearMetrics()

// Setup a scenario where a SubConn's weight both expires and is within the
// blackout period. In this case, weight expiry should take precedence with
// respect to metrics emitted. Thus, should emit that endpoint weight is not
// yet usable, and 0 for weight.
wsc.weight(time.Now().Add(10*time.Second), time.Second, time.Minute, true)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_stale", 1)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weight_not_yet_usable", 0)
tmr.AssertDataForMetric("grpc.lb.wrr.endpoint_weights", 0)
tmr.ClearMetrics()
}

// TestWRR_Metrics_Scheduler_RR_Fallback tests the round robin fallback metric
// for scheduler updates. It tests the case with one SubConn, and two SubConns
// with no weights. Both of these should emit a count metric for round robin
// fallback.
func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
tmr := stats.NewTestMetricsRecorder(t, []string{"grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"})
wsc := &weightedSubConn{
metricsRecorder: tmr,
weightVal: 0,
}

p := &picker{
cfg: &lbConfig{
BlackoutPeriod: iserviceconfig.Duration(10 * time.Second),
WeightExpirationPeriod: iserviceconfig.Duration(3 * time.Minute),
},
subConns: []*weightedSubConn{wsc},
metricsRecorder: tmr,
}
// There is only one SubConn, so no matter if the SubConn has a weight or
// not will fallback to round robin.
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
tmr.ClearMetrics()

// With two SubConns, if neither of them have weights, it will also fallback
// to round robin.
wsc2 := &weightedSubConn{
target: "target",
metricsRecorder: tmr,
weightVal: 0,
}
p.subConns = append(p.subConns, wsc2)
p.regenerateScheduler()
tmr.AssertDataForMetric("grpc.lb.wrr.rr_fallback", 1)
}
12 changes: 8 additions & 4 deletions balancer/weightedroundrobin/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ type scheduler interface {
// len(scWeights)-1 are zero or there is only a single subconn, otherwise it
// will return an Earliest Deadline First (EDF) scheduler implementation that
// selects the subchannels according to their weights.
func (p *picker) newScheduler() scheduler {
scWeights := p.scWeights(true)
func (p *picker) newScheduler(recordMetrics bool) scheduler {
scWeights := p.scWeights(recordMetrics)
n := len(scWeights)
if n == 0 {
return nil
}
if n == 1 {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
if recordMetrics {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
}
return &rrScheduler{numSCs: 1, inc: p.inc}
}
sum := float64(0)
Expand All @@ -55,7 +57,9 @@ func (p *picker) newScheduler() scheduler {
}

if numZero >= n-1 {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
if recordMetrics {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
}
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
}
unscaledMean := sum / float64(n-numZero)
Expand Down
Loading