Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,13 +1076,6 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
return cc.sc.healthCheckConfig
}

// getTransport selects a transport for an RPC to the given method by
// delegating to the ClientConn's pickerWrapper. The PickInfo handed to the
// picker carries the caller's ctx and the full method name; the transport,
// the balancer's PickResult and any error produced by pick are returned
// unchanged.
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
	info := balancer.PickInfo{Ctx: ctx, FullMethodName: method}
	return cc.pickerWrapper.pick(ctx, failfast, info)
}

func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
if sc == nil {
// should never reach here.
Expand Down
33 changes: 17 additions & 16 deletions picker_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,11 @@ type pickerGeneration struct {
// actions and unblock when there's a picker update.
type pickerWrapper struct {
// If pickerGen holds a nil pointer, the pickerWrapper is closed.
pickerGen atomic.Pointer[pickerGeneration]
statsHandlers []stats.Handler // to record blocking picker calls
pickerGen atomic.Pointer[pickerGeneration]
}

func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
pw := &pickerWrapper{
statsHandlers: statsHandlers,
}
pw := &pickerWrapper{}
pw.pickerGen.Store(&pickerGeneration{
blockingCh: make(chan struct{}),
})
Expand Down Expand Up @@ -93,22 +90,29 @@ func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
}
}

// pick is the value returned by pickerWrapper.pick. It groups the transport
// chosen for the RPC, the PickResult obtained from the LB policy, and a flag
// recording whether the call had to wait for a picker update. When
// pickerWrapper.pick returns an error, the accompanying pick is the zero
// value.
type pick struct {
transport transport.ClientTransport // the selected transport
result balancer.PickResult // the contents of the pick from the LB policy
blocked bool // set if a picker call queued for a new picker
}

// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (pick, error) {
var ch chan struct{}

var lastPickErr error
pickBlocked := false

for {
pg := pw.pickerGen.Load()
if pg == nil {
return nil, balancer.PickResult{}, ErrClientConnClosing
return pick{}, ErrClientConnClosing
}
if pg.picker == nil {
ch = pg.blockingCh
Expand All @@ -127,9 +131,9 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
switch ctx.Err() {
case context.DeadlineExceeded:
return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
return pick{}, status.Error(codes.DeadlineExceeded, errStr)
case context.Canceled:
return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
return pick{}, status.Error(codes.Canceled, errStr)
}
case <-ch:
}
Expand All @@ -145,9 +149,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
// In the second case, the only way it will get to this conditional is
// if there is a new picker.
if ch != nil {
for _, sh := range pw.statsHandlers {
sh.HandleRPC(ctx, &stats.PickerUpdated{})
}
pickBlocked = true
}

ch = pg.blockingCh
Expand All @@ -164,15 +166,15 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
}
return nil, balancer.PickResult{}, dropError{error: err}
return pick{}, dropError{error: err}
}
// For all other errors, wait for ready RPCs should block and other
// RPCs should fail with unavailable.
if !failfast {
lastPickErr = err
continue
}
return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
return pick{}, status.Error(codes.Unavailable, err.Error())
}

acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
Expand All @@ -183,9 +185,8 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if t := acbw.ac.getReadyTransport(); t != nil {
if channelz.IsOn() {
doneChannelzWrapper(acbw, &pickResult)
return t, pickResult, nil
}
return t, pickResult, nil
return pick{transport: t, result: pickResult, blocked: pickBlocked}, nil
}
if pickResult.Done != nil {
// Calling done with nil error, no bytes sent and no bytes received.
Expand Down
18 changes: 9 additions & 9 deletions picker_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s) TestBlockingPickTimeout(t *testing.T) {
bp := newPickerWrapper(nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
if _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err)
}
}
Expand All @@ -85,8 +85,8 @@ func (s) TestBlockingPick(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
Expand All @@ -112,8 +112,8 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
Expand All @@ -140,8 +140,8 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
Expand All @@ -167,8 +167,8 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion stats/opentelemetry/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) {
)
// increment previous rpc attempts applicable for next attempt
atomic.AddUint32(&ai.previousRPCAttempts, 1)
case *stats.PickerUpdated:
case *stats.DelayedPickComplete:
span.AddEvent("Delayed LB pick complete")
case *stats.InPayload:
// message id - "must be calculated as two different counters starting
Expand Down
19 changes: 12 additions & 7 deletions stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,20 @@ func (s *Begin) IsClient() bool { return s.Client }

