Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 115 additions & 58 deletions balancer/xds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ package xds
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -130,7 +129,7 @@ type xdsBalancer struct {
config *xdsConfig // may change when passed a different service config
xdsLB edsBalancerInterface
fallbackLB balancer.Balancer
fallbackInitData *addressUpdate // may change when HandleResolved address is called
fallbackInitData *resolver.State // may change when HandleResolved address is called
}

func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
Expand Down Expand Up @@ -218,43 +217,93 @@ func (x *xdsBalancer) run() {
}
}

func getBalancerConfig(serviceConfig string) *xdsConfig {
sc := parseFullServiceConfig(serviceConfig)
if sc == nil {
return nil
}
var xdsConfigRaw json.RawMessage
for _, lbcfg := range sc.LoadBalancingConfig {
if lbcfg.Name == xdsName {
xdsConfigRaw = lbcfg.Config
break
}
}
var cfg xdsConfig
if err := json.Unmarshal(xdsConfigRaw, &cfg); err != nil {
grpclog.Warningf("unable to unmarshal balancer config %s into xds config", string(xdsConfigRaw))
return nil
}
return &cfg
}

func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
switch u := update.(type) {
case *addressUpdate:
if x.fallbackLB != nil {
x.fallbackLB.HandleResolvedAddrs(u.addrs, u.err)
}
x.fallbackInitData = u
case *subConnStateUpdate:
if x.xdsLB != nil {
x.xdsLB.HandleSubConnStateChange(u.sc, u.state)
x.xdsLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
}
if x.fallbackLB != nil {
x.fallbackLB.HandleSubConnStateChange(u.sc, u.state)
if lb, ok := x.fallbackLB.(balancer.V2Balancer); ok {
lb.UpdateSubConnState(u.sc, u.state)
} else {
x.fallbackLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
}
}
case *xdsConfig:
if x.config == nil {
// The first time we get config, we just need to start the xdsClient.
x.startNewXDSClient(u)
x.config = u
case *resolver.State:
cfg := getBalancerConfig(u.ServiceConfig)
if cfg == nil {
// service config parsing failed. should never happen. And this parsing will be removed, once
// we support service config validation.
return
}
// With a different BalancerName, we need to create a new xdsClient.
// If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient.
// This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't.
if u.BalancerName != x.config.BalancerName || (u.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
x.startNewXDSClient(u)

var fallbackChanged bool
// service config has been updated.
if !reflect.DeepEqual(cfg, x.config) {
if x.config == nil {
// The first time we get config, we just need to start the xdsClient.
x.startNewXDSClient(cfg)
x.config = cfg
x.fallbackInitData = &resolver.State{
Addresses: u.Addresses,
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
// we can pass along the config struct.
}
return
}

// With a different BalancerName, we need to create a new xdsClient.
// If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient.
// This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't.
if cfg.BalancerName != x.config.BalancerName || (cfg.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
x.startNewXDSClient(cfg)
}
// We will update the xdsLB with the new child policy, if we got a different one and it's not nil.
// The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time.
if x.xdsLB != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) && cfg.ChildPolicy != nil {
x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
}

if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) {
x.fallbackLB.Close()
x.buildFallBackBalancer(cfg)
fallbackChanged = true
}
}
// We will update the xdsLB with the new child policy, if we got a different one and it's not nil.
// The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time.
if !reflect.DeepEqual(u.ChildPolicy, x.config.ChildPolicy) && u.ChildPolicy != nil && x.xdsLB != nil {
x.xdsLB.HandleChildPolicy(u.ChildPolicy.Name, u.ChildPolicy.Config)

if x.fallbackLB != nil && (!reflect.DeepEqual(x.fallbackInitData.Addresses, u.Addresses) || fallbackChanged) {
x.updateFallbackWithResolverState(&resolver.State{
Addresses: u.Addresses,
})
}
if !reflect.DeepEqual(u.FallBackPolicy, x.config.FallBackPolicy) && x.fallbackLB != nil {
x.fallbackLB.Close()
x.startFallBackBalancer(u)

x.config = cfg
x.fallbackInitData = &resolver.State{
Addresses: u.Addresses,
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
// we can pass along the config struct.
}
x.config = u
default:
// unreachable path
panic("wrong update type")
Expand Down Expand Up @@ -341,17 +390,20 @@ func (w *xdsClientConn) UpdateBalancerState(s connectivity.State, p balancer.Pic
w.ClientConn.UpdateBalancerState(s, p)
}

type addressUpdate struct {
addrs []resolver.Address
err error
}

type subConnStateUpdate struct {
sc balancer.SubConn
state connectivity.State
state balancer.SubConnState
}

func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange")
}

func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs")
}

func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
update := &subConnStateUpdate{
sc: sc,
state: state,
Expand All @@ -362,29 +414,24 @@ func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connec
}
}

