@@ -25,6 +25,7 @@ import (
2525 "time"
2626
2727 "google.golang.org/grpc/internal"
28+ "google.golang.org/grpc/internal/backoff"
2829 "google.golang.org/grpc/internal/cache"
2930 "google.golang.org/grpc/internal/grpcsync"
3031 "google.golang.org/grpc/internal/xds/bootstrap"
@@ -53,16 +54,17 @@ const NameForServer = "#server"
5354// only when all references are released, and it is safe for the caller to
5455// invoke this close function multiple times.
5556func New (name string ) (XDSClient , func (), error ) {
56- return newRefCounted (name , defaultWatchExpiryTimeout , defaultIdleAuthorityDeleteTimeout )
57+ return newRefCounted (name , defaultWatchExpiryTimeout , defaultIdleAuthorityDeleteTimeout , backoff . DefaultExponential . Backoff )
5758}
5859
5960// newClientImpl returns a new xdsClient with the given config.
60- func newClientImpl (config * bootstrap.Config , watchExpiryTimeout time.Duration , idleAuthorityDeleteTimeout time.Duration ) (* clientImpl , error ) {
61+ func newClientImpl (config * bootstrap.Config , watchExpiryTimeout time.Duration , idleAuthorityDeleteTimeout time.Duration , streamBackoff func ( int ) time. Duration ) (* clientImpl , error ) {
6162 ctx , cancel := context .WithCancel (context .Background ())
6263 c := & clientImpl {
6364 done : grpcsync .NewEvent (),
6465 config : config ,
6566 watchExpiryTimeout : watchExpiryTimeout ,
67+ backoff : streamBackoff ,
6668 serializer : grpcsync .NewCallbackSerializer (ctx ),
6769 serializerClose : cancel ,
6870 resourceTypes : newResourceTypeRegistry (),
@@ -90,6 +92,11 @@ type OptionsForTesting struct {
9092 // AuthorityIdleTimeout is the timeout before idle authorities are deleted.
9193 // If unspecified, uses the default value used in non-test code.
9294 AuthorityIdleTimeout time.Duration
95+
96+ // StreamBackoffAfterFailure is the backoff function used to determine the
97+ // backoff duration after stream failures. If unspecified, uses the default
98+ // value used in non-test code.
99+ StreamBackoffAfterFailure func (int ) time.Duration
93100}
94101
95102// NewForTesting returns an xDS client configured with the provided options.
@@ -111,11 +118,14 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
111118 if opts .AuthorityIdleTimeout == 0 {
112119 opts .AuthorityIdleTimeout = defaultIdleAuthorityDeleteTimeout
113120 }
121+ if opts .StreamBackoffAfterFailure == nil {
122+ opts .StreamBackoffAfterFailure = defaultStreamBackoffFunc
123+ }
114124
115125 if err := bootstrap .SetFallbackBootstrapConfig (opts .Contents ); err != nil {
116126 return nil , nil , err
117127 }
118- client , cancel , err := newRefCounted (opts .Name , opts .WatchExpiryTimeout , opts .AuthorityIdleTimeout )
128+ client , cancel , err := newRefCounted (opts .Name , opts .WatchExpiryTimeout , opts .AuthorityIdleTimeout , opts . StreamBackoffAfterFailure )
119129 return client , func () { bootstrap .UnsetFallbackBootstrapConfigForTesting (); cancel () }, err
120130}
121131
0 commit comments