func (s *Begin) isRPCStats() {}

// PickerUpdated indicates that the LB policy provided a new picker while the
// RPC was waiting for one.
type PickerUpdated struct{}
// DelayedPickComplete indicates that the RPC is unblocked following a delay in
// selecting a connection for the call.
type DelayedPickComplete struct{}

// IsClient indicates if the stats information is from client side. Only Client
// Side interfaces with a Picker, thus always returns true.
func (*PickerUpdated) IsClient() bool { return true }
// IsClient indicates DelayedPickComplete is available on the client.
func (*DelayedPickComplete) IsClient() bool { return true }

func (*PickerUpdated) isRPCStats() {}
func (*DelayedPickComplete) isRPCStats() {}

// PickerUpdated indicates that the RPC is unblocked following a delay in
// selecting a connection for the call.
//
// Deprecated: use DelayedPickComplete instead. PickerUpdated will be removed
// in a future release.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also specify that DelayedPickComplete should be used instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Related: I think technically we're not supposed to put "Deprecated" on this until after DelayedPickComplete exists for a release. But since this package is experimental, I think we can skip that and call it deprecated immediately.

type PickerUpdated = DelayedPickComplete

// InPayload contains stats about an incoming payload.
type InPayload struct {
Expand Down
10 changes: 8 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
func (a *csAttempt) getTransport() error {
cs := a.cs

var err error
a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
a.transport, a.pickResult = pick.transport, pick.result
if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
Expand All @@ -481,6 +482,11 @@ func (a *csAttempt) getTransport() error {
if a.trInfo != nil {
a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
}
if pick.blocked {
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also call stats.PickerUpdated to keep the previous behaviour until PickerUpdated is removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a type alias so the stats handler will get one event that matches both of the events.

}
}
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6649,13 +6649,13 @@ func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
t.Fatalf("Unexpected error from UnaryCall: %v", err)
}

var pickerUpdatedCount uint
var delayedPickCompleteCount uint
for _, stat := range sh.s {
if _, ok := stat.(*stats.PickerUpdated); ok {
pickerUpdatedCount++
if _, ok := stat.(*stats.DelayedPickComplete); ok {
delayedPickCompleteCount++
}
}
if pickerUpdatedCount != 1 {
t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2)
if delayedPickCompleteCount != 1 {
t.Fatalf("sh.delayedPickComplete count: %v, want: %v", delayedPickCompleteCount, 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We compare delayedPickCompleteCount to 1, but the log mentions 2. I think the log message needs to be fixed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
}
2 changes: 1 addition & 1 deletion test/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte
}
func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
// these calls come in nondeterministically - so can just ignore
if _, ok := s.(*stats.PickerUpdated); ok {
if _, ok := s.(*stats.DelayedPickComplete); ok {
return
}
h.mu.Lock()
Expand Down
16 changes: 8 additions & 8 deletions test/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func (s) TestPeerForClientStatsHandler(t *testing.T) {
// * Begin stats lack peer info (RPC starts pre-resolution).
// * DelayedPickComplete: no peer info (picker lacks transport details).
expectedCallouts := map[stats.RPCStats]bool{
&stats.OutPayload{}: true,
&stats.InHeader{}: true,
&stats.OutHeader{}: true,
&stats.InTrailer{}: true,
&stats.OutTrailer{}: true,
&stats.End{}: true,
&stats.Begin{}: false,
&stats.PickerUpdated{}: false,
&stats.OutPayload{}: true,
&stats.InHeader{}: true,
&stats.OutHeader{}: true,
&stats.InTrailer{}: true,
&stats.OutTrailer{}: true,
&stats.End{}: true,
&stats.Begin{}: false,
&stats.DelayedPickComplete{}: false,
}

// Start server.
Expand Down
Loading