@@ -24,12 +24,14 @@ import (
2424 "sync"
2525
2626 "google.golang.org/grpc/balancer"
27+ "google.golang.org/grpc/codes"
2728 "google.golang.org/grpc/connectivity"
2829 "google.golang.org/grpc/internal"
2930 "google.golang.org/grpc/internal/balancer/gracefulswitch"
3031 "google.golang.org/grpc/internal/channelz"
3132 "google.golang.org/grpc/internal/grpcsync"
3233 "google.golang.org/grpc/resolver"
34+ "google.golang.org/grpc/status"
3335)
3436
3537var setConnectedAddress = internal .SetConnectedAddress .(func (* balancer.SubConnState , resolver.Address ))
@@ -256,17 +258,20 @@ type acBalancerWrapper struct {
256258 ccb * ccBalancerWrapper // read-only
257259 stateListener func (balancer.SubConnState )
258260
259- mu sync.Mutex
260- producers map [balancer.ProducerBuilder ]* refCountedProducer
261+ producersMu sync.Mutex
262+ producers map [balancer.ProducerBuilder ]* refCountedProducer
261263}
262264
263265// updateState is invoked by grpc to push a subConn state update to the
264266// underlying balancer.
265- func (acbw * acBalancerWrapper ) updateState (s connectivity.State , curAddr resolver.Address , err error , readyChan chan struct {} ) {
267+ func (acbw * acBalancerWrapper ) updateState (s connectivity.State , curAddr resolver.Address , err error ) {
266268 acbw .ccb .serializer .TrySchedule (func (ctx context.Context ) {
267269 if ctx .Err () != nil || acbw .ccb .balancer == nil {
268270 return
269271 }
272+ // Invalidate all producers on any state change.
273+ acbw .closeProducers ()
274+
270275 // Even though it is optional for balancers, gracefulswitch ensures
271276 // opts.StateListener is set, so this cannot ever be nil.
272277 // TODO: delete this comment when UpdateSubConnState is removed.
@@ -275,15 +280,6 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
275280 setConnectedAddress (& scs , curAddr )
276281 }
277282 acbw .stateListener (scs )
278- acbw .ac .mu .Lock ()
279- defer acbw .ac .mu .Unlock ()
280- if s == connectivity .Ready {
281- // When changing states to READY, close stateReadyChan. Wait until
282- // after we notify the LB policy's listener(s) in order to prevent
283- // ac.getTransport() from unblocking before the LB policy starts
284- // tracking the subchannel as READY.
285- close (readyChan )
286- }
287283 })
288284}
289285
@@ -300,16 +296,18 @@ func (acbw *acBalancerWrapper) Connect() {
300296}
301297
302298func (acbw * acBalancerWrapper ) Shutdown () {
299+ acbw .closeProducers ()
303300 acbw .ccb .cc .removeAddrConn (acbw .ac , errConnDrain )
304301}
305302
306303// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
307304// ready, blocks until it is or ctx expires. Returns an error when the context
308305// expires or the addrConn is shut down.
309306func (acbw * acBalancerWrapper ) NewStream (ctx context.Context , desc * StreamDesc , method string , opts ... CallOption ) (ClientStream , error ) {
310- transport , err := acbw .ac .getTransport (ctx )
311- if err != nil {
312- return nil , err
307+ transport := acbw .ac .getReadyTransport ()
308+ if transport == nil {
309+ return nil , status .Errorf (codes .Unavailable , "SubConn state is not Ready" )
310+
313311 }
314312 return newNonRetryClientStream (ctx , desc , method , transport , acbw .ac , opts ... )
315313}
@@ -334,8 +332,8 @@ type refCountedProducer struct {
334332}
335333
336334func (acbw * acBalancerWrapper ) GetOrBuildProducer (pb balancer.ProducerBuilder ) (balancer.Producer , func ()) {
337- acbw .mu .Lock ()
338- defer acbw .mu .Unlock ()
335+ acbw .producersMu .Lock ()
336+ defer acbw .producersMu .Unlock ()
339337
340338 // Look up existing producer from this builder.
341339 pData := acbw .producers [pb ]
@@ -352,13 +350,26 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (
352350 // and delete the refCountedProducer from the map if the total reference
353351 // count goes to zero.
354352 unref := func () {
355- acbw .mu .Lock ()
353+ acbw .producersMu .Lock ()
354+ // If closeProducers has already closed this producer instance, refs is
355+ // set to 0, so the check after decrementing will never pass, and the
356+ // producer will not be double-closed.
356357 pData .refs --
357358 if pData .refs == 0 {
358359 defer pData .close () // Run outside the acbw mutex
359360 delete (acbw .producers , pb )
360361 }
361- acbw .mu .Unlock ()
362+ acbw .producersMu .Unlock ()
362363 }
363364 return pData .producer , grpcsync .OnceFunc (unref )
364365}
366+
367+ func (acbw * acBalancerWrapper ) closeProducers () {
368+ acbw .producersMu .Lock ()
369+ defer acbw .producersMu .Unlock ()
370+ for pb , pData := range acbw .producers {
371+ pData .refs = 0
372+ pData .close ()
373+ delete (acbw .producers , pb )
374+ }
375+ }
0 commit comments