Skip to content

Commit 03da31a

Browse files
authored
client: implement maxAttempts for retryPolicy (#7229)
1 parent f7d3d3e commit 03da31a

File tree

6 files changed

+129
-13
lines changed

6 files changed

+129
-13
lines changed

clientconn.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
170170
}
171171

172172
if cc.dopts.defaultServiceConfigRawJSON != nil {
173-
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
173+
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
174174
if scpr.Err != nil {
175175
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
176176
}
@@ -693,7 +693,7 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
693693
var emptyServiceConfig *ServiceConfig
694694

695695
func init() {
696-
cfg := parseServiceConfig("{}")
696+
cfg := parseServiceConfig("{}", defaultMaxCallAttempts)
697697
if cfg.Err != nil {
698698
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
699699
}

dialoptions.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ import (
3737
"google.golang.org/grpc/stats"
3838
)
3939

40+
const (
41+
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#limits-on-retries-and-hedges
42+
defaultMaxCallAttempts = 5
43+
)
44+
4045
func init() {
4146
internal.AddGlobalDialOptions = func(opt ...DialOption) {
4247
globalDialOptions = append(globalDialOptions, opt...)
@@ -89,6 +94,7 @@ type dialOptions struct {
8994
idleTimeout time.Duration
9095
recvBufferPool SharedBufferPool
9196
defaultScheme string
97+
maxCallAttempts int
9298
}
9399

94100
// DialOption configures how we set up the connection.
@@ -677,6 +683,7 @@ func defaultDialOptions() dialOptions {
677683
idleTimeout: 30 * time.Minute,
678684
recvBufferPool: nopBufferPool{},
679685
defaultScheme: "dns",
686+
maxCallAttempts: defaultMaxCallAttempts,
680687
}
681688
}
682689

@@ -734,6 +741,23 @@ func WithIdleTimeout(d time.Duration) DialOption {
734741
})
735742
}
736743

744+
// WithMaxCallAttempts returns a DialOption that configures the maximum number
745+
// of attempts per call (including retries and hedging) using the channel.
746+
// Service owners may specify a higher value for these parameters, but higher
747+
// values will be treated as equal to the maximum value by the client
748+
// implementation. This mitigates security concerns related to the service
749+
// config being transferred to the client via DNS.
750+
//
751+
// A value of 5 will be used if this dial option is not set or n < 2.
752+
func WithMaxCallAttempts(n int) DialOption {
753+
return newFuncDialOption(func(o *dialOptions) {
754+
if n < 2 {
755+
n = defaultMaxCallAttempts
756+
}
757+
o.maxCallAttempts = n
758+
})
759+
}
760+
737761
// WithRecvBufferPool returns a DialOption that configures the ClientConn
738762
// to use the provided shared buffer pool for parsing incoming messages. Depending
739763
// on the application's workload, this could result in reduced memory allocation.

resolver_wrapper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
171171
// ParseServiceConfig is called by resolver implementations to parse a JSON
172172
// representation of the service config.
173173
func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
174-
return parseServiceConfig(scJSON)
174+
return parseServiceConfig(scJSON, ccr.cc.dopts.maxCallAttempts)
175175
}
176176

177177
// addChannelzTraceEvent adds a channelz trace event containing the new

service_config.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,11 @@ type jsonSC struct {
164164
}
165165

