From cb73f9493b7ce8155c6607a4a17b4562d9393bd5 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Fri, 19 Apr 2019 13:50:25 -0700 Subject: [PATCH 1/8] xds: support BalancerV2 api --- balancer/xds/xds.go | 108 ++++++++++++++++++++++++++++++++------- balancer/xds/xds_test.go | 98 +++++++++++++++++++---------------- 2 files changed, 143 insertions(+), 63 deletions(-) diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index 7795724f4114..d585047aba51 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -25,7 +25,6 @@ package xds import ( "context" "encoding/json" - "errors" "fmt" "reflect" "sync" @@ -131,6 +130,8 @@ type xdsBalancer struct { xdsLB edsBalancerInterface fallbackLB balancer.Balancer fallbackInitData *addressUpdate // may change when HandleResolved address is called + + lastResolverUpdate *resolver.State } func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) { @@ -254,7 +255,37 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { x.fallbackLB.Close() x.startFallBackBalancer(u) } - x.config = u + case *resolverUpdate: + // in case of x.config == nil where it returns early, we set the fallbackInitData here. + x.fallbackInitData = u.addrUpdate + cfg := u.xdsConfig + if x.config == nil { + // The first time we get config, we just need to start the xdsClient. + x.startNewXDSClient(cfg) + x.config = cfg + return + } + // With a different BalancerName, we need to create a new xdsClient. + // If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient. + // This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't. + if cfg.BalancerName != x.config.BalancerName || (cfg.ChildPolicy == nil) != (x.config.ChildPolicy == nil) { + x.startNewXDSClient(cfg) + } + // We will update the xdsLB with the new child policy, if we got a different one and it's not nil. + // The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time. + if !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) && cfg.ChildPolicy != nil && x.xdsLB != nil { + x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) + } + + if x.fallbackLB != nil { + if !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) { + x.fallbackLB.Close() + x.startFallBackBalancer(cfg) + } else { + x.fallbackLB.HandleResolvedAddrs(u.addrUpdate.addrs, u.addrUpdate.err) + } + } + x.config = cfg default: // unreachable path panic("wrong update type") @@ -351,6 +382,11 @@ type subConnStateUpdate struct { state connectivity.State } +type resolverUpdate struct { + addrUpdate *addressUpdate + xdsConfig *xdsConfig +} + func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { update := &subConnStateUpdate{ sc: sc, @@ -363,28 +399,61 @@ func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connec } func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { - update := &addressUpdate{ - addrs: addrs, - err: err, - } - select { - case x.grpcUpdate <- update: - case <-x.ctx.Done(): - } + grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs") +} + +type serviceConfig struct { + LoadBalancingConfig []*loadBalancingConfig } -// TODO: once the API is merged, check whether we need to change the function name/signature here. -func (x *xdsBalancer) HandleBalancerConfig(config json.RawMessage) error { - var cfg xdsConfig - if err := json.Unmarshal(config, &cfg); err != nil { - return errors.New("unable to unmarshal balancer config into xds config") +func parseFullServiceConfig(s string) *serviceConfig { + var ret serviceConfig + err := json.Unmarshal([]byte(s), &ret) + if err != nil { + return nil } + return &ret +} + +func (x *xdsBalancer) UpdateResolverState(s resolver.State) { + var update interface{} + // if service config does not change for this update, we only send address update. + if x.lastResolverUpdate != nil && x.lastResolverUpdate.ServiceConfig == s.ServiceConfig { + update = &addressUpdate{ + addrs: s.Addresses, + } + } else { + sc := parseFullServiceConfig(s.ServiceConfig) + var xdsConfigRaw json.RawMessage + for _, lbcfg := range sc.LoadBalancingConfig { + if lbcfg.Name == xdsName { + xdsConfigRaw = lbcfg.Config + break + } + } + var cfg xdsConfig + if err := json.Unmarshal(xdsConfigRaw, &cfg); err != nil { + grpclog.Warningf("unable to unmarshal balancer config %s into xds config", string(xdsConfigRaw)) + return + } + // if addresses do not change for this update, we only send service config update. + if x.lastResolverUpdate != nil && reflect.DeepEqual(x.lastResolverUpdate.Addresses, s.Addresses) { + update = &cfg + } else { + // Both addresses and service config have changed for this update, send the combined update. + update = &resolverUpdate{ + addrUpdate: &addressUpdate{ + addrs: s.Addresses, + }, + xdsConfig: &cfg, + } + } + } select { - case x.grpcUpdate <- &cfg: + case x.grpcUpdate <- update: case <-x.ctx.Done(): } - return nil } type cdsResp struct { @@ -479,6 +548,7 @@ func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) { // balancer is registered or not. builder := balancer.Get(c.FallBackPolicy.Name) x.fallbackLB = builder.Build(x.cc, x.buildOpts) + if x.fallbackInitData != nil { // TODO: uncomment when HandleBalancerConfig API is merged. //x.fallbackLB.HandleBalancerConfig(c.FallBackPolicy.Config) @@ -596,7 +666,9 @@ type loadBalancingConfig struct { } func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) { - return nil, nil + m := make(map[string]json.RawMessage) + m[l.Name] = l.Config + return json.Marshal(m) } func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error { diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go index abe2e1b301ea..0e3972b9f22e 100644 --- a/balancer/xds/xds_test.go +++ b/balancer/xds/xds_test.go @@ -64,12 +64,13 @@ const ( ) var ( - testBalancerNameFooBar = "foo.bar" - testBalancerConfigFooBar, _ = json.Marshal(&testBalancerConfig{ + testBalancerNameFooBar = "foo.bar" + testServiceConfigFooBar = constructServiceConfigFromXdsConfig(&testBalancerConfig{ BalancerName: testBalancerNameFooBar, ChildPolicy: []lbPolicy{fakeBalancerA}, FallbackPolicy: []lbPolicy{fakeBalancerA}, }) + specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"} specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"} @@ -95,6 +96,19 @@ func (l lbPolicy) MarshalJSON() ([]byte, error) { return json.Marshal(m) } +func constructServiceConfigFromXdsConfig(xdsCfg *testBalancerConfig) string { + cfgRaw, _ := json.Marshal(xdsCfg) + sc, _ := json.Marshal(&serviceConfig{ + LoadBalancingConfig: []*loadBalancingConfig{ + { + Name: xdsName, + Config: cfgRaw, + }, + }, + }) + return string(sc) +} + type balancerABuilder struct { mu sync.Mutex lastBalancer *balancerA @@ -264,19 +278,19 @@ func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) { t.Fatalf("unable to type assert to *xdsBalancer") } defer lb.Close() - if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err) - } addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} for i := 0; i < 3; i++ { - lb.HandleResolvedAddrs(addrs, nil) + lb.UpdateResolverState(resolver.State{ + Addresses: addrs, + ServiceConfig: string(testServiceConfigFooBar), + }) select { case nsc := <-cc.newSubConns: if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) { t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA)) } case <-time.After(2 * time.Second): - t.Fatalf("timeout when geting new subconn result") + t.Fatal("timeout when geting new subconn result", i) } addrs = addrs[:2-i] } @@ -298,11 +312,11 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { t.Fatalf("unable to type assert to *xdsBalancer") } defer lb.Close() - if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err) - } addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} - lb.HandleResolvedAddrs(addrs, nil) + lb.UpdateResolverState(resolver.State{ + Addresses: addrs, + ServiceConfig: string(testServiceConfigFooBar), + }) // verify fallback takes over select { @@ -325,15 +339,15 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { for i := 0; i < 2; i++ { addr, td, cleanup := setupServer(t) cleanups = append(cleanups, cleanup) - workingBalancerConfig, _ := json.Marshal(&testBalancerConfig{ + workingServiceConfig := constructServiceConfigFromXdsConfig(&testBalancerConfig{ BalancerName: addr, ChildPolicy: []lbPolicy{fakeBalancerA}, FallbackPolicy: []lbPolicy{fakeBalancerA}, }) - - if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) - } + lb.UpdateResolverState(resolver.State{ + Addresses: addrs, + ServiceConfig: string(workingServiceConfig), + }) td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) var j int @@ -415,11 +429,10 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { addr, td, cleanup := setupServer(t) cleanups = append(cleanups, cleanup) test.cfg.BalancerName = addr - workingBalancerConfig, _ := json.Marshal(test.cfg) - if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) - } + lb.UpdateResolverState(resolver.State{ + ServiceConfig: constructServiceConfigFromXdsConfig(test.cfg), + }) if test.responseToSend != nil { td.sendResp(&response{resp: test.responseToSend}) } @@ -468,18 +481,14 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { ChildPolicy: []lbPolicy{fakeBalancerA}, FallbackPolicy: []lbPolicy{fakeBalancerA}, } - workingBalancerConfig, _ := json.Marshal(cfg) - - if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) - } + lb.UpdateResolverState(resolver.State{ + ServiceConfig: constructServiceConfigFromXdsConfig(cfg), + }) cfg.FallbackPolicy = []lbPolicy{fakeBalancerB} - workingBalancerConfig, _ = json.Marshal(cfg) - - if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) - } + lb.UpdateResolverState(resolver.State{ + ServiceConfig: constructServiceConfigFromXdsConfig(cfg), + }) td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) @@ -497,7 +506,10 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { cleanup() addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} - lb.HandleResolvedAddrs(addrs, nil) + lb.UpdateResolverState(resolver.State{ + Addresses: addrs, + ServiceConfig: constructServiceConfigFromXdsConfig(cfg), + }) // verify fallback balancer B takes over select { @@ -510,10 +522,10 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { } cfg.FallbackPolicy = []lbPolicy{fakeBalancerA} - workingBalancerConfig, _ = json.Marshal(cfg) - if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) - } + lb.UpdateResolverState(resolver.State{ + Addresses: addrs, + ServiceConfig: constructServiceConfigFromXdsConfig(cfg), + }) // verify fallback balancer A takes over select { @@ -548,11 +560,9 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { ChildPolicy: []lbPolicy{fakeBalancerA}, FallbackPolicy: []lbPolicy{fakeBalancerA}, } - workingBalancerConfig, _ := json.Marshal(cfg) - - if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) - } + lb.UpdateResolverState(resolver.State{ + ServiceConfig: constructServiceConfigFromXdsConfig(cfg), + }) td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) @@ -630,11 +640,9 @@ func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) { ChildPolicy: []lbPolicy{fakeBalancerA}, FallbackPolicy: []lbPolicy{fakeBalancerA}, } - workingBalancerConfig, _ := json.Marshal(cfg) - - if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { - t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) - } + lb.UpdateResolverState(resolver.State{ + ServiceConfig: constructServiceConfigFromXdsConfig(cfg), + }) td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) From 0b2003fba6e0bac89d4ec2b26de217b12b562d81 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Fri, 19 Apr 2019 14:07:23 -0700 Subject: [PATCH 2/8] set lastResolverUpdate --- balancer/xds/xds.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index d585047aba51..db65bde8060f 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -416,6 +416,10 @@ func parseFullServiceConfig(s string) *serviceConfig { } func (x *xdsBalancer) UpdateResolverState(s resolver.State) { + defer func() { + x.lastResolverUpdate = &s + }() + var update interface{} // if service config does not change for this update, we only send address update. if x.lastResolverUpdate != nil && x.lastResolverUpdate.ServiceConfig == s.ServiceConfig { From f8931bf2f3dccb45d6174ec8a9494cf60920807e Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Fri, 19 Apr 2019 14:54:57 -0700 Subject: [PATCH 3/8] minor fix --- balancer/xds/xds.go | 1 + 1 file changed, 1 insertion(+) diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index db65bde8060f..5d6391f0b6ac 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -255,6 +255,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { x.fallbackLB.Close() x.startFallBackBalancer(u) } + x.config = u case *resolverUpdate: // in case of x.config == nil where it returns early, we set the fallbackInitData here. x.fallbackInitData = u.addrUpdate From e681661ee69f6f319593d436553c57e9f23624d0 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Mon, 22 Apr 2019 16:01:15 -0700 Subject: [PATCH 4/8] fix reviews --- balancer/xds/xds.go | 94 +++++++++++++++++----------------------- balancer/xds/xds_test.go | 81 ++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 55 deletions(-) diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index 5d6391f0b6ac..4313e3f3446b 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -221,11 +221,6 @@ func (x *xdsBalancer) run() { func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { switch u := update.(type) { - case *addressUpdate: - if x.fallbackLB != nil { - x.fallbackLB.HandleResolvedAddrs(u.addrs, u.err) - } - x.fallbackInitData = u case *subConnStateUpdate: if x.xdsLB != nil { x.xdsLB.HandleSubConnStateChange(u.sc, u.state) @@ -233,60 +228,45 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { if x.fallbackLB != nil { x.fallbackLB.HandleSubConnStateChange(u.sc, u.state) } - case *xdsConfig: - if x.config == nil { - // The first time we get config, we just need to start the xdsClient. - x.startNewXDSClient(u) - x.config = u - return - } - // With a different BalancerName, we need to create a new xdsClient. - // If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient. - // This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't. - if u.BalancerName != x.config.BalancerName || (u.ChildPolicy == nil) != (x.config.ChildPolicy == nil) { - x.startNewXDSClient(u) - } - // We will update the xdsLB with the new child policy, if we got a different one and it's not nil. - // The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time. - if !reflect.DeepEqual(u.ChildPolicy, x.config.ChildPolicy) && u.ChildPolicy != nil && x.xdsLB != nil { - x.xdsLB.HandleChildPolicy(u.ChildPolicy.Name, u.ChildPolicy.Config) - } - if !reflect.DeepEqual(u.FallBackPolicy, x.config.FallBackPolicy) && x.fallbackLB != nil { - x.fallbackLB.Close() - x.startFallBackBalancer(u) - } - x.config = u case *resolverUpdate: - // in case of x.config == nil where it returns early, we set the fallbackInitData here. - x.fallbackInitData = u.addrUpdate - cfg := u.xdsConfig - if x.config == nil { - // The first time we get config, we just need to start the xdsClient. - x.startNewXDSClient(cfg) - x.config = cfg - return - } - // With a different BalancerName, we need to create a new xdsClient. - // If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient. - // This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't. - if cfg.BalancerName != x.config.BalancerName || (cfg.ChildPolicy == nil) != (x.config.ChildPolicy == nil) { - x.startNewXDSClient(cfg) - } - // We will update the xdsLB with the new child policy, if we got a different one and it's not nil. - // The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time. - if !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) && cfg.ChildPolicy != nil && x.xdsLB != nil { - x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) + if u.addrUpdate != nil { + // addresses have been updated. + // in case of x.config == nil where it returns early, we set the fallbackInitData here. + x.fallbackInitData = u.addrUpdate } + cfg := u.xdsConfig - if x.fallbackLB != nil { - if !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) { + // service config has been updated. + if cfg != nil { + if x.config == nil { + // The first time we get config, we just need to start the xdsClient. + x.startNewXDSClient(cfg) + x.config = cfg + return + } + + // With a different BalancerName, we need to create a new xdsClient. + // If current or previous ChildPolicy is nil, then we also need to recreate a new xdsClient. + // This is because with nil ChildPolicy xdsClient will do CDS request, while non-nil won't. + if cfg.BalancerName != x.config.BalancerName || (cfg.ChildPolicy == nil) != (x.config.ChildPolicy == nil) { + x.startNewXDSClient(cfg) + } + // We will update the xdsLB with the new child policy, if we got a different one and it's not nil. + // The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time. + if !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) && cfg.ChildPolicy != nil && x.xdsLB != nil { + x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) + } + + if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) { x.fallbackLB.Close() x.startFallBackBalancer(cfg) - } else { - x.fallbackLB.HandleResolvedAddrs(u.addrUpdate.addrs, u.addrUpdate.err) } + x.config = cfg + } + + if u.addrUpdate != nil && x.fallbackLB != nil { + x.fallbackLB.HandleResolvedAddrs(u.addrUpdate.addrs, u.addrUpdate.err) } - x.config = cfg default: // unreachable path panic("wrong update type") @@ -424,8 +404,10 @@ func (x *xdsBalancer) UpdateResolverState(s resolver.State) { var update interface{} // if service config does not change for this update, we only send address update. if x.lastResolverUpdate != nil && x.lastResolverUpdate.ServiceConfig == s.ServiceConfig { - update = &addressUpdate{ - addrs: s.Addresses, + update = &resolverUpdate{ + addrUpdate: &addressUpdate{ + addrs: s.Addresses, + }, } } else { sc := parseFullServiceConfig(s.ServiceConfig) @@ -444,7 +426,9 @@ func (x *xdsBalancer) UpdateResolverState(s resolver.State) { // if addresses do not change for this update, we only send service config update. if x.lastResolverUpdate != nil && reflect.DeepEqual(x.lastResolverUpdate.Addresses, s.Addresses) { - update = &cfg + update = &resolverUpdate{ + xdsConfig: &cfg, + } } else { // Both addresses and service config have changed for this update, send the combined update. update = &resolverUpdate{ diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go index 0e3972b9f22e..5a8d02865954 100644 --- a/balancer/xds/xds_test.go +++ b/balancer/xds/xds_test.go @@ -718,3 +718,84 @@ func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) { t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy) } } + +func (s) TestXdsFullServiceConfigParsing(t *testing.T) { + tests := []struct { + name string + s string + want *serviceConfig + }{ + { + name: "empty", + s: "", + want: nil, + }, + { + name: "success1", + s: `{"loadBalancingConfig":[{"xds":{"childPolicy":[{"pick_first":{}}]}}]}`, + want: &serviceConfig{ + LoadBalancingConfig: []*loadBalancingConfig{ + {"xds", json.RawMessage(`{"childPolicy":[{"pick_first":{}}]}`)}, + }, + }, + }, + { + name: "success2", + s: `{"loadBalancingConfig":[{"xds":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`, + want: &serviceConfig{ + LoadBalancingConfig: []*loadBalancingConfig{ + {"xds", json.RawMessage(`{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`)}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := parseFullServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) { + t.Errorf("test name: %s, parseFullServiceConfig() = %+v, want %+v", tt.name, got, tt.want) + } + }) + } +} + +func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) { + tests := []struct { + name string + s string + want *xdsConfig + }{ + { + name: "empty", + s: "{}", + want: &xdsConfig{}, + }, + { + name: "success1", + s: `{"childPolicy":[{"pick_first":{}}]}`, + want: &xdsConfig{ + ChildPolicy: &loadBalancingConfig{ + Name: "pick_first", + Config: json.RawMessage(`{}`), + }, + }, + }, + { + name: "success2", + s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`, + want: &xdsConfig{ + ChildPolicy: &loadBalancingConfig{ + Name: "round_robin", + Config: json.RawMessage(`{}`), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var cfg xdsConfig + if err := json.Unmarshal([]byte(tt.s), &cfg); err != nil || !reflect.DeepEqual(&cfg, tt.want) { + t.Errorf("test name: %s, parseFullServiceConfig() = %+v, err: %v, want %+v, ", tt.name, cfg, err, tt.want) + } + }) + } +} From ba0b46c8b111f4a34b72fb0023db0f6201d6c88a Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Thu, 25 Apr 2019 11:35:09 -0700 Subject: [PATCH 5/8] fix reviews, and use UpdateSubConnState --- balancer/xds/xds.go | 173 +++++++++++++++++++-------------------- balancer/xds/xds_test.go | 8 +- 2 files changed, 90 insertions(+), 91 deletions(-) diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index 4313e3f3446b..586e13804694 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -91,6 +91,11 @@ func (b *xdsBalancerBuilder) Name() string { return xdsName } +// closer is the interface that both Balancer and V2Balancer implements +type closer interface { + Close() +} + // edsBalancerInterface defines the interface that edsBalancer must implement to // communicate with xdsBalancer. // @@ -125,13 +130,12 @@ type xdsBalancer struct { timer *time.Timer noSubConnAlert <-chan struct{} - client *client // may change when passed a different service config - config *xdsConfig // may change when passed a different service config - xdsLB edsBalancerInterface - fallbackLB balancer.Balancer - fallbackInitData *addressUpdate // may change when HandleResolved address is called - - lastResolverUpdate *resolver.State + client *client // may change when passed a different service config + config *xdsConfig // may change when passed a different service config + xdsLB edsBalancerInterface + fallbackLB closer + fallbackInitData *resolver.State // may change when HandleResolved address is called + lastFallbackState []resolver.Address // TODO: should also contains config, but it's not available now. } func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) { @@ -219,29 +223,62 @@ func (x *xdsBalancer) run() { } } +func getBalancerConfig(serviceConfig string) *xdsConfig { + sc := parseFullServiceConfig(serviceConfig) + var xdsConfigRaw json.RawMessage + for _, lbcfg := range sc.LoadBalancingConfig { + if lbcfg.Name == xdsName { + xdsConfigRaw = lbcfg.Config + break + } + } + var cfg xdsConfig + if err := json.Unmarshal(xdsConfigRaw, &cfg); err != nil { + grpclog.Warningf("unable to unmarshal balancer config %s into xds config", string(xdsConfigRaw)) + return nil + } + return &cfg +} + func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { switch u := update.(type) { case *subConnStateUpdate: if x.xdsLB != nil { - x.xdsLB.HandleSubConnStateChange(u.sc, u.state) + x.xdsLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState) } if x.fallbackLB != nil { - x.fallbackLB.HandleSubConnStateChange(u.sc, u.state) + switch lb := x.fallbackLB.(type) { + case balancer.V2Balancer: + lb.UpdateSubConnState(u.sc, u.state) + case balancer.Balancer: + lb.HandleSubConnStateChange(u.sc, u.state.ConnectivityState) + default: + grpclog.Error("unexpected balancer type") + } } - case *resolverUpdate: - if u.addrUpdate != nil { - // addresses have been updated. - // in case of x.config == nil where it returns early, we set the fallbackInitData here. - x.fallbackInitData = u.addrUpdate + case *resolver.State: + cfg := getBalancerConfig(u.ServiceConfig) + if cfg == nil { + // service config parsing failed. should never happen. And this parsing will be removed, once + // we support service config validation. + return + } + defer func() { + x.config = cfg + x.lastFallbackState = u.Addresses + }() + x.fallbackInitData = &resolver.State{ + Addresses: u.Addresses, + // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where + // we can pass along the config struct. } - cfg := u.xdsConfig + var fallbackChanged bool // service config has been updated. - if cfg != nil { + if cfg != x.config { if x.config == nil { // The first time we get config, we just need to start the xdsClient. x.startNewXDSClient(cfg) - x.config = cfg return } @@ -259,13 +296,13 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { if x.fallbackLB != nil && !reflect.DeepEqual(cfg.FallBackPolicy, x.config.FallBackPolicy) { x.fallbackLB.Close() - x.startFallBackBalancer(cfg) + x.buildFallBackBalancer(cfg) + fallbackChanged = true } - x.config = cfg } - if u.addrUpdate != nil && x.fallbackLB != nil { - x.fallbackLB.HandleResolvedAddrs(u.addrUpdate.addrs, u.addrUpdate.err) + if x.fallbackLB != nil && (!reflect.DeepEqual(x.lastFallbackState, u.Addresses) || fallbackChanged) { + x.updateFallbackWithResolverState(x.fallbackInitData) } default: // unreachable path @@ -353,22 +390,20 @@ func (w *xdsClientConn) UpdateBalancerState(s connectivity.State, p balancer.Pic w.ClientConn.UpdateBalancerState(s, p) } -type addressUpdate struct { - addrs []resolver.Address - err error -} - type subConnStateUpdate struct { sc balancer.SubConn - state connectivity.State + state balancer.SubConnState +} + +func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { + grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange") } -type resolverUpdate struct { - addrUpdate *addressUpdate - xdsConfig *xdsConfig +func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs") } -func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { +func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { update := &subConnStateUpdate{ sc: sc, state: state, @@ -379,10 +414,6 @@ func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connec } } -func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { - grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs") -} - type serviceConfig struct { LoadBalancingConfig []*loadBalancingConfig } @@ -397,50 +428,8 @@ func parseFullServiceConfig(s string) *serviceConfig { } func (x *xdsBalancer) UpdateResolverState(s resolver.State) { - defer func() { - x.lastResolverUpdate = &s - }() - - var update interface{} - // if service config does not change for this update, we only send address update. - if x.lastResolverUpdate != nil && x.lastResolverUpdate.ServiceConfig == s.ServiceConfig { - update = &resolverUpdate{ - addrUpdate: &addressUpdate{ - addrs: s.Addresses, - }, - } - } else { - sc := parseFullServiceConfig(s.ServiceConfig) - var xdsConfigRaw json.RawMessage - for _, lbcfg := range sc.LoadBalancingConfig { - if lbcfg.Name == xdsName { - xdsConfigRaw = lbcfg.Config - break - } - } - var cfg xdsConfig - if err := json.Unmarshal(xdsConfigRaw, &cfg); err != nil { - grpclog.Warningf("unable to unmarshal balancer config %s into xds config", string(xdsConfigRaw)) - return - } - - // if addresses do not change for this update, we only send service config update. - if x.lastResolverUpdate != nil && reflect.DeepEqual(x.lastResolverUpdate.Addresses, s.Addresses) { - update = &resolverUpdate{ - xdsConfig: &cfg, - } - } else { - // Both addresses and service config have changed for this update, send the combined update. - update = &resolverUpdate{ - addrUpdate: &addressUpdate{ - addrs: s.Addresses, - }, - xdsConfig: &cfg, - } - } - } select { - case x.grpcUpdate <- update: + case x.grpcUpdate <- &s: case <-x.ctx.Done(): } } @@ -499,10 +488,26 @@ func (x *xdsBalancer) switchFallback() { x.xdsLB.Close() x.xdsLB = nil } - x.startFallBackBalancer(x.config) + x.buildFallBackBalancer(x.config) + x.updateFallbackWithResolverState(x.fallbackInitData) x.cancelFallbackMonitoring() } +func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) { + switch lb := x.fallbackLB.(type) { + case balancer.V2Balancer: + lb.UpdateResolverState(resolver.State{ + Addresses: s.Addresses, + // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where + // we can pass along the config struct. + }) + case balancer.Balancer: + lb.HandleResolvedAddrs(s.Addresses, nil) + default: + grpclog.Error("unexpected balancer type") + } +} + // x.cancelFallbackAndSwitchEDSBalancerIfNecessary() will be no-op if we have a working xds client. // It will cancel fallback monitoring if we are in fallback monitoring stage. // If there's no running edsBalancer currently, it will create one and initialize it. Also, it will @@ -524,9 +529,9 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() { } } -func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) { +func (x *xdsBalancer) buildFallBackBalancer(c *xdsConfig) { if c.FallBackPolicy == nil { - x.startFallBackBalancer(&xdsConfig{ + x.buildFallBackBalancer(&xdsConfig{ FallBackPolicy: &loadBalancingConfig{ Name: "round_robin", }, @@ -537,12 +542,6 @@ func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) { // balancer is registered or not. builder := balancer.Get(c.FallBackPolicy.Name) x.fallbackLB = builder.Build(x.cc, x.buildOpts) - - if x.fallbackInitData != nil { - // TODO: uncomment when HandleBalancerConfig API is merged. - //x.fallbackLB.HandleBalancerConfig(c.FallBackPolicy.Config) - x.fallbackLB.HandleResolvedAddrs(x.fallbackInitData.addrs, x.fallbackInitData.err) - } } // There are three ways that could lead to fallback: diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go index 5a8d02865954..2654feb8eb43 100644 --- a/balancer/xds/xds_test.go +++ b/balancer/xds/xds_test.go @@ -574,7 +574,7 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { var i int for i = 0; i < 10; i++ { if edsLB := getLatestEdsBalancer(); edsLB != nil { - lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state) + lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state}) select { case scsc := <-edsLB.subconnStateChange: if !reflect.DeepEqual(scsc, expectedScStateChange) { @@ -600,7 +600,7 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { // fallback balancer A takes over for i = 0; i < 10; i++ { if fblb := lbABuilder.getLastBalancer(); fblb != nil { - lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state) + lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state}) select { case scsc := <-fblb.subconnStateChange: if !reflect.DeepEqual(scsc, expectedScStateChange) { @@ -654,7 +654,7 @@ func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) { var i int for i = 0; i < 10; i++ { if edsLB := getLatestEdsBalancer(); edsLB != nil { - lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state) + lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state}) select { case scsc := <-edsLB.subconnStateChange: if !reflect.DeepEqual(scsc, expectedScStateChange) { @@ -680,7 +680,7 @@ func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) { // fallback balancer A takes over for i = 0; i < 10; i++ { if fblb := lbABuilder.getLastBalancer(); fblb != nil { - lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state) + lb.UpdateSubConnState(expectedScStateChange.sc, balancer.SubConnState{ConnectivityState: expectedScStateChange.state}) select { case scsc := <-fblb.subconnStateChange: if !reflect.DeepEqual(scsc, expectedScStateChange) { From 4282eec4356298fa7407216e50135458a450f85c Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Fri, 26 Apr 2019 13:11:49 -0700 Subject: [PATCH 6/8] fix review x2 --- balancer/xds/xds.go | 55 ++++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index 586e13804694..eefa7eb83307 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -91,11 +91,6 @@ func (b *xdsBalancerBuilder) Name() string { return xdsName } -// closer is the interface that both Balancer and V2Balancer implements -type closer interface { - Close() -} - // edsBalancerInterface defines the interface that edsBalancer must implement to // communicate with xdsBalancer. // @@ -130,12 +125,11 @@ type xdsBalancer struct { timer *time.Timer noSubConnAlert <-chan struct{} - client *client // may change when passed a different service config - config *xdsConfig // may change when passed a different service config - xdsLB edsBalancerInterface - fallbackLB closer - fallbackInitData *resolver.State // may change when HandleResolved address is called - lastFallbackState []resolver.Address // TODO: should also contains config, but it's not available now. + client *client // may change when passed a different service config + config *xdsConfig // may change when passed a different service config + xdsLB edsBalancerInterface + fallbackLB balancer.Balancer + fallbackInitData *resolver.State // may change when HandleResolved address is called } func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) { @@ -247,13 +241,10 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { x.xdsLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState) } if x.fallbackLB != nil { - switch lb := x.fallbackLB.(type) { - case balancer.V2Balancer: + if lb, ok := x.fallbackLB.(balancer.V2Balancer); ok { lb.UpdateSubConnState(u.sc, u.state) - case balancer.Balancer: - lb.HandleSubConnStateChange(u.sc, u.state.ConnectivityState) - default: - grpclog.Error("unexpected balancer type") + } else { + x.fallbackLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState) } } case *resolver.State: @@ -265,17 +256,16 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { } defer func() { x.config = cfg - x.lastFallbackState = u.Addresses + x.fallbackInitData = &resolver.State{ + Addresses: u.Addresses, + // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where + // we can pass along the config struct. + } }() - x.fallbackInitData = &resolver.State{ - Addresses: u.Addresses, - // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where - // we can pass along the config struct. - } var fallbackChanged bool // service config has been updated. - if cfg != x.config { + if !reflect.DeepEqual(cfg, x.config) { if x.config == nil { // The first time we get config, we just need to start the xdsClient. x.startNewXDSClient(cfg) @@ -290,7 +280,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { } // We will update the xdsLB with the new child policy, if we got a different one and it's not nil. // The nil case will be handled when the CDS response gets processed, we will update xdsLB at that time. - if !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) && cfg.ChildPolicy != nil && x.xdsLB != nil { + if x.xdsLB != nil && !reflect.DeepEqual(cfg.ChildPolicy, x.config.ChildPolicy) && cfg.ChildPolicy != nil { x.xdsLB.HandleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config) } @@ -301,8 +291,10 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { } } - if x.fallbackLB != nil && (!reflect.DeepEqual(x.lastFallbackState, u.Addresses) || fallbackChanged) { - x.updateFallbackWithResolverState(x.fallbackInitData) + if x.fallbackLB != nil && (!reflect.DeepEqual(x.fallbackInitData.Addresses, u.Addresses) || fallbackChanged) { + x.updateFallbackWithResolverState(&resolver.State{ + Addresses: u.Addresses, + }) } default: // unreachable path @@ -494,17 +486,14 @@ func (x *xdsBalancer) switchFallback() { } func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) { - switch lb := x.fallbackLB.(type) { - case balancer.V2Balancer: + if lb, ok := x.fallbackLB.(balancer.V2Balancer); ok { lb.UpdateResolverState(resolver.State{ Addresses: s.Addresses, // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where // we can pass along the config struct. }) - case balancer.Balancer: - lb.HandleResolvedAddrs(s.Addresses, nil) - default: - grpclog.Error("unexpected balancer type") + } else { + x.fallbackLB.HandleResolvedAddrs(s.Addresses, nil) } } From 4f30b829fdb550c030f85a05e28bc8c85649c596 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Fri, 26 Apr 2019 16:05:45 -0700 Subject: [PATCH 7/8] fix fix fix --- balancer/xds/xds.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index eefa7eb83307..3f556e5d11df 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -254,14 +254,6 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { // we support service config validation. return } - defer func() { - x.config = cfg - x.fallbackInitData = &resolver.State{ - Addresses: u.Addresses, - // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where - // we can pass along the config struct. - } - }() var fallbackChanged bool // service config has been updated. @@ -269,6 +261,12 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { if x.config == nil { // The first time we get config, we just need to start the xdsClient. x.startNewXDSClient(cfg) + x.config = cfg + x.fallbackInitData = &resolver.State{ + Addresses: u.Addresses, + // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where + // we can pass along the config struct. + } return } @@ -296,6 +294,13 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { Addresses: u.Addresses, }) } + + x.config = cfg + x.fallbackInitData = &resolver.State{ + Addresses: u.Addresses, + // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where + // we can pass along the config struct. + } default: // unreachable path panic("wrong update type") From ef662496233cdf0f41eff1a42eba3039e5473348 Mon Sep 17 00:00:00 2001 From: Yuxuan Li Date: Mon, 29 Apr 2019 14:14:09 -0700 Subject: [PATCH 8/8] ffffffffff --- balancer/xds/xds.go | 3 +++ balancer/xds/xds_test.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index 3f556e5d11df..ba8485a9ddbe 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -219,6 +219,9 @@ func (x *xdsBalancer) run() { func getBalancerConfig(serviceConfig string) *xdsConfig { sc := parseFullServiceConfig(serviceConfig) + if sc == nil { + return nil + } var xdsConfigRaw json.RawMessage for _, lbcfg := range sc.LoadBalancingConfig { if lbcfg.Name == xdsName { diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go index 2654feb8eb43..be9c4e2fb2aa 100644 --- a/balancer/xds/xds_test.go +++ b/balancer/xds/xds_test.go @@ -290,7 +290,7 @@ func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) { t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA)) } case <-time.After(2 * time.Second): - t.Fatal("timeout when geting new subconn result", i) + t.Fatal("timeout when geting new subconn result") } addrs = addrs[:2-i] }