Skip to content

Commit a5e7cd6

Browse files
authored
stats: add DelayedPickComplete and follow correct semantics (#8465)
1 parent 89d2281 commit a5e7cd6

File tree

10 files changed

+70
-64
lines changed

10 files changed

+70
-64
lines changed

clientconn.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
208208
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
209209

210210
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
211-
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
211+
cc.pickerWrapper = newPickerWrapper()
212212

213213
cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
214214

@@ -1076,13 +1076,6 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
10761076
return cc.sc.healthCheckConfig
10771077
}
10781078

1079-
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
1080-
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
1081-
Ctx: ctx,
1082-
FullMethodName: method,
1083-
})
1084-
}
1085-
10861079
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
10871080
if sc == nil {
10881081
// should never reach here.

picker_wrapper.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"google.golang.org/grpc/internal/channelz"
3030
istatus "google.golang.org/grpc/internal/status"
3131
"google.golang.org/grpc/internal/transport"
32-
"google.golang.org/grpc/stats"
3332
"google.golang.org/grpc/status"
3433
)
3534

@@ -48,14 +47,11 @@ type pickerGeneration struct {
4847
// actions and unblock when there's a picker update.
4948
type pickerWrapper struct {
5049
// If pickerGen holds a nil pointer, the pickerWrapper is closed.
51-
pickerGen atomic.Pointer[pickerGeneration]
52-
statsHandlers []stats.Handler // to record blocking picker calls
50+
pickerGen atomic.Pointer[pickerGeneration]
5351
}
5452

55-
func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
56-
pw := &pickerWrapper{
57-
statsHandlers: statsHandlers,
58-
}
53+
func newPickerWrapper() *pickerWrapper {
54+
pw := &pickerWrapper{}
5955
pw.pickerGen.Store(&pickerGeneration{
6056
blockingCh: make(chan struct{}),
6157
})
@@ -93,22 +89,29 @@ func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
9389
}
9490
}
9591

92+
type pick struct {
93+
transport transport.ClientTransport // the selected transport
94+
result balancer.PickResult // the contents of the pick from the LB policy
95+
blocked bool // set if the pick had to wait for a new picker before completing
96+
}
97+
9698
// pick returns the transport that will be used for the RPC.
9799
// It may block in the following cases:
98100
// - there's no picker
99101
// - the current picker returns ErrNoSubConnAvailable
100102
// - the current picker returns other errors and failfast is false.
101103
// - the subConn returned by the current picker is not READY
102104
// When one of these situations happens, pick blocks until the picker gets updated.
103-
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
105+
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (pick, error) {
104106
var ch chan struct{}
105107

106108
var lastPickErr error
109+
pickBlocked := false
107110

108111
for {
109112
pg := pw.pickerGen.Load()
110113
if pg == nil {
111-
return nil, balancer.PickResult{}, ErrClientConnClosing
114+
return pick{}, ErrClientConnClosing
112115
}
113116
if pg.picker == nil {
114117
ch = pg.blockingCh
@@ -127,9 +130,9 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
127130
}
128131
switch ctx.Err() {
129132
case context.DeadlineExceeded:
130-
return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
133+
return pick{}, status.Error(codes.DeadlineExceeded, errStr)
131134
case context.Canceled:
132-
return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
135+
return pick{}, status.Error(codes.Canceled, errStr)
133136
}
134137
case <-ch:
135138
}
@@ -145,9 +148,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
145148
// In the second case, the only way it will get to this conditional is
146149
// if there is a new picker.
147150
if ch != nil {
148-
for _, sh := range pw.statsHandlers {
149-
sh.HandleRPC(ctx, &stats.PickerUpdated{})
150-
}
151+
pickBlocked = true
151152
}
152153

153154
ch = pg.blockingCh
@@ -164,15 +165,15 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
164165
if istatus.IsRestrictedControlPlaneCode(st) {
165166
err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
166167
}
167-
return nil, balancer.PickResult{}, dropError{error: err}
168+
return pick{}, dropError{error: err}
168169
}
169170
// For all other errors, wait for ready RPCs should block and other
170171
// RPCs should fail with unavailable.
171172
if !failfast {
172173
lastPickErr = err
173174
continue
174175
}
175-
return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
176+
return pick{}, status.Error(codes.Unavailable, err.Error())
176177
}
177178