166166
func init() {
167-
internal.ParseServiceConfig = parseServiceConfig
167+
internal.ParseServiceConfig = func(js string) *serviceconfig.ParseResult {
168+
return parseServiceConfig(js, defaultMaxCallAttempts)
169+
}
168170
}
169-
func parseServiceConfig(js string) *serviceconfig.ParseResult {
171+
func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult {
170172
if len(js) == 0 {
171173
return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")}
172174
}
@@ -219,7 +221,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
219221
WaitForReady: m.WaitForReady,
220222
Timeout: (*time.Duration)(m.Timeout),
221223
}
222-
if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {
224+
if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy, maxAttempts); err != nil {
223225
logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
224226
return &serviceconfig.ParseResult{Err: err}
225227
}
@@ -265,7 +267,7 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {
265267
return &serviceconfig.ParseResult{Config: &sc}
266268
}
267269

268-
func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPolicy, err error) {
270+
func convertRetryPolicy(jrp *jsonRetryPolicy, maxAttempts int) (p *internalserviceconfig.RetryPolicy, err error) {
269271
if jrp == nil {
270272
return nil, nil
271273
}
@@ -279,17 +281,16 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPol
279281
return nil, nil
280282
}
281283

284+
if jrp.MaxAttempts < maxAttempts {
285+
maxAttempts = jrp.MaxAttempts
286+
}
282287
rp := &internalserviceconfig.RetryPolicy{
283-
MaxAttempts: jrp.MaxAttempts,
288+
MaxAttempts: maxAttempts,
284289
InitialBackoff: time.Duration(jrp.InitialBackoff),
285290
MaxBackoff: time.Duration(jrp.MaxBackoff),
286291
BackoffMultiplier: jrp.BackoffMultiplier,
287292
RetryableStatusCodes: make(map[codes.Code]bool),
288293
}
289-
if rp.MaxAttempts > 5 {
290-
// TODO(retry): Make the max maxAttempts configurable.
291-
rp.MaxAttempts = 5
292-
}
293294
for _, code := range jrp.RetryableStatusCodes {
294295
rp.RetryableStatusCodes[code] = true
295296
}

service_config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func runParseTests(t *testing.T, testCases []parseTestCase) {
6060
t.Helper()
6161
for i, c := range testCases {
6262
t.Run(fmt.Sprint(i), func(t *testing.T) {
63-
scpr := parseServiceConfig(c.scjs)
63+
scpr := parseServiceConfig(c.scjs, defaultMaxCallAttempts)
6464
var sc *ServiceConfig
6565
sc, _ = scpr.Config.(*ServiceConfig)
6666
if !c.wantErr {

test/retry_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,97 @@ func (s) TestRetryStreaming(t *testing.T) {
477477
}
478478
}
479479

480+
func (s) TestMaxCallAttempts(t *testing.T) {
481+
testCases := []struct {
482+
serviceMaxAttempts int
483+
clientMaxAttempts int
484+
expectedAttempts int
485+
}{
486+
{serviceMaxAttempts: 9, clientMaxAttempts: 4, expectedAttempts: 4},
487+
{serviceMaxAttempts: 9, clientMaxAttempts: 7, expectedAttempts: 7},
488+
{serviceMaxAttempts: 3, clientMaxAttempts: 10, expectedAttempts: 3},
489+
{serviceMaxAttempts: 8, clientMaxAttempts: -1, expectedAttempts: 5}, // 5 is default max
490+
{serviceMaxAttempts: 3, clientMaxAttempts: 0, expectedAttempts: 3},
491+
}
492+
493+
for _, tc := range testCases {
494+
clientOpts := []grpc.DialOption{
495+
grpc.WithMaxCallAttempts(tc.clientMaxAttempts),
496+
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{
497+
"methodConfig": [{
498+
"name": [{"service": "grpc.testing.TestService"}],
499+
"waitForReady": true,
500+
"retryPolicy": {
501+
"MaxAttempts": %d,
502+
"InitialBackoff": ".01s",
503+
"MaxBackoff": ".01s",
504+
"BackoffMultiplier": 1.0,
505+
"RetryableStatusCodes": [ "UNAVAILABLE" ]
506+
}
507+
}]}`, tc.serviceMaxAttempts),
508+
),
509+
}
510+
511+
streamCallCount := 0
512+
unaryCallCount := 0
513+
514+
ss := &stubserver.StubServer{
515+
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
516+
streamCallCount++
517+
return status.New(codes.Unavailable, "this is a test error").Err()
518+
},
519+
EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
520+
unaryCallCount++
521+
return nil, status.New(codes.Unavailable, "this is a test error").Err()
522+
},
523+
}
524+
525+
func() {
526+
527+
if err := ss.Start([]grpc.ServerOption{}, clientOpts...); err != nil {
528+
t.Fatalf("Error starting endpoint server: %v", err)
529+
}
530+
defer ss.Stop()
531+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
532+
defer cancel()
533+
534+
for {
535+
if ctx.Err() != nil {
536+
t.Fatalf("Timed out waiting for service config update")
537+
}
538+
if ss.CC.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
539+
break
540+
}
541+
time.Sleep(time.Millisecond)
542+
}
543+
544+
// Test streaming RPC
545+
stream, err := ss.Client.FullDuplexCall(ctx)
546+
if err != nil {
547+
t.Fatalf("Error while creating stream: %v", err)
548+
}
549+
if got, err := stream.Recv(); err == nil {
550+
t.Fatalf("client: Recv() = %s, %v; want <nil>, error", got, err)
551+
} else if status.Code(err) != codes.Unavailable {
552+
t.Fatalf("client: Recv() = _, %v; want _, Unavailable", err)
553+
}
554+
if streamCallCount != tc.expectedAttempts {
555+
t.Fatalf("stream expectedAttempts = %v; want %v", streamCallCount, tc.expectedAttempts)
556+
}
557+
558+
// Test unary RPC
559+
if ugot, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
560+
t.Fatalf("client: EmptyCall() = %s, %v; want <nil>, error", ugot, err)
561+
} else if status.Code(err) != codes.Unavailable {
562+
t.Fatalf("client: EmptyCall() = _, %v; want _, Unavailable", err)
563+
}
564+
if unaryCallCount != tc.expectedAttempts {
565+
t.Fatalf("unary expectedAttempts = %v; want %v", unaryCallCount, tc.expectedAttempts)
566+
}
567+
}()
568+
}
569+
}
570+
480571
type retryStatsHandler struct {
481572
mu sync.Mutex
482573
s []stats.RPCStats

0 commit comments

Comments
 (0)