Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 122 additions & 62 deletions balancer/xds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ package xds
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -92,6 +91,11 @@ func (b *xdsBalancerBuilder) Name() string {
return xdsName
}

// closer is the interface that both Balancer and V2Balancer implements
type closer interface {
Close()
}

// edsBalancerInterface defines the interface that edsBalancer must implement to
// communicate with xdsBalancer.
//
Expand Down Expand Up @@ -126,11 +130,12 @@ type xdsBalancer struct {
timer *time.Timer
noSubConnAlert <-chan struct{}

client *client // may change when passed a different service config
config *xdsConfig // may change when passed a different service config
xdsLB edsBalancerInterface
fallbackLB balancer.Balancer
fallbackInitData *addressUpdate // may change when HandleResolved address is called
client *client // may change when passed a different service config
config *xdsConfig // may change when passed a different service config
xdsLB edsBalancerInterface
fallbackLB closer
fallbackInitData *resolver.State // may change when HandleResolved address is called
lastFallbackState []resolver.Address // TODO: should also contains config, but it's not available now.
}

func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
Expand Down Expand Up @@ -218,43 +223,87 @@ func (x *xdsBalancer) run() {
}
}

func getBalancerConfig(serviceConfig string) *xdsConfig {
sc := parseFullServiceConfig(serviceConfig)
var xdsConfigRaw json.RawMessage
for _, lbcfg := range sc.LoadBalancingConfig {
if lbcfg.Name == xdsName {
xdsConfigRaw = lbcfg.Config
break
}
}
var cfg xdsConfig
if err := json.Unmarshal(xdsConfigRaw, &cfg); err != nil {
grpclog.Warningf("unable to unmarshal balancer config %s into xds config", string(xdsConfigRaw))
return nil
}
return &cfg
}

func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
switch u := update.(type) {
case *addressUpdate:
if x.fallbackLB != nil {
x.fallbackLB.HandleResolvedAddrs(u.addrs, u.err)
}
x.fallbackInitData = u
case *subConnStateUpdate:
if x.xdsLB != nil {
x.xdsLB.HandleSubConnStateChange(u.sc, u.state)
x.xdsLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
}
if x.fallbackLB != nil {
x.fallbackLB.HandleSubConnStateChange(u.sc, u.state)
switch lb := x.fallbackLB.(type) {
case balancer.V2Balancer:
lb.UpdateSubConnState(u.sc, u.state)
case balancer.Balancer:
lb.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
default:
grpclog.Error("unexpected balancer type")
}
}
case *xdsConfig:
if x.config == nil {
// The first time we get config, we just need to start the xdsClient.
x.startNewXDSClient(u)
x.config = u
case *resolver.State:
cfg := getBalancerConfig(u.ServiceConfig)
if cfg == nil {
// service config parsing failed. should never happen. And this parsing will be removed, once
// we support service config validation.
return
}
// With a different BalancerName, we need to create a new xdsClient.
// If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient.
// This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't.
if u.BalancerName != x.config.BalancerName || (u.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
x.startNewXDSClient(u)
defer func() {
x.config = cfg
x.lastFallbackState = u.Addresses
}()
x.fallbackInitData = &resolver.State{
Addresses: u.Addresses,
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
// we can pass along the config struct.
}
// We will update the xdsLB with the new child policy, if we got a different one and it's not nil.
// The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time.
if !reflect.DeepEqual(u.ChildPolicy, x.config.ChildPolicy) && u.ChildPolicy != nil && x.xdsLB != nil {
x.xdsLB.HandleChildPolicy(u.ChildPolicy.Name, u.ChildPolicy.Config)

var fallbackChanged bool
// service config has been updated.
if cfg != x.config {
if x.config == nil {
// The first time we get config, we just need to start the xdsClient.
x.startNewXDSClient(cfg)
return
}

// With a different BalancerName, we need to create a new xdsClient.
// If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient.
// This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't.
if cfg.BalancerName != x.config.BalancerName || (cfg.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
x.startNewXDSClient(cfg)
}
// We will update the xdsLB with the new child policy, if we got a different one and it's not nil.
// The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time.
if !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) && cfg.ChildPolicy != nil && x.xdsLB != nil {
x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
}

if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) {
x.fallbackLB.Close()
x.buildFallBackBalancer(cfg)
fallbackChanged = true
}
}
if !reflect.DeepEqual(u.FallBackPolicy, x.config.FallBackPolicy) && x.fallbackLB != nil {
x.fallbackLB.Close()
x.startFallBackBalancer(u)

if x.fallbackLB != nil && (!reflect.DeepEqual(x.lastFallbackState, u.Addresses) || fallbackChanged) {
x.updateFallbackWithResolverState(x.fallbackInitData)
}
x.config = u
default:
// unreachable path
panic("wrong update type")
Expand Down Expand Up @@ -341,17 +390,20 @@ func (w *xdsClientConn) UpdateBalancerState(s connectivity.State, p balancer.Pic
w.ClientConn.UpdateBalancerState(s, p)
}

type addressUpdate struct {
addrs []resolver.Address
err error
}

type subConnStateUpdate struct {
sc balancer.SubConn
state connectivity.State
state balancer.SubConnState
}

func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange")
}

func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs")
}

func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
update := &subConnStateUpdate{
sc: sc,
state: state,
Expand All @@ -362,29 +414,24 @@ func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connec
}
}