178179
acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
@@ -183,9 +184,8 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
183184
if t := acbw.ac.getReadyTransport(); t != nil {
184185
if channelz.IsOn() {
185186
doneChannelzWrapper(acbw, &pickResult)
186-
return t, pickResult, nil
187187
}
188-
return t, pickResult, nil
188+
return pick{transport: t, result: pickResult, blocked: pickBlocked}, nil
189189
}
190190
if pickResult.Done != nil {
191191
// Calling done with nil error, no bytes sent and no bytes received.

picker_wrapper_test.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,16 @@ func (p *testingPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
6767
}
6868

6969
func (s) TestBlockingPickTimeout(t *testing.T) {
70-
bp := newPickerWrapper(nil)
70+
bp := newPickerWrapper()
7171
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
7272
defer cancel()
73-
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
73+
if _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
7474
t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err)
7575
}
7676
}
7777

7878
func (s) TestBlockingPick(t *testing.T) {
79-
bp := newPickerWrapper(nil)
79+
bp := newPickerWrapper()
8080
// All goroutines should block because picker is nil in bp.
8181
var finishedCount uint64
8282
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -85,8 +85,8 @@ func (s) TestBlockingPick(t *testing.T) {
8585
wg.Add(goroutineCount)
8686
for i := goroutineCount; i > 0; i-- {
8787
go func() {
88-
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
89-
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
88+
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
89+
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
9090
}
9191
atomic.AddUint64(&finishedCount, 1)
9292
wg.Done()
@@ -102,7 +102,7 @@ func (s) TestBlockingPick(t *testing.T) {
102102
}
103103

104104
func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
105-
bp := newPickerWrapper(nil)
105+
bp := newPickerWrapper()
106106
var finishedCount uint64
107107
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
108108
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -112,8 +112,8 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
112112
wg.Add(goroutineCount)
113113
for i := goroutineCount; i > 0; i-- {
114114
go func() {
115-
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
116-
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
115+
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
116+
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
117117
}
118118
atomic.AddUint64(&finishedCount, 1)
119119
wg.Done()
@@ -129,7 +129,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
129129
}
130130

131131
func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
132-
bp := newPickerWrapper(nil)
132+
bp := newPickerWrapper()
133133
bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
134134
var finishedCount uint64
135135
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -140,8 +140,8 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
140140
wg.Add(goroutineCount)
141141
for i := goroutineCount; i > 0; i-- {
142142
go func() {
143-
if tr, _, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || tr != testT {
144-
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
143+
if pick, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || pick.transport != testT {
144+
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
145145
}
146146
atomic.AddUint64(&finishedCount, 1)
147147
wg.Done()
@@ -157,7 +157,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
157157
}
158158

159159
func (s) TestBlockingPickSCNotReady(t *testing.T) {
160-
bp := newPickerWrapper(nil)
160+
bp := newPickerWrapper()
161161
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
162162
var finishedCount uint64
163163
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -167,8 +167,8 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
167167
wg.Add(goroutineCount)
168168
for i := goroutineCount; i > 0; i-- {
169169
go func() {
170-
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
171-
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
170+
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
171+
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
172172
}
173173
atomic.AddUint64(&finishedCount, 1)
174174
wg.Done()

scripts/vet.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ NewSubConn is deprecated:
179179
OverrideServerName is deprecated:
180180
RemoveSubConn is deprecated:
181181
SecurityVersion is deprecated:
182+
stats.PickerUpdated is deprecated:
182183
Target is deprecated: Use the Target field in the BuildOptions instead.
183184
UpdateAddresses is deprecated:
184185
UpdateSubConnState is deprecated:

stats/opentelemetry/trace.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) {
5252
)
5353
// increment previous rpc attempts applicable for next attempt
5454
atomic.AddUint32(&ai.previousRPCAttempts, 1)
55-
case *stats.PickerUpdated:
55+
case *stats.DelayedPickComplete:
5656
span.AddEvent("Delayed LB pick complete")
5757
case *stats.InPayload:
5858
// message id - "must be calculated as two different counters starting

stats/stats.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,21 @@ func (s *Begin) IsClient() bool { return s.Client }
6464

6565
func (s *Begin) isRPCStats() {}
6666

67-
// PickerUpdated indicates that the LB policy provided a new picker while the
68-
// RPC was waiting for one.
69-
type PickerUpdated struct{}
67+
// DelayedPickComplete indicates that the RPC is unblocked following a delay in
68+
// selecting a connection for the call.
69+
type DelayedPickComplete struct{}
7070

71-
// IsClient indicates if the stats information is from client side. Only Client
72-
// Side interfaces with a Picker, thus always returns true.
73-
func (*PickerUpdated) IsClient() bool { return true }
71+
// IsClient indicates DelayedPickComplete is available on the client.
72+
func (*DelayedPickComplete) IsClient() bool { return true }
7473

75-
func (*PickerUpdated) isRPCStats() {}
74+
func (*DelayedPickComplete) isRPCStats() {}
75+
76+
// PickerUpdated indicates that the RPC is unblocked following a delay in
77+
// selecting a connection for the call.
78+
//
79+
// Deprecated: will be removed in a future release; use DelayedPickComplete
80+
// instead.
81+
type PickerUpdated = DelayedPickComplete
7682

7783
// InPayload contains stats about an incoming payload.
7884
type InPayload struct {

stream.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
469469
func (a *csAttempt) getTransport() error {
470470
cs := a.cs
471471

472-
var err error
473-
a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
472+
pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
473+
pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
474+
a.transport, a.pickResult = pick.transport, pick.result
474475
if err != nil {
475476
if de, ok := err.(dropError); ok {
476477
err = de.error
@@ -481,6 +482,11 @@ func (a *csAttempt) getTransport() error {
481482
if a.trInfo != nil {
482483
a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
483484
}
485+
if pick.blocked {
486+
for _, sh := range a.statsHandlers {
487+
sh.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
488+
}
489+
}
484490
return nil
485491
}
486492

test/end2end_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6698,13 +6698,13 @@ func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
66986698
t.Fatalf("Unexpected error from UnaryCall: %v", err)
66996699
}
67006700

6701-
var pickerUpdatedCount uint
6701+
var delayedPickCompleteCount int
67026702
for _, stat := range sh.s {
6703-
if _, ok := stat.(*stats.PickerUpdated); ok {
6704-
pickerUpdatedCount++
6703+
if _, ok := stat.(*stats.DelayedPickComplete); ok {
6704+
delayedPickCompleteCount++
67056705
}
67066706
}
6707-
if pickerUpdatedCount != 1 {
6708-
t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2)
6707+
if got, want := delayedPickCompleteCount, 1; got != want {
6708+
t.Fatalf("sh.delayedPickComplete count: %v, want: %v", got, want)
67096709
}
67106710
}

test/retry_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte
578578
}
579579
func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
580580
// these calls come in nondeterministically - so can just ignore
581-
if _, ok := s.(*stats.PickerUpdated); ok {
581+
if _, ok := s.(*stats.DelayedPickComplete); ok {
582582
return
583583
}
584584
h.mu.Lock()

test/stats_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ func (s) TestPeerForClientStatsHandler(t *testing.T) {
4646
// * Begin stats lack peer info (RPC starts pre-resolution).
4747
// * DelayedPickComplete (formerly PickerUpdated): no peer info (picker lacks transport details).
4848
expectedCallouts := map[stats.RPCStats]bool{
49-
&stats.OutPayload{}: true,
50-
&stats.InHeader{}: true,
51-
&stats.OutHeader{}: true,
52-
&stats.InTrailer{}: true,
53-
&stats.OutTrailer{}: true,
54-
&stats.End{}: true,
55-
&stats.Begin{}: false,
56-
&stats.PickerUpdated{}: false,
49+
&stats.OutPayload{}: true,
50+
&stats.InHeader{}: true,
51+
&stats.OutHeader{}: true,
52+
&stats.InTrailer{}: true,
53+
&stats.OutTrailer{}: true,
54+
&stats.End{}: true,
55+
&stats.Begin{}: false,
56+
&stats.DelayedPickComplete{}: false,
5757
}
5858

5959
// Start server.

0 commit comments

Comments
 (0)