@@ -29,7 +29,9 @@ import (
2929 "google.golang.org/grpc/connectivity"
3030 "google.golang.org/grpc/credentials/insecure"
3131 "google.golang.org/grpc/internal"
32+ "google.golang.org/grpc/internal/buffer"
3233 internalgrpclog "google.golang.org/grpc/internal/grpclog"
34+ "google.golang.org/grpc/internal/grpcsync"
3335 "google.golang.org/grpc/internal/pretty"
3436 rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
3537 rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
@@ -55,37 +57,58 @@ type controlChannel struct {
5557 // hammering the RLS service while it is overloaded or down.
5658 throttler adaptiveThrottler
5759
58- cc * grpc.ClientConn
59- client rlsgrpc.RouteLookupServiceClient
60- logger * internalgrpclog.PrefixLogger
60+ cc * grpc.ClientConn
61+ client rlsgrpc.RouteLookupServiceClient
62+ logger * internalgrpclog.PrefixLogger
63+ connectivityStateCh * buffer.Unbounded
64+ unsubscribe func ()
65+ monitorDoneCh chan struct {}
6166}
6267
6368// newControlChannel creates a controlChannel to rlsServerName and uses
6469// serviceConfig, if non-empty, as the default service config for the underlying
6570// gRPC channel.
6671func newControlChannel (rlsServerName , serviceConfig string , rpcTimeout time.Duration , bOpts balancer.BuildOptions , backToReadyFunc func ()) (* controlChannel , error ) {
6772 ctrlCh := & controlChannel {
68- rpcTimeout : rpcTimeout ,
69- backToReadyFunc : backToReadyFunc ,
70- throttler : newAdaptiveThrottler (),
73+ rpcTimeout : rpcTimeout ,
74+ backToReadyFunc : backToReadyFunc ,
75+ throttler : newAdaptiveThrottler (),
76+ connectivityStateCh : buffer .NewUnbounded (),
77+ monitorDoneCh : make (chan struct {}),
7178 }
7279 ctrlCh .logger = internalgrpclog .NewPrefixLogger (logger , fmt .Sprintf ("[rls-control-channel %p] " , ctrlCh ))
7380
7481 dopts , err := ctrlCh .dialOpts (bOpts , serviceConfig )
7582 if err != nil {
7683 return nil , err
7784 }
78- ctrlCh .cc , err = grpc .Dial (rlsServerName , dopts ... )
85+ ctrlCh .cc , err = grpc .NewClient (rlsServerName , dopts ... )
7986 if err != nil {
8087 return nil , err
8188 }
89+ // Subscribe to connectivity state before connecting to avoid missing initial
90+ // updates, which are only delivered to active subscribers.
91+ ctrlCh .unsubscribe = internal .SubscribeToConnectivityStateChanges .(func (cc * grpc.ClientConn , s grpcsync.Subscriber ) func ())(ctrlCh .cc , ctrlCh )
92+ ctrlCh .cc .Connect ()
8293 ctrlCh .client = rlsgrpc .NewRouteLookupServiceClient (ctrlCh .cc )
8394 ctrlCh .logger .Infof ("Control channel created to RLS server at: %v" , rlsServerName )
84-
85- go ctrlCh .monitorConnectivityState ()
95+ start := make (chan struct {})
96+ go func () {
97+ close (start )
98+ ctrlCh .monitorConnectivityState ()
99+ }()
100+ <- start
86101 return ctrlCh , nil
87102}
88103
104+ func (cc * controlChannel ) OnMessage (msg any ) {
105+ st , ok := msg .(connectivity.State )
106+ if ! ok {
107+ panic (fmt .Sprintf ("Unexpected message type %T , wanted connectectivity.State type" , msg ))
108+ }
109+ cc .connectivityStateCh .Put (st )
110+ }
111+
89112// dialOpts constructs the dial options for the control plane channel.
90113func (cc * controlChannel ) dialOpts (bOpts balancer.BuildOptions , serviceConfig string ) ([]grpc.DialOption , error ) {
91114 // The control plane channel will use the same authority as the parent
@@ -97,7 +120,6 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st
97120 if bOpts .Dialer != nil {
98121 dopts = append (dopts , grpc .WithContextDialer (bOpts .Dialer ))
99122 }
100-
101123 // The control channel will use the channel credentials from the parent
102124 // channel, including any call creds associated with the channel creds.
103125 var credsOpt grpc.DialOption
@@ -133,6 +155,8 @@ func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig st
133155
134156func (cc * controlChannel ) monitorConnectivityState () {
135157 cc .logger .Infof ("Starting connectivity state monitoring goroutine" )
158+ defer close (cc .monitorDoneCh )
159+
136160 // Since we use two mechanisms to deal with RLS server being down:
137161 // - adaptive throttling for the channel as a whole
138162 // - exponential backoff on a per-request basis
@@ -154,39 +178,45 @@ func (cc *controlChannel) monitorConnectivityState() {
154178 // returning only one new picker, regardless of how many backoff timers are
155179 // cancelled.
156180
157- // Using the background context is fine here since we check for the ClientConn
158- // entering SHUTDOWN and return early in that case.
159- ctx := context .Background ()
181+ // Wait for the control channel to become READY for the first time.
182+ for s , ok := <- cc .connectivityStateCh .Get (); s != connectivity .Ready ; s , ok = <- cc .connectivityStateCh .Get () {
183+ if ! ok {
184+ return
185+ }
186+
187+ cc .connectivityStateCh .Load ()
188+ if s == connectivity .Shutdown {
189+ return
190+ }
191+ }
192+ cc .connectivityStateCh .Load ()
193+ cc .logger .Infof ("Connectivity state is READY" )
160194
161- first := true
162195 for {
163- // Wait for the control channel to become READY.
164- for s := cc .cc .GetState (); s != connectivity .Ready ; s = cc .cc .GetState () {
165- if s == connectivity .Shutdown {
166- return
167- }
168- cc .cc .WaitForStateChange (ctx , s )
196+ s , ok := <- cc .connectivityStateCh .Get ()
197+ if ! ok {
198+ return
169199 }
170- cc .logger . Infof ( "Connectivity state is READY" )
200+ cc .connectivityStateCh . Load ( )
171201
172- if ! first {
202+ if s == connectivity .Shutdown {
203+ return
204+ }
205+ if s == connectivity .Ready {
173206 cc .logger .Infof ("Control channel back to READY" )
174207 cc .backToReadyFunc ()
175208 }
176- first = false
177209
178- // Wait for the control channel to move out of READY.
179- cc .cc .WaitForStateChange (ctx , connectivity .Ready )
180- if cc .cc .GetState () == connectivity .Shutdown {
181- return
182- }
183- cc .logger .Infof ("Connectivity state is %s" , cc .cc .GetState ())
210+ cc .logger .Infof ("Connectivity state is %s" , s )
184211 }
185212}
186213
187214func (cc * controlChannel ) close () {
188- cc .logger .Infof ("Closing control channel" )
215+ cc .unsubscribe ()
216+ cc .connectivityStateCh .Close ()
217+ <- cc .monitorDoneCh
189218 cc .cc .Close ()
219+ cc .logger .Infof ("Shutdown" )
190220}
191221
192222type lookupCallback func (targets []string , headerData string , err error )
0 commit comments