Skip to content

Commit dceb6ee

Browse files
authored
xds/clusterimpl: stop forwarding UpdateSubConnState calls (#6518)
1 parent 8def12a commit dceb6ee

File tree

2 files changed

+14
-40
lines changed

2 files changed

+14
-40
lines changed

xds/internal/balancer/clusterimpl/balancer_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -374,24 +374,25 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
374374
stub.Register(childPolicyName, stub.BalancerFuncs{
375375
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
376376
// Create a subConn which will be used later on to test the race
377-
// between UpdateSubConnState() and Close().
378-
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
377+
// between StateListener() and Close().
378+
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{
379+
StateListener: func(balancer.SubConnState) {
380+
go func() {
381+
// Wait for Close() to be called on the parent policy before
382+
// sending the picker update.
383+
<-closeCh
384+
bd.ClientConn.UpdateState(balancer.State{
385+
Picker: base.NewErrPicker(errors.New("dummy error picker")),
386+
})
387+
}()
388+
},
389+
})
379390
if err != nil {
380391
return err
381392
}
382393
sc.Connect()
383394
return nil
384395
},
385-
UpdateSubConnState: func(bd *stub.BalancerData, _ balancer.SubConn, _ balancer.SubConnState) {
386-
go func() {
387-
// Wait for Close() to be called on the parent policy before
388-
// sending the picker update.
389-
<-closeCh
390-
bd.ClientConn.UpdateState(balancer.State{
391-
Picker: base.NewErrPicker(errors.New("dummy error picker")),
392-
})
393-
}()
394-
},
395396
})
396397

397398
var maxRequest uint32 = 50

xds/internal/balancer/clusterimpl/clusterimpl.go

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
6565
closed: grpcsync.NewEvent(),
6666
done: grpcsync.NewEvent(),
6767
loadWrapper: loadstore.NewWrapper(),
68-
scWrappers: make(map[balancer.SubConn]*scWrapper),
6968
pickerUpdateCh: buffer.NewUnbounded(),
7069
requestCountMax: defaultRequestCountMax,
7170
}
@@ -113,18 +112,6 @@ type clusterImplBalancer struct {
113112
clusterNameMu sync.Mutex
114113
clusterName string
115114

116-
scWrappersMu sync.Mutex
117-
// The SubConns passed to the child policy are wrapped in a wrapper, to keep
118-
// locality ID. But when the parent ClientConn sends updates, it's going to
119-
// give the original SubConn, not the wrapper. But the child policies only
120-
// know about the wrapper, so when forwarding SubConn updates, they must be
121-
// sent for the wrappers.
122-
//
123-
// This keeps a map from original SubConn to wrapper, so that when
124-
// forwarding the SubConn state update, the child policy will get the
125-
// wrappers.
126-
scWrappers map[balancer.SubConn]*scWrapper
127-
128115
// childState/drops/requestCounter keeps the state used by the most recently
129116
// generated picker. All fields can only be accessed in run(). And run() is
130117
// the only goroutine that sends picker to the parent ClientConn. All
@@ -296,24 +283,13 @@ func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer
296283
b.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
297284
}
298285

299-
b.scWrappersMu.Lock()
300-
if scw, ok := b.scWrappers[sc]; ok {
301-
sc = scw
302-
if s.ConnectivityState == connectivity.Shutdown {
303-
// Remove this SubConn from the map on Shutdown.
304-
delete(b.scWrappers, scw.SubConn)
305-
}
306-
}
307-
b.scWrappersMu.Unlock()
308286
if cb != nil {
309287
cb(s)
310-
} else {
311-
b.child.UpdateSubConnState(sc, s)
312288
}
313289
}
314290

315291
func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
316-
b.updateSubConnState(sc, s, nil)
292+
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, s)
317293
}
318294

319295
func (b *clusterImplBalancer) Close() {
@@ -394,11 +370,8 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
394370
return nil, err
395371
}
396372
// Wrap this SubConn in a wrapper, and add it to the map.
397-
b.scWrappersMu.Lock()
398373
ret := &scWrapper{SubConn: sc}
399374
ret.updateLocalityID(lID)
400-
b.scWrappers[sc] = ret
401-
b.scWrappersMu.Unlock()
402375
return ret, nil
403376
}
404377

0 commit comments

Comments
 (0)