diff --git a/clientconn.go b/clientconn.go index cd3eaf8ddcbd..d2717fba501f 100644 --- a/clientconn.go +++ b/clientconn.go @@ -208,7 +208,7 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority) cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz) - cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) + cc.pickerWrapper = newPickerWrapper() cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers) @@ -1076,13 +1076,6 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { return cc.sc.healthCheckConfig } -func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) { - return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{ - Ctx: ctx, - FullMethodName: method, - }) -} - func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) { if sc == nil { // should never reach here. diff --git a/picker_wrapper.go b/picker_wrapper.go index a2d2a798d488..aa52bfe95fd8 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -29,7 +29,6 @@ import ( "google.golang.org/grpc/internal/channelz" istatus "google.golang.org/grpc/internal/status" "google.golang.org/grpc/internal/transport" - "google.golang.org/grpc/stats" "google.golang.org/grpc/status" ) @@ -48,14 +47,11 @@ type pickerGeneration struct { // actions and unblock when there's a picker update. type pickerWrapper struct { // If pickerGen holds a nil pointer, the pickerWrapper is closed. - pickerGen atomic.Pointer[pickerGeneration] - statsHandlers []stats.Handler // to record blocking picker calls + pickerGen atomic.Pointer[pickerGeneration] } -func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper { - pw := &pickerWrapper{ - statsHandlers: statsHandlers, - } +func newPickerWrapper() *pickerWrapper { + pw := &pickerWrapper{} pw.pickerGen.Store(&pickerGeneration{ blockingCh: make(chan struct{}), }) @@ -93,6 +89,12 @@ func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) { } } +type pick struct { + transport transport.ClientTransport // the selected transport + result balancer.PickResult // the contents of the pick from the LB policy + blocked bool // set if a picker call queued for a new picker +} + // pick returns the transport that will be used for the RPC. // It may block in the following cases: // - there's no picker @@ -100,15 +102,16 @@ func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) { // - the current picker returns other errors and failfast is false. // - the subConn returned by the current picker is not READY // When one of these situations happens, pick blocks until the picker gets updated. -func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) { +func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (pick, error) { var ch chan struct{} var lastPickErr error + pickBlocked := false for { pg := pw.pickerGen.Load() if pg == nil { - return nil, balancer.PickResult{}, ErrClientConnClosing + return pick{}, ErrClientConnClosing } if pg.picker == nil { ch = pg.blockingCh @@ -127,9 +130,9 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. } switch ctx.Err() { case context.DeadlineExceeded: - return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr) + return pick{}, status.Error(codes.DeadlineExceeded, errStr) case context.Canceled: - return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr) + return pick{}, status.Error(codes.Canceled, errStr) } case <-ch: } @@ -145,9 +148,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. // In the second case, the only way it will get to this conditional is // if there is a new picker. if ch != nil { - for _, sh := range pw.statsHandlers { - sh.HandleRPC(ctx, &stats.PickerUpdated{}) - } + pickBlocked = true } ch = pg.blockingCh @@ -164,7 +165,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. if istatus.IsRestrictedControlPlaneCode(st) { err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err) } - return nil, balancer.PickResult{}, dropError{error: err} + return pick{}, dropError{error: err} } // For all other errors, wait for ready RPCs should block and other // RPCs should fail with unavailable. @@ -172,7 +173,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. lastPickErr = err continue } - return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error()) + return pick{}, status.Error(codes.Unavailable, err.Error()) } acbw, ok := pickResult.SubConn.(*acBalancerWrapper) @@ -183,9 +184,8 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. if t := acbw.ac.getReadyTransport(); t != nil { if channelz.IsOn() { doneChannelzWrapper(acbw, &pickResult) - return t, pickResult, nil } - return t, pickResult, nil + return pick{transport: t, result: pickResult, blocked: pickBlocked}, nil } if pickResult.Done != nil { // Calling done with nil error, no bytes sent and no bytes received. diff --git a/picker_wrapper_test.go b/picker_wrapper_test.go index 20004d08b3d1..f20231a6727d 100644 --- a/picker_wrapper_test.go +++ b/picker_wrapper_test.go @@ -67,16 +67,16 @@ func (p *testingPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { } func (s) TestBlockingPickTimeout(t *testing.T) { - bp := newPickerWrapper(nil) + bp := newPickerWrapper() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded { + if _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded { t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err) } } func (s) TestBlockingPick(t *testing.T) { - bp := newPickerWrapper(nil) + bp := newPickerWrapper() // All goroutines should block because picker is nil in bp. var finishedCount uint64 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -85,8 +85,8 @@ func (s) TestBlockingPick(t *testing.T) { wg.Add(goroutineCount) for i := goroutineCount; i > 0; i-- { go func() { - if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT { - t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT) + if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT { + t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT) } atomic.AddUint64(&finishedCount, 1) wg.Done() @@ -102,7 +102,7 @@ func (s) TestBlockingPick(t *testing.T) { } func (s) TestBlockingPickNoSubAvailable(t *testing.T) { - bp := newPickerWrapper(nil) + bp := newPickerWrapper() var finishedCount uint64 bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -112,8 +112,8 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) { wg.Add(goroutineCount) for i := goroutineCount; i > 0; i-- { go func() { - if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT { - t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT) + if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT { + t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT) } atomic.AddUint64(&finishedCount, 1) wg.Done() @@ -129,7 +129,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) { } func (s) TestBlockingPickTransientWaitforready(t *testing.T) { - bp := newPickerWrapper(nil) + bp := newPickerWrapper() bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount}) var finishedCount uint64 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -140,8 +140,8 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) { wg.Add(goroutineCount) for i := goroutineCount; i > 0; i-- { go func() { - if tr, _, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || tr != testT { - t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT) + if pick, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || pick.transport != testT { + t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT) } atomic.AddUint64(&finishedCount, 1) wg.Done() @@ -157,7 +157,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) { } func (s) TestBlockingPickSCNotReady(t *testing.T) { - bp := newPickerWrapper(nil) + bp := newPickerWrapper() bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount}) var finishedCount uint64 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -167,8 +167,8 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) { wg.Add(goroutineCount) for i := goroutineCount; i > 0; i-- { go func() { - if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT { - t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT) + if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT { + t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT) } atomic.AddUint64(&finishedCount, 1) wg.Done() diff --git a/scripts/vet.sh b/scripts/vet.sh index e3b77dd08c0c..18c4085fce54 100755 --- a/scripts/vet.sh +++ b/scripts/vet.sh @@ -179,6 +179,7 @@ NewSubConn is deprecated: OverrideServerName is deprecated: RemoveSubConn is deprecated: SecurityVersion is deprecated: +stats.PickerUpdated is deprecated: Target is deprecated: Use the Target field in the BuildOptions instead. UpdateAddresses is deprecated: UpdateSubConnState is deprecated: diff --git a/stats/opentelemetry/trace.go b/stats/opentelemetry/trace.go index efafdd0756eb..40ac7a1b6ef5 100644 --- a/stats/opentelemetry/trace.go +++ b/stats/opentelemetry/trace.go @@ -52,7 +52,7 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) { ) // increment previous rpc attempts applicable for next attempt atomic.AddUint32(&ai.previousRPCAttempts, 1) - case *stats.PickerUpdated: + case *stats.DelayedPickComplete: span.AddEvent("Delayed LB pick complete") case *stats.InPayload: // message id - "must be calculated as two different counters starting diff --git a/stats/stats.go b/stats/stats.go index baf7740efba9..10bf998aa5be 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -64,15 +64,21 @@ func (s *Begin) IsClient() bool { return s.Client } func (s *Begin) isRPCStats() {} -// PickerUpdated indicates that the LB policy provided a new picker while the -// RPC was waiting for one. -type PickerUpdated struct{} +// DelayedPickComplete indicates that the RPC is unblocked following a delay in +// selecting a connection for the call. +type DelayedPickComplete struct{} -// IsClient indicates if the stats information is from client side. Only Client -// Side interfaces with a Picker, thus always returns true. -func (*PickerUpdated) IsClient() bool { return true } +// IsClient indicates DelayedPickComplete is available on the client. +func (*DelayedPickComplete) IsClient() bool { return true } -func (*PickerUpdated) isRPCStats() {} +func (*DelayedPickComplete) isRPCStats() {} + +// PickerUpdated indicates that the RPC is unblocked following a delay in +// selecting a connection for the call. +// +// Deprecated: will be removed in a future release; use DelayedPickComplete +// instead. +type PickerUpdated = DelayedPickComplete // InPayload contains stats about an incoming payload. type InPayload struct { diff --git a/stream.go b/stream.go index ca6948926f93..5e7df5ffcf00 100644 --- a/stream.go +++ b/stream.go @@ -469,8 +469,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) func (a *csAttempt) getTransport() error { cs := a.cs - var err error - a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method) + pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method} + pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo) + a.transport, a.pickResult = pick.transport, pick.result if err != nil { if de, ok := err.(dropError); ok { err = de.error @@ -481,6 +482,11 @@ func (a *csAttempt) getTransport() error { if a.trInfo != nil { a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr()) } + if pick.blocked { + for _, sh := range a.statsHandlers { + sh.HandleRPC(a.ctx, &stats.DelayedPickComplete{}) + } + } return nil } diff --git a/test/end2end_test.go b/test/end2end_test.go index ad80c9c5a1af..d4d489ffc236 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -6649,13 +6649,13 @@ func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) { t.Fatalf("Unexpected error from UnaryCall: %v", err) } - var pickerUpdatedCount uint + var delayedPickCompleteCount int for _, stat := range sh.s { - if _, ok := stat.(*stats.PickerUpdated); ok { - pickerUpdatedCount++ + if _, ok := stat.(*stats.DelayedPickComplete); ok { + delayedPickCompleteCount++ } } - if pickerUpdatedCount != 1 { - t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2) + if got, want := delayedPickCompleteCount, 1; got != want { + t.Fatalf("sh.delayedPickComplete count: %v, want: %v", got, want) } } diff --git a/test/retry_test.go b/test/retry_test.go index a1173f576043..72d79a576f0c 100644 --- a/test/retry_test.go +++ b/test/retry_test.go @@ -578,7 +578,7 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte } func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) { // these calls come in nondeterministically - so can just ignore - if _, ok := s.(*stats.PickerUpdated); ok { + if _, ok := s.(*stats.DelayedPickComplete); ok { return } h.mu.Lock() diff --git a/test/stats_test.go b/test/stats_test.go index 5ee00bb37a2a..28156ed70fc9 100644 --- a/test/stats_test.go +++ b/test/stats_test.go @@ -46,14 +46,14 @@ func (s) TestPeerForClientStatsHandler(t *testing.T) { // * Begin stats lack peer info (RPC starts pre-resolution). // * PickerUpdated: no peer info (picker lacks transport details). expectedCallouts := map[stats.RPCStats]bool{ - &stats.OutPayload{}: true, - &stats.InHeader{}: true, - &stats.OutHeader{}: true, - &stats.InTrailer{}: true, - &stats.OutTrailer{}: true, - &stats.End{}: true, - &stats.Begin{}: false, - &stats.PickerUpdated{}: false, + &stats.OutPayload{}: true, + &stats.InHeader{}: true, + &stats.OutHeader{}: true, + &stats.InTrailer{}: true, + &stats.OutTrailer{}: true, + &stats.End{}: true, + &stats.Begin{}: false, + &stats.DelayedPickComplete{}: false, } // Start server.