Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 96 additions & 17 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/weightedroundrobin/internal"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/connectivity"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal/grpclog"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/orca"
Expand All @@ -45,6 +47,43 @@ import (
// Name is the name of the weighted round robin balancer.
const Name = "weighted_round_robin"

// Experimental per-scheduler-update WRR metrics. All are registered with the
// experimental stats registry, disabled by default (Default: false), labeled
// by "grpc.target", and carry "grpc.lb.locality" as an optional label.
var (
// rrFallbackMetric counts scheduler updates that fell back to plain
// round-robin because too few endpoints had valid weights.
rrFallbackMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.wrr.rr_fallback",
Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.",
Unit: "update",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.locality"},
Default: false,
})

// endpointWeightNotYetUsableMetric counts endpoints per update whose weight
// is not yet usable (no load report received, or still in blackout).
endpointWeightNotYetUsableMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.wrr.endpoint_weight_not_yet_usable",
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).",
Unit: "endpoint",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.locality"},
Default: false,
})

// endpointWeightStaleMetric counts endpoints per update whose most recent
// weight is older than the expiration period.
endpointWeightStaleMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
Name: "grpc.lb.wrr.endpoint_weight_stale",
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.",
Unit: "endpoint",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.locality"},
Default: false,
})
// endpointWeightsMetric records every endpoint's weight on each scheduler
// update; endpoints without usable weights are recorded as 0.
endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{
Name: "grpc.lb.wrr.endpoint_weights",
Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.",
Unit: "endpoint",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.locality"},
Default: false,
})
)

// init registers the weighted round robin balancer builder (bb) with the
// gRPC balancer registry so it can be selected by name in service config.
func init() {
balancer.Register(bb{})
}
Expand All @@ -58,7 +97,10 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
csEvltr: &balancer.ConnectivityStateEvaluator{},
scMap: make(map[balancer.SubConn]*weightedSubConn),
connectivityState: connectivity.Connecting,
target: bOpts.Target.String(),
metricsRecorder: bOpts.MetricsRecorder,
}

b.logger = prefixLogger(b)
b.logger.Infof("Created")
return b
Expand Down Expand Up @@ -101,8 +143,11 @@ func (bb) Name() string {

// wrrBalancer implements the weighted round robin LB policy.
type wrrBalancer struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
// The following fields are immutable.
cc balancer.ClientConn
logger *grpclog.PrefixLogger
target string
metricsRecorder estats.MetricsRecorder

// The following fields are only accessed on calls into the LB policy, and
// do not need a mutex.
Expand All @@ -114,6 +159,7 @@ type wrrBalancer struct {
resolverErr error // the last error reported by the resolver; cleared on successful resolution
connErr error // the last connection error; cleared upon leaving TransientFailure
stopPicker func()
locality string
}

func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
Expand All @@ -125,6 +171,7 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
}

b.cfg = cfg
b.locality = weightedtarget.LocalityFromResolverState(ccs.ResolverState)
b.updateAddresses(ccs.ResolverState.Addresses)

if len(ccs.ResolverState.Addresses) == 0 {
Expand Down Expand Up @@ -171,6 +218,10 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
// Initially, we set load reports to off, because they are not
// running upon initial weightedSubConn creation.
cfg: &lbConfig{EnableOOBLoadReport: false},

metricsRecorder: b.metricsRecorder,
target: b.target,
locality: b.locality,
}
b.subConns.Set(addr, wsc)
b.scMap[sc] = wsc
Expand Down Expand Up @@ -318,9 +369,12 @@ func (b *wrrBalancer) regeneratePicker() {
}

p := &picker{
v: rand.Uint32(), // start the scheduler at a random point
cfg: b.cfg,
subConns: b.readySubConns(),
v: rand.Uint32(), // start the scheduler at a random point
cfg: b.cfg,
subConns: b.readySubConns(),
metricsRecorder: b.metricsRecorder,
locality: b.locality,
target: b.target,
}
var ctx context.Context
ctx, b.stopPicker = context.WithCancel(context.Background())
Expand All @@ -339,16 +393,20 @@ type picker struct {
v uint32 // incrementing value used by the scheduler; accessed atomically
cfg *lbConfig // active config when picker created
subConns []*weightedSubConn // all READY subconns

// The following fields are immutable.
target string
locality string
metricsRecorder estats.MetricsRecorder
}

