@@ -32,7 +32,9 @@ import (
3232 "google.golang.org/grpc/balancer"
3333 "google.golang.org/grpc/balancer/base"
3434 "google.golang.org/grpc/balancer/weightedroundrobin/internal"
35+ "google.golang.org/grpc/balancer/weightedtarget"
3536 "google.golang.org/grpc/connectivity"
37+ estats "google.golang.org/grpc/experimental/stats"
3638 "google.golang.org/grpc/internal/grpclog"
3739 iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
3840 "google.golang.org/grpc/orca"
@@ -45,6 +47,43 @@ import (
// Name is the name of the weighted round robin balancer.
const Name = "weighted_round_robin"
4749
var (
	// rrFallbackMetric counts scheduler updates in which WRR fell back to
	// plain round-robin behavior for lack of usable endpoint weights.
	rrFallbackMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
		Name:           "grpc.lb.wrr.rr_fallback",
		Description:    "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.",
		Unit:           "update",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.locality"},
		Default:        false,
	})

	// endpointWeightNotYetUsableMetric counts endpoints whose weight was
	// unusable at scheduler-update time (no load report yet, or still
	// inside the blackout period).
	endpointWeightNotYetUsableMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
		Name:           "grpc.lb.wrr.endpoint_weight_not_yet_usable",
		Description:    "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).",
		Unit:           "endpoint",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.locality"},
		Default:        false,
	})

	// endpointWeightStaleMetric counts endpoints whose most recent weight
	// is older than the configured expiration period.
	endpointWeightStaleMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
		Name:           "grpc.lb.wrr.endpoint_weight_stale",
		Description:    "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.",
		Unit:           "endpoint",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.locality"},
		Default:        false,
	})

	// endpointWeightsMetric records each endpoint's weight on every
	// scheduler update; endpoints without usable weights record 0.
	endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{
		Name:           "grpc.lb.wrr.endpoint_weights",
		Description:    "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.",
		Unit:           "endpoint",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.locality"},
		Default:        false,
	})
)
86+
// init registers the weighted round robin balancer builder with the gRPC
// balancer registry at package load time.
func init() {
	balancer.Register(bb{})
}
@@ -58,7 +97,10 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
5897 csEvltr : & balancer.ConnectivityStateEvaluator {},
5998 scMap : make (map [balancer.SubConn ]* weightedSubConn ),
6099 connectivityState : connectivity .Connecting ,
100+ target : bOpts .Target .String (),
101+ metricsRecorder : bOpts .MetricsRecorder ,
61102 }
103+
62104 b .logger = prefixLogger (b )
63105 b .logger .Infof ("Created" )
64106 return b
@@ -101,8 +143,11 @@ func (bb) Name() string {
101143
102144// wrrBalancer implements the weighted round robin LB policy.
103145type wrrBalancer struct {
104- cc balancer.ClientConn
105- logger * grpclog.PrefixLogger
146+ // The following fields are immutable.
147+ cc balancer.ClientConn
148+ logger * grpclog.PrefixLogger
149+ target string
150+ metricsRecorder estats.MetricsRecorder
106151
107152 // The following fields are only accessed on calls into the LB policy, and
108153 // do not need a mutex.
@@ -114,6 +159,7 @@ type wrrBalancer struct {
114159 resolverErr error // the last error reported by the resolver; cleared on successful resolution
115160 connErr error // the last connection error; cleared upon leaving TransientFailure
116161 stopPicker func ()
162+ locality string
117163}
118164
119165func (b * wrrBalancer ) UpdateClientConnState (ccs balancer.ClientConnState ) error {
@@ -125,6 +171,7 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
125171 }
126172
127173 b .cfg = cfg
174+ b .locality = weightedtarget .LocalityFromResolverState (ccs .ResolverState )
128175 b .updateAddresses (ccs .ResolverState .Addresses )
129176
130177 if len (ccs .ResolverState .Addresses ) == 0 {
@@ -171,6 +218,10 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
171218 // Initially, we set load reports to off, because they are not
172219 // running upon initial weightedSubConn creation.
173220 cfg : & lbConfig {EnableOOBLoadReport : false },
221+
222+ metricsRecorder : b .metricsRecorder ,
223+ target : b .target ,
224+ locality : b .locality ,
174225 }
175226 b .subConns .Set (addr , wsc )
176227 b .scMap [sc ] = wsc
@@ -318,9 +369,12 @@ func (b *wrrBalancer) regeneratePicker() {
318369 }
319370
320371 p := & picker {
321- v : rand .Uint32 (), // start the scheduler at a random point
322- cfg : b .cfg ,
323- subConns : b .readySubConns (),
372+ v : rand .Uint32 (), // start the scheduler at a random point
373+ cfg : b .cfg ,
374+ subConns : b .readySubConns (),
375+ metricsRecorder : b .metricsRecorder ,
376+ locality : b .locality ,
377+ target : b .target ,
324378 }
325379 var ctx context.Context
326380 ctx , b .stopPicker = context .WithCancel (context .Background ())
@@ -339,16 +393,20 @@ type picker struct {
339393 v uint32 // incrementing value used by the scheduler; accessed atomically
340394 cfg * lbConfig // active config when picker created
341395 subConns []* weightedSubConn // all READY subconns
396+
397+ // The following fields are immutable.
398+ target string
399+ locality string
400+ metricsRecorder estats.MetricsRecorder
342401}
343402
344- // scWeights returns a slice containing the weights from p.subConns in the same
345- // order as p.subConns.
346- func (p * picker ) scWeights () []float64 {
403+ func (p * picker ) scWeights (recordMetrics bool ) []float64 {
347404 ws := make ([]float64 , len (p .subConns ))
348405 now := internal .TimeNow ()
349406 for i , wsc := range p .subConns {
350- ws [i ] = wsc .weight (now , time .Duration (p .cfg .WeightExpirationPeriod ), time .Duration (p .cfg .BlackoutPeriod ))
407+ ws [i ] = wsc .weight (now , time .Duration (p .cfg .WeightExpirationPeriod ), time .Duration (p .cfg .BlackoutPeriod ), recordMetrics )
351408 }
409+
352410 return ws
353411}
354412
@@ -357,7 +415,7 @@ func (p *picker) inc() uint32 {
357415}
358416
// regenerateScheduler builds a fresh scheduler from the current subconn
// weights and atomically publishes it, so concurrent Pick calls observe
// either the old or the new scheduler, never a partial update.
func (p *picker) regenerateScheduler() {
	s := p.newScheduler()
	atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
}
363421
@@ -367,6 +425,7 @@ func (p *picker) start(ctx context.Context) {
367425 // No need to regenerate weights with only one backend.
368426 return
369427 }
428+
370429 go func () {
371430 ticker := time .NewTicker (time .Duration (p .cfg .WeightUpdatePeriod ))
372431 defer ticker .Stop ()
@@ -404,8 +463,12 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
404463// When needed, it also tracks connectivity state, listens for metrics updates
405464// by implementing the orca.OOBListener interface and manages that listener.
406465type weightedSubConn struct {
466+ // The following fields are immutable.
407467 balancer.SubConn
408- logger * grpclog.PrefixLogger
468+ logger * grpclog.PrefixLogger
469+ target string
470+ metricsRecorder estats.MetricsRecorder
471+ locality string
409472
410473 // The following fields are only accessed on calls into the LB policy, and
411474 // do not need a mutex.
@@ -527,21 +590,37 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect
527590
528591// weight returns the current effective weight of the subconn, taking into
529592// account the parameters. Returns 0 for blacked out or expired data, which
530- // will cause the backend weight to be treated as the mean of the weights of
531- // the other backends.
532- func (w * weightedSubConn ) weight (now time.Time , weightExpirationPeriod , blackoutPeriod time.Duration ) float64 {
593+ // will cause the backend weight to be treated as the mean of the weights of the
594+ // other backends. If forScheduler is set to true, this function will emit
595+ // metrics through the mtrics registry.
596+ func (w * weightedSubConn ) weight (now time.Time , weightExpirationPeriod , blackoutPeriod time.Duration , recordMetrics bool ) (weight float64 ) {
533597 w .mu .Lock ()
534598 defer w .mu .Unlock ()
599+
600+ if recordMetrics {
601+ defer func () {
602+ endpointWeightsMetric .Record (w .metricsRecorder , weight , w .target , w .locality )
603+ }()
604+ }
605+
535606 // If the most recent update was longer ago than the expiration period,
536607 // reset nonEmptySince so that we apply the blackout period again if we
537608 // start getting data again in the future, and return 0.
538609 if now .Sub (w .lastUpdated ) >= weightExpirationPeriod {
610+ if recordMetrics {
611+ endpointWeightStaleMetric .Record (w .metricsRecorder , 1 , w .target , w .locality )
612+ }
539613 w .nonEmptySince = time.Time {}
540614 return 0
541615 }
616+
542617 // If we don't have at least blackoutPeriod worth of data, return 0.
543618 if blackoutPeriod != 0 && (w .nonEmptySince == (time.Time {}) || now .Sub (w .nonEmptySince ) < blackoutPeriod ) {
619+ if recordMetrics {
620+ endpointWeightNotYetUsableMetric .Record (w .metricsRecorder , 1 , w .target , w .locality )
621+ }
544622 return 0
545623 }
624+
546625 return w .weightVal
547626}
0 commit comments