Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions authz/audit/audit_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (lb *loggerBuilder) Build(audit.LoggerConfig) audit.Logger {
}
}

func (*loggerBuilder) ParseLoggerConfig(config json.RawMessage) (audit.LoggerConfig, error) {
func (*loggerBuilder) ParseLoggerConfig(json.RawMessage) (audit.LoggerConfig, error) {
return nil, nil
}

Expand Down Expand Up @@ -246,7 +246,7 @@ func (s) TestAuditLogger(t *testing.T) {
serverCreds := loadServerCreds(t)
clientCreds := loadClientCreds(t)
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
Expand Down
14 changes: 7 additions & 7 deletions authz/grpc_authz_end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (s) TestStaticPolicyEnd2End(t *testing.T) {
i, _ := authz.NewStatic(test.authzPolicy)

stub := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
Expand Down Expand Up @@ -374,7 +374,7 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnTLSAuthenticatedConnection(t *
}

stub := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
S: grpc.NewServer(grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
Expand Down Expand Up @@ -436,7 +436,7 @@ func (s) TestAllowsRPCRequestWithPrincipalsFieldOnMTLSAuthenticatedConnection(t
ClientCAs: certPool,
})
stub := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
S: grpc.NewServer(grpc.Creds(creds), grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
Expand Down Expand Up @@ -486,7 +486,7 @@ func (s) TestFileWatcherEnd2End(t *testing.T) {
defer i.Close()

stub := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
Expand Down Expand Up @@ -563,7 +563,7 @@ func (s) TestFileWatcher_ValidPolicyRefresh(t *testing.T) {
defer i.Close()

stub := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
// Start a gRPC server with gRPC authz unary server interceptor.
Expand Down Expand Up @@ -608,7 +608,7 @@ func (s) TestFileWatcher_InvalidPolicySkipReload(t *testing.T) {
defer i.Close()

stub := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
// Start a gRPC server with gRPC authz unary server interceptors.
Expand Down Expand Up @@ -656,7 +656,7 @@ func (s) TestFileWatcher_RecoversFromReloadFailure(t *testing.T) {
defer i.Close()

stub := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, req *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
S: grpc.NewServer(grpc.ChainUnaryInterceptor(i.UnaryInterceptor)),
Expand Down
8 changes: 6 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,15 +360,19 @@ type Balancer interface {
// call SubConn.Shutdown for its existing SubConns; however, this will be
// required in a future release, so it is recommended.
Close()
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}

// ExitIdler is an optional interface for balancers to implement. If
// implemented, ExitIdle will be called when ClientConn.Connect is called, if
// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause
// all SubConns to connect.
//
// Notice: it will be required for all balancers to implement this in a future
// release.
// Deprecated: All balancers must implement this interface. This interface will
// be removed in a future release.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
Expand Down
36 changes: 26 additions & 10 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,15 @@ type ChildState struct {

// Balancer exposes only the ExitIdler interface of the child LB policy.
// Other methods of the child policy are called only by endpointsharding.
Balancer balancer.ExitIdler
Balancer ExitIdler
}

// ExitIdler provides access to only the ExitIdle method of the child balancer.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}

// Options are the options to configure the behaviour of the
Expand Down Expand Up @@ -205,6 +213,16 @@ func (es *endpointSharding) Close() {
}
}

func (es *endpointSharding) ExitIdle() {
es.childMu.Lock()
defer es.childMu.Unlock()
for _, bw := range es.children.Load().Values() {
if !bw.isClosed {
bw.child.ExitIdle()
}
}
}

// updateState updates this component's state. It sends the aggregated state,
// and a picker with round robin behavior with all the child states present if
// needed.
Expand Down Expand Up @@ -326,15 +344,13 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
// avoid deadlocks due to synchronous balancer state updates.
func (bw *balancerWrapper) ExitIdle() {
if ei, ok := bw.child.(balancer.ExitIdler); ok {
go func() {
bw.es.childMu.Lock()
if !bw.isClosed {
ei.ExitIdle()
}
bw.es.childMu.Unlock()
}()
}
go func() {
bw.es.childMu.Lock()
if !bw.isClosed {
bw.child.ExitIdle()
}
bw.es.childMu.Unlock()
}()
}

// updateClientConnStateLocked delivers the ClientConnState to the child
Expand Down
66 changes: 66 additions & 0 deletions balancer/endpointsharding/endpointsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,69 @@ func (s) TestEndpointShardingReconnectDisabled(t *testing.T) {
}
}
}

// Tests that endpointsharding doesn't automatically re-connect IDLE children
// until cc.Connect() is called. The test creates an endpoint with a single
// address. The client is connected and the active server is closed to make the
// child pickfirst enter IDLE state. The test verifies that the child pickfirst
// doesn't re-connect automatically. The test calls cc.Connect() and verified
// that the balancer connects causing the channel to enter TransientFailure.
func (s) TestEndpointShardingExitIdle(t *testing.T) {
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()

mr := manual.NewBuilderWithScheme("e2e-test")
defer mr.Close()

name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "")
bf := stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
epOpts := endpointsharding.Options{DisableAutoReconnect: true}
bd.Data = endpointsharding.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build, epOpts)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
},
Close: func(bd *stub.BalancerData) {
bd.Data.(balancer.Balancer).Close()
},
ExitIdle: func(bd *stub.BalancerData) {
bd.Data.(balancer.Balancer).ExitIdle()
},
}
stub.Register(name, bf)

json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, name)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json)
mr.InitialState(resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backend.Address}}},
},
ServiceConfig: sc,
})

cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("Failed to create new client: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("client.EmptyCall() returned unexpected error: %v", err)
}

// On closing the first server, the first child balancer should enter
// IDLE. Since endpointsharding is configured not to auto-reconnect, it will
// remain IDLE and will not try to re-connect
backend.Stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer shortCancel()
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Idle)

// The balancer should try to re-connect and fail.
cc.Connect()
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
}
4 changes: 1 addition & 3 deletions balancer/lazy/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ func (lb *lazyBalancer) ExitIdle() {
lb.mu.Lock()
defer lb.mu.Unlock()
if lb.delegate != nil {
if d, ok := lb.delegate.(balancer.ExitIdler); ok {
d.ExitIdle()
}
lb.delegate.ExitIdle()
return
}
lb.delegate = lb.childBuilder(lb.cc, lb.buildOptions)
Expand Down
10 changes: 5 additions & 5 deletions balancer/lazy/lazy_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s) TestExitIdle(t *testing.T) {
bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build)
},
ExitIdle: func(bd *stub.BalancerData) {
bd.Data.(balancer.ExitIdler).ExitIdle()
bd.Data.(balancer.Balancer).ExitIdle()
},
ResolverError: func(bd *stub.BalancerData, err error) {
bd.Data.(balancer.Balancer).ResolverError(err)
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s) TestPicker(t *testing.T) {
Init: func(bd *stub.BalancerData) {
bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build)
},
ExitIdle: func(bd *stub.BalancerData) {
ExitIdle: func(*stub.BalancerData) {
t.Log("Ignoring call to ExitIdle, calling the picker should make the lazy balancer exit IDLE state.")
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
Expand Down Expand Up @@ -229,7 +229,7 @@ func (s) TestGoodUpdateThenResolverError(t *testing.T) {
Init: func(bd *stub.BalancerData) {
bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(childBalName).Build)
},
ExitIdle: func(bd *stub.BalancerData) {
ExitIdle: func(*stub.BalancerData) {
t.Log("Ignoring call to ExitIdle to delay lazy child creation until RPC time.")
},
ResolverError: func(bd *stub.BalancerData, err error) {
Expand Down Expand Up @@ -327,7 +327,7 @@ func (s) TestResolverErrorThenGoodUpdate(t *testing.T) {
Init: func(bd *stub.BalancerData) {
bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(childBalName).Build)
},
ExitIdle: func(bd *stub.BalancerData) {
ExitIdle: func(*stub.BalancerData) {
t.Log("Ignoring call to ExitIdle to delay lazy child creation until RPC time.")
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
Expand Down Expand Up @@ -410,7 +410,7 @@ func (s) TestExitIdlePassthrough(t *testing.T) {
bd.Data = lazy.NewBalancer(bd.ClientConn, bd.BuildOptions, balancer.Get(pickfirstleaf.Name).Build)
},
ExitIdle: func(bd *stub.BalancerData) {
bd.Data.(balancer.ExitIdler).ExitIdle()
bd.Data.(balancer.Balancer).ExitIdle()
},
ResolverError: func(bd *stub.BalancerData, err error) {
bd.Data.(balancer.Balancer).ResolverError(err)
Expand Down
4 changes: 1 addition & 3 deletions balancer/leastrequest/leastrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ func (lrb *leastRequestBalancer) ResolverError(err error) {
}

func (lrb *leastRequestBalancer) ExitIdle() {
if ei, ok := lrb.child.(balancer.ExitIdler); ok { // Should always be ok, as child is endpoint sharding.
ei.ExitIdle()
}
lrb.child.ExitIdle()
}

func (lrb *leastRequestBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
Expand Down
4 changes: 2 additions & 2 deletions balancer/pickfirst/pickfirstleaf/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s) TestPickFirstMetrics(t *testing.T) {
defer cancel()

ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (s) TestPickFirstMetricsE2E(t *testing.T) {
defer cancel()

ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
Expand Down
6 changes: 0 additions & 6 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,12 +1592,6 @@ func (b *stateStoringBalancer) Close() {
b.Balancer.Close()
}

func (b *stateStoringBalancer) ExitIdle() {
if ib, ok := b.Balancer.(balancer.ExitIdler); ok {
ib.ExitIdle()
}
}

type stateStoringBalancerBuilder struct {
balancer chan *stateStoringBalancer
}
Expand Down
9 changes: 7 additions & 2 deletions balancer/ringhash/ringhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
// - https://github.com/grpc/proposal/blob/master/A42-xds-ring-hash-lb-policy.md
// - https://github.com/grpc/proposal/blob/master/A61-IPv4-IPv6-dualstack-backends.md#ring-hash
// - https://github.com/grpc/proposal/blob/master/A76-ring-hash-improvements.md
//
// # Experimental
//
// Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release.
package ringhash

import (
Expand Down Expand Up @@ -263,7 +268,7 @@ func (b *ringhashBalancer) updatePickerLocked() {
sort.Slice(endpointStates, func(i, j int) bool {
return endpointStates[i].hashKey < endpointStates[j].hashKey
})
var idleBalancer balancer.ExitIdler
var idleBalancer endpointsharding.ExitIdler
for _, es := range endpointStates {
connState := es.state.ConnectivityState
if connState == connectivity.Connecting {
Expand Down Expand Up @@ -394,7 +399,7 @@ type endpointState struct {
// overridden, for example based on EDS endpoint metadata.
hashKey string
weight uint32
balancer balancer.ExitIdler
balancer endpointsharding.ExitIdler

// state is updated by the balancer while receiving resolver updates from
// the channel and picker updates from its children. Access to it is guarded
Expand Down
8 changes: 4 additions & 4 deletions balancer/ringhash/ringhash_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicks(t *testing.T) {
backend := stubserver.StartTestService(t, &stubserver.StubServer{
// We expect the server EmptyCall to not be call here because the
// aggregated channel state is never READY when the call is pending.
EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
t.Errorf("EmptyCall() should not have been called")
return &testpb.Empty{}, nil
},
Expand Down Expand Up @@ -1600,7 +1600,7 @@ func (s) TestRingHash_ReattemptWhenGoingFromTransientFailureToIdle(t *testing.T)
// Tests that when all backends are down and then up, we may pick a TF backend
// and we will then jump to ready backend.
func (s) TestRingHash_TransientFailureSkipToAvailableReady(t *testing.T) {
emptyCallF := func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
emptyCallF := func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}
lis, err := testutils.LocalTCPListener()
Expand Down Expand Up @@ -1722,7 +1722,7 @@ func (s) TestRingHash_ReattemptWhenAllEndpointsUnreachable(t *testing.T) {
restartableListener := testutils.NewRestartableListener(lis)
restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{
Listener: restartableListener,
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
})
Expand Down Expand Up @@ -1788,7 +1788,7 @@ func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) {
restartableListener := testutils.NewRestartableListener(lis)
restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{
Listener: restartableListener,
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
})
Expand Down
Loading