// scWeights returns a slice containing the weights from p.subConns in the same
// order as p.subConns.
func (p *picker) scWeights() []float64 {
func (p *picker) scWeights(recordMetrics bool) []float64 {
ws := make([]float64, len(p.subConns))
now := internal.TimeNow()
for i, wsc := range p.subConns {
ws[i] = wsc.weight(now, time.Duration(p.cfg.WeightExpirationPeriod), time.Duration(p.cfg.BlackoutPeriod))
ws[i] = wsc.weight(now, time.Duration(p.cfg.WeightExpirationPeriod), time.Duration(p.cfg.BlackoutPeriod), recordMetrics)
}

return ws
}

Expand All @@ -357,7 +415,7 @@ func (p *picker) inc() uint32 {
}

func (p *picker) regenerateScheduler() {
s := newScheduler(p.scWeights(), p.inc)
s := p.newScheduler()
atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
}

Expand All @@ -367,6 +425,7 @@ func (p *picker) start(ctx context.Context) {
// No need to regenerate weights with only one backend.
return
}

go func() {
ticker := time.NewTicker(time.Duration(p.cfg.WeightUpdatePeriod))
defer ticker.Stop()
Expand Down Expand Up @@ -404,8 +463,12 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// When needed, it also tracks connectivity state, listens for metrics updates
// by implementing the orca.OOBListener interface and manages that listener.
type weightedSubConn struct {
// The following fields are immutable.
balancer.SubConn
logger *grpclog.PrefixLogger
logger *grpclog.PrefixLogger
target string
metricsRecorder estats.MetricsRecorder
locality string

// The following fields are only accessed on calls into the LB policy, and
// do not need a mutex.
Expand Down Expand Up @@ -527,21 +590,37 @@ func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connect

// weight returns the current effective weight of the subconn, taking into
// account the parameters. Returns 0 for blacked out or expired data, which
// will cause the backend weight to be treated as the mean of the weights of
// the other backends.
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration) float64 {
// will cause the backend weight to be treated as the mean of the weights of the
// other backends. If recordMetrics is set to true, this function will emit
// metrics through the metrics registry.
func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration, recordMetrics bool) float64 {
w.mu.Lock()
defer w.mu.Unlock()

weight := float64(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Named return values are totally fine if you want to keep it. Just don't do a bare return, since it can be confusing (for a minute I actually thought you removed the return value when I saw the bare return, and had to go back to see what was happening).

(But you don't need to initialize them to 0 as in the previous code; Go always initializes everything to zero.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to named return value.

if recordMetrics {
defer func() {
endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality)
}()
}

// If the most recent update was longer ago than the expiration period,
// reset nonEmptySince so that we apply the blackout period again if we
// start getting data again in the future, and return 0.
if now.Sub(w.lastUpdated) >= weightExpirationPeriod {
if recordMetrics {
endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
}
w.nonEmptySince = time.Time{}
return 0
return weight
}

// If we don't have at least blackoutPeriod worth of data, return 0.
if blackoutPeriod != 0 && (w.nonEmptySince == (time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) {
return 0
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)

return weight
}
return w.weightVal
weight = w.weightVal
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah...I'd prefer to use what you had before. Use the named return value, and don't ever assign to weight at all. Just read it in the defer to find out what was returned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to named return value.

return weight
}
14 changes: 9 additions & 5 deletions balancer/weightedroundrobin/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ type scheduler interface {
// len(scWeights)-1 are zero or there is only a single subconn, otherwise it
// will return an Earliest Deadline First (EDF) scheduler implementation that
// selects the subchannels according to their weights.
func newScheduler(scWeights []float64, inc func() uint32) scheduler {
func (p *picker) newScheduler() scheduler {
scWeights := p.scWeights(true)
n := len(scWeights)
if n == 0 {
return nil
}
if n == 1 {
return &rrScheduler{numSCs: 1, inc: inc}
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
return &rrScheduler{numSCs: 1, inc: p.inc}
}
sum := float64(0)
numZero := 0
Expand All @@ -51,8 +53,10 @@ func newScheduler(scWeights []float64, inc func() uint32) scheduler {
numZero++
}
}

if numZero >= n-1 {
return &rrScheduler{numSCs: uint32(n), inc: inc}
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
}
unscaledMean := sum / float64(n-numZero)
scalingFactor := maxWeight / max
Expand All @@ -74,11 +78,11 @@ func newScheduler(scWeights []float64, inc func() uint32) scheduler {
}

if allEqual {
return &rrScheduler{numSCs: uint32(n), inc: inc}
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
}

logger.Infof("using edf scheduler with weights: %v", weights)
return &edfScheduler{weights: weights, inc: inc}
return &edfScheduler{weights: weights, inc: p.inc}
}

const maxWeight = math.MaxUint16
Expand Down
13 changes: 12 additions & 1 deletion balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ type weightedTargetBalancer struct {
targets map[string]Target
}

// localityKeyType is an unexported key type for storing the locality in
// resolver.State attributes; using a distinct type prevents collisions with
// attribute keys set by other packages.
type localityKeyType string

// localityKey is the attributes key under which this balancer stores the
// child target name (locality) it writes in UpdateClientConnState.
const localityKey = localityKeyType("locality")

// LocalityFromResolverState returns the locality that the weighted target
// balancer stored in the given resolver.State's attributes, or the empty
// string when no locality attribute is present.
func LocalityFromResolverState(state resolver.State) string {
	if loc, ok := state.Attributes.Value(localityKey).(string); ok {
		return loc
	}
	return ""
}

// UpdateClientConnState takes the new targets in balancer group,
// creates/deletes sub-balancers and sends them update. addresses are split into
// groups based on hierarchy path.
Expand Down Expand Up @@ -142,7 +153,7 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
ResolverState: resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes,
Attributes: s.ResolverState.Attributes.WithValue(localityKey, name),
},
BalancerConfig: newT.ChildPolicy.Config,
})
Expand Down