Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 103 additions & 42 deletions balancer/xds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ package xds
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -131,6 +130,8 @@ type xdsBalancer struct {
xdsLB edsBalancerInterface
fallbackLB balancer.Balancer
fallbackInitData *addressUpdate // may change when HandleResolved address is called

lastResolverUpdate *resolver.State
}

func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
Expand Down Expand Up @@ -220,41 +221,52 @@ func (x *xdsBalancer) run() {

func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
switch u := update.(type) {
case *addressUpdate:
if x.fallbackLB != nil {
x.fallbackLB.HandleResolvedAddrs(u.addrs, u.err)
}
x.fallbackInitData = u
case *subConnStateUpdate:
if x.xdsLB != nil {
x.xdsLB.HandleSubConnStateChange(u.sc, u.state)
}
if x.fallbackLB != nil {
x.fallbackLB.HandleSubConnStateChange(u.sc, u.state)
}
case *xdsConfig:
if x.config == nil {
// The first time we get config, we just need to start the xdsClient.
x.startNewXDSClient(u)
x.config = u
return
}
// With a different BalancerName, we need to create a new xdsClient.
// If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient.
// This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't.
if u.BalancerName != x.config.BalancerName || (u.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
x.startNewXDSClient(u)
case *resolverUpdate:
if u.addrUpdate != nil {
// addresses have been updated.
// in case of x.config == nil where it returns early, we set the fallbackInitData here.
x.fallbackInitData = u.addrUpdate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you still need fallbackInitData?

If fallback balancer is updated, the new addresses are also sent along with the update.
You shouldn't need to keep the old addresses for a new fallback balancer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible that fallback balancer is changed at later time (and the addresses are not updated in the same update), so we still need to initialize with the old addresses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, you need this field when switching to fallback.
But this update here shouldn't be necessary. It also causes double address update for a new fallback balancer.

Copy link
Contributor Author

@lyuxuan lyuxuan Apr 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's here because of the early return. Let me think a bit more if we can just move this line inside the early return condition.

}
// We will update the xdsLB with the new child policy, if we got a different one and it's not nil.
// The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time.
if !reflect.DeepEqual(u.ChildPolicy, x.config.ChildPolicy) && u.ChildPolicy != nil && x.xdsLB != nil {
x.xdsLB.HandleChildPolicy(u.ChildPolicy.Name, u.ChildPolicy.Config)
cfg := u.xdsConfig

// service config has been updated.
if cfg != nil {
if x.config == nil {
// The first time we get config, we just need to start the xdsClient.
x.startNewXDSClient(cfg)
x.config = cfg
return
}

// With a different BalancerName, we need to create a new xdsClient.
// If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient.
// This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't.
if cfg.BalancerName != x.config.BalancerName || (cfg.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
x.startNewXDSClient(cfg)
}
// We will update the xdsLB with the new child policy, if we got a different one and it's not nil.
// The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time.
if !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) && cfg.ChildPolicy != nil && x.xdsLB != nil {
x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
}

if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) {
x.fallbackLB.Close()
x.startFallBackBalancer(cfg)
}
x.config = cfg
}
if !reflect.DeepEqual(u.FallBackPolicy, x.config.FallBackPolicy) && x.fallbackLB != nil {
x.fallbackLB.Close()
x.startFallBackBalancer(u)

if u.addrUpdate != nil && x.fallbackLB != nil {
x.fallbackLB.HandleResolvedAddrs(u.addrUpdate.addrs, u.addrUpdate.err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this fallback balancer is new, it will handle the same addresses twice.

}
x.config = u
default:
// unreachable path
panic("wrong update type")
Expand Down Expand Up @@ -351,6 +363,11 @@ type subConnStateUpdate struct {
state connectivity.State
}

type resolverUpdate struct {
addrUpdate *addressUpdate
xdsConfig *xdsConfig
}

func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
update := &subConnStateUpdate{
sc: sc,
Expand All @@ -363,28 +380,69 @@ func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connec
}

func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
update := &addressUpdate{
addrs: addrs,
err: err,
}
select {
case x.grpcUpdate <- update:
case <-x.ctx.Done():
}
grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs")
}

type serviceConfig struct {
LoadBalancingConfig []*loadBalancingConfig
}

// TODO: once the API is merged, check whether we need to change the function name/signature here.
func (x *xdsBalancer) HandleBalancerConfig(config json.RawMessage) error {
var cfg xdsConfig
if err := json.Unmarshal(config, &cfg); err != nil {
return errors.New("unable to unmarshal balancer config into xds config")
func parseFullServiceConfig(s string) *serviceConfig {
var ret serviceConfig
err := json.Unmarshal([]byte(s), &ret)
if err != nil {
return nil
}
return &ret
}

func (x *xdsBalancer) UpdateResolverState(s resolver.State) {
defer func() {
x.lastResolverUpdate = &s
}()

var update interface{}
// if service config does not change for this update, we only send address update.
if x.lastResolverUpdate != nil && x.lastResolverUpdate.ServiceConfig == s.ServiceConfig {
update = &resolverUpdate{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this function just sent a &resolverUpdate{...} to the executor goroutine, and do all the checks and parsing there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we want to do this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the logic here and later in the goroutine is duplicate.

For example:
Each resolver update, the balancer does:

  1. if addresses are updated, do something for the addresses
  2. if config is updated, do something for the config

Checks like this are done twice.
First time here,

  1. if addresses are updated, resolverUpdate.addrUpdate is set
  2. if config is updated, resolverUpdate.xdsconfig is set

Later in the goroutine

  1. if resolverUpdate.addrUpdate is not nil, do something
  2. if resolverUpdate.xdsconfig is not nil, do something

The code is this way because the resolver update is still treated as two separate things. I think the code should be updated to actually treat them as one.

The update sent to the goroutine can be just

type resolverUpdate struct {
    state resolver.State
}

addrUpdate: &addressUpdate{
addrs: s.Addresses,
},
}
} else {
sc := parseFullServiceConfig(s.ServiceConfig)
var xdsConfigRaw json.RawMessage
for _, lbcfg := range sc.LoadBalancingConfig {
if lbcfg.Name == xdsName {
xdsConfigRaw = lbcfg.Config
break
}
}
var cfg xdsConfig
if err := json.Unmarshal(xdsConfigRaw, &cfg); err != nil {
grpclog.Warningf("unable to unmarshal balancer config %s into xds config", string(xdsConfigRaw))
return
}

// if addresses do not change for this update, we only send service config update.
if x.lastResolverUpdate != nil && reflect.DeepEqual(x.lastResolverUpdate.Addresses, s.Addresses) {
update = &resolverUpdate{
xdsConfig: &cfg,
}
} else {
// Both addresses and service config have changed for this update, send the combined update.
update = &resolverUpdate{
addrUpdate: &addressUpdate{
addrs: s.Addresses,
},
xdsConfig: &cfg,
}
}
}
select {
case x.grpcUpdate <- &cfg:
case x.grpcUpdate <- update:
case <-x.ctx.Done():
}
return nil
}

type cdsResp struct {
Expand Down Expand Up @@ -479,6 +537,7 @@ func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) {
// balancer is registered or not.
builder := balancer.Get(c.FallBackPolicy.Name)
x.fallbackLB = builder.Build(x.cc, x.buildOpts)

if x.fallbackInitData != nil {
// TODO: uncomment when HandleBalancerConfig API is merged.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to clean up this at later time. Essentially, it's essentially make fallback balancer which implements V2 API gets both service config and addresses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a feeling that this won't be as trivial as it sounds. It may change the way you handle resolver updates in this PR.

//x.fallbackLB.HandleBalancerConfig(c.FallBackPolicy.Config)
Expand Down Expand Up @@ -596,7 +655,9 @@ type loadBalancingConfig struct {
}

func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
return nil, nil
m := make(map[string]json.RawMessage)
m[l.Name] = l.Config
return json.Marshal(m)
}

func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
Expand Down
Loading