func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
update := &addressUpdate{
addrs: addrs,
err: err,
}
select {
case x.grpcUpdate <- update:
case <-x.ctx.Done():
}
type serviceConfig struct {
LoadBalancingConfig []*loadBalancingConfig
}

// TODO: once the API is merged, check whether we need to change the function name/signature here.
func (x *xdsBalancer) HandleBalancerConfig(config json.RawMessage) error {
var cfg xdsConfig
if err := json.Unmarshal(config, &cfg); err != nil {
return errors.New("unable to unmarshal balancer config into xds config")
func parseFullServiceConfig(s string) *serviceConfig {
var ret serviceConfig
err := json.Unmarshal([]byte(s), &ret)
if err != nil {
return nil
}
return &ret
}

func (x *xdsBalancer) UpdateResolverState(s resolver.State) {
select {
case x.grpcUpdate <- &cfg:
case x.grpcUpdate <- &s:
case <-x.ctx.Done():
}
return nil
}

type cdsResp struct {
Expand Down Expand Up @@ -441,10 +488,23 @@ func (x *xdsBalancer) switchFallback() {
x.xdsLB.Close()
x.xdsLB = nil
}
x.startFallBackBalancer(x.config)
x.buildFallBackBalancer(x.config)
x.updateFallbackWithResolverState(x.fallbackInitData)
x.cancelFallbackMonitoring()
}

func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) {
if lb, ok := x.fallbackLB.(balancer.V2Balancer); ok {
lb.UpdateResolverState(resolver.State{
Addresses: s.Addresses,
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
// we can pass along the config struct.
})
} else {
x.fallbackLB.HandleResolvedAddrs(s.Addresses, nil)
}
}

// x.cancelFallbackAndSwitchEDSBalancerIfNecessary() will be no-op if we have a working xds client.
// It will cancel fallback monitoring if we are in fallback monitoring stage.
// If there's no running edsBalancer currently, it will create one and initialize it. Also, it will
Expand All @@ -466,9 +526,9 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
}
}

func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) {
func (x *xdsBalancer) buildFallBackBalancer(c *xdsConfig) {
if c.FallBackPolicy == nil {
x.startFallBackBalancer(&xdsConfig{
x.buildFallBackBalancer(&xdsConfig{
FallBackPolicy: &loadBalancingConfig{
Name: "round_robin",
},
Expand All @@ -479,11 +539,6 @@ func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) {
// balancer is registered or not.
builder := balancer.Get(c.FallBackPolicy.Name)
x.fallbackLB = builder.Build(x.cc, x.buildOpts)
if x.fallbackInitData != nil {
// TODO: uncomment when HandleBalancerConfig API is merged.
//x.fallbackLB.HandleBalancerConfig(c.FallBackPolicy.Config)
x.fallbackLB.HandleResolvedAddrs(x.fallbackInitData.addrs, x.fallbackInitData.err)
}
}

// There are three ways that could lead to fallback:
Expand Down Expand Up @@ -596,7 +651,9 @@ type loadBalancingConfig struct {
}

func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
return nil, nil
m := make(map[string]json.RawMessage)
m[l.Name] = l.Config
return json.Marshal(m)
}

func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
Expand Down
Loading