func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
update := &addressUpdate{
addrs: addrs,
err: err,
}
select {
case x.grpcUpdate <- update:
case <-x.ctx.Done():
}
type serviceConfig struct {
LoadBalancingConfig []*loadBalancingConfig
}

// TODO: once the API is merged, check whether we need to change the function name/signature here.
func (x *xdsBalancer) HandleBalancerConfig(config json.RawMessage) error {
var cfg xdsConfig
if err := json.Unmarshal(config, &cfg); err != nil {
return errors.New("unable to unmarshal balancer config into xds config")
func parseFullServiceConfig(s string) *serviceConfig {
var ret serviceConfig
err := json.Unmarshal([]byte(s), &ret)
if err != nil {
return nil
}
return &ret
}

func (x *xdsBalancer) UpdateResolverState(s resolver.State) {
select {
case x.grpcUpdate <- &cfg:
case x.grpcUpdate <- &s:
case <-x.ctx.Done():
}
return nil
}

type cdsResp struct {
Expand Down Expand Up @@ -441,10 +488,26 @@ func (x *xdsBalancer) switchFallback() {
x.xdsLB.Close()
x.xdsLB = nil
}
x.startFallBackBalancer(x.config)
x.buildFallBackBalancer(x.config)
x.updateFallbackWithResolverState(x.fallbackInitData)
x.cancelFallbackMonitoring()
}

func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) {
switch lb := x.fallbackLB.(type) {
case balancer.V2Balancer:
lb.UpdateResolverState(resolver.State{
Addresses: s.Addresses,
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
// we can pass along the config struct.
})
case balancer.Balancer:
lb.HandleResolvedAddrs(s.Addresses, nil)
default:
grpclog.Error("unexpected balancer type")
}
}

// x.cancelFallbackAndSwitchEDSBalancerIfNecessary() will be no-op if we have a working xds client.
// It will cancel fallback monitoring if we are in fallback monitoring stage.
// If there's no running edsBalancer currently, it will create one and initialize it. Also, it will
Expand All @@ -466,9 +529,9 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
}
}

func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) {
func (x *xdsBalancer) buildFallBackBalancer(c *xdsConfig) {
if c.FallBackPolicy == nil {
x.startFallBackBalancer(&xdsConfig{
x.buildFallBackBalancer(&xdsConfig{
FallBackPolicy: &loadBalancingConfig{
Name: "round_robin",
},
Expand All @@ -479,11 +542,6 @@ func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) {
// balancer is registered or not.
builder := balancer.Get(c.FallBackPolicy.Name)
x.fallbackLB = builder.Build(x.cc, x.buildOpts)
if x.fallbackInitData != nil {
// TODO: uncomment when HandleBalancerConfig API is merged.
//x.fallbackLB.HandleBalancerConfig(c.FallBackPolicy.Config)
x.fallbackLB.HandleResolvedAddrs(x.fallbackInitData.addrs, x.fallbackInitData.err)
}
}

// There are three ways that could lead to fallback:
Expand Down Expand Up @@ -596,7 +654,9 @@ type loadBalancingConfig struct {
}

func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
return nil, nil
m := make(map[string]json.RawMessage)
m[l.Name] = l.Config
return json.Marshal(m)
}

func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
Expand Down
Loading