Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,21 @@ type pfConfig struct {
}

func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &pfConfig{}
if err := json.Unmarshal(js, cfg); err != nil {
if !envconfig.PickFirstLBConfig {
// Prior to supporting loadbalancing configuration, the pick_first LB
// policy did not implement the balancer.ConfigParser interface. This
// meant that if a non-empty configuration was passed to it, the service
// config unmarshaling code would throw a warning log, but would
// continue using the pick_first LB policy. The code below ensures the
// same behavior is retained if the env var is not set.
if string(js) != "{}" {
logger.Warningf("Ignoring non-empty balancer configuration %q for the pick_first LB policy", string(js))
}
return nil, nil
}

var cfg pfConfig
if err := json.Unmarshal(js, &cfg); err != nil {
return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
}
return cfg, nil
Expand All @@ -69,7 +82,6 @@ type pickfirstBalancer struct {
state connectivity.State
cc balancer.ClientConn
subConn balancer.SubConn
cfg *pfConfig
}

func (b *pickfirstBalancer) ResolverError(err error) {
Expand Down Expand Up @@ -106,18 +118,17 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return balancer.ErrBadResolverState
}

if state.BalancerConfig != nil {
cfg, ok := state.BalancerConfig.(*pfConfig)
if !ok {
return fmt.Errorf("pickfirstBalancer: received nil or illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
}
b.cfg = cfg
// We don't have to guard this block with the env var because ParseConfig
// already does so.
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
return fmt.Errorf("pickfirstBalancer: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
}

if envconfig.PickFirstLBConfig && b.cfg != nil && b.cfg.ShuffleAddressList {
if cfg.ShuffleAddressList {
addrs = append([]resolver.Address{}, addrs...)
grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}

if b.subConn != nil {
b.cc.UpdateAddresses(b.subConn, addrs)
return nil
Expand Down
103 changes: 103 additions & 0 deletions test/pickfirst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
Expand All @@ -37,6 +38,7 @@ import (
"google.golang.org/grpc/internal/testutils/pickfirst"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
Expand Down Expand Up @@ -468,6 +470,107 @@ func (s) TestPickFirst_ShuffleAddressListDisabled(t *testing.T) {
}
}

// Test config parsing with the env var turned on and off for various scenarios.
func (s) TestPickFirst_ParseConfig_Success(t *testing.T) {
// Install a shuffler that always reverses two entries.
origShuf := grpcrand.Shuffle
defer func() { grpcrand.Shuffle = origShuf }()
grpcrand.Shuffle = func(n int, f func(int, int)) {
if n != 2 {
t.Errorf("Shuffle called with n=%v; want 2", n)
return
}
f(0, 1) // reverse the two addresses
}

tests := []struct {
name string
envVar bool
serviceConfig string
wantFirstAddr bool
}{
{
name: "env var disabled with empty pickfirst config",
envVar: false,
serviceConfig: `{"loadBalancingConfig": [{"pick_first":{}}]}`,
wantFirstAddr: true,
},
{
name: "env var disabled with non-empty good pickfirst config",
envVar: false,
serviceConfig: `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`,
wantFirstAddr: true,
},
{
name: "env var disabled with non-empty bad pickfirst config",
envVar: false,
serviceConfig: `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": 666 }}]}`,
wantFirstAddr: true,
},
{
name: "env var enabled with empty pickfirst config",
envVar: true,
serviceConfig: `{"loadBalancingConfig": [{"pick_first":{}}]}`,
wantFirstAddr: true,
},
{
name: "env var enabled with empty good pickfirst config",
envVar: true,
serviceConfig: `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`,
wantFirstAddr: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Set the env var as specified by the test table.
origPickFirstLBConfig := envconfig.PickFirstLBConfig
envconfig.PickFirstLBConfig = test.envVar
defer func() { envconfig.PickFirstLBConfig = origPickFirstLBConfig }()

// Set up our backends.
cc, r, backends := setupPickFirst(t, 2)
addrs := stubBackendsToResolverAddrs(backends)

r.UpdateState(resolver.State{
ServiceConfig: parseServiceConfig(t, r, test.serviceConfig),
Addresses: addrs,
})

// Some tests expect address shuffling to happen, and indicate that
// by setting wantFirstAddr to false (since our shuffling function
// defined at the top of this test, simply reverses the list of
// addresses provided to it).
wantAddr := addrs[0]
if !test.wantFirstAddr {
wantAddr = addrs[1]
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := pickfirst.CheckRPCsToBackend(ctx, cc, wantAddr); err != nil {
t.Fatal(err)
}
})
}
}

// Test config parsing for a bad service config.
func (s) TestPickFirst_ParseConfig_Failure(t *testing.T) {
origPickFirstLBConfig := envconfig.PickFirstLBConfig
envconfig.PickFirstLBConfig = true
defer func() { envconfig.PickFirstLBConfig = origPickFirstLBConfig }()

// Service config should fail with the below config. Name resolvers are
// expected to perform this parsing before they push the parsed service
// config to the channel.
const sc = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": 666 }}]}`
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(sc)
if scpr.Err == nil {
t.Fatalf("ParseConfig() succeeded and returned %+v, when expected to fail", scpr)
}
}

// setupPickFirstWithListenerWrapper is very similar to setupPickFirst, but uses
// a wrapped listener that the test can use to track accepted connections.
func setupPickFirstWithListenerWrapper(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer, []*testutils.ListenerWrapper) {
Expand Down