@@ -527,139 +527,3 @@ func (s) TestWatchErrorsContainNodeID_ChannelCreationFailure(t *testing.T) {
527527 }
528528 }
529529}
530-
531- // TestUnsubscribeAndResubscribe tests the scenario where the client is busy
532- // processing a response (simulating a pending ACK at a higher level by holding
533- // the onDone callback from watchers). During this busy state, a resource is
534- // unsubscribed and then immediately resubscribed which causes the
535- // unsubscription and new subscription requests to be buffered due to flow
536- // control.
537- //
538- // The test verifies the following:
539- // - The resubscribed resource is served from the cache.
540- // - No "resource does not exist" error is generated for the resubscribed
541- // resource.
542- func (s ) TestRaceUnsubscribeResubscribe (t * testing.T ) {
543- ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
544- defer cancel ()
545-
546- mgmtServer := e2e .StartManagementServer (t , e2e.ManagementServerOptions {})
547- nodeID := uuid .New ().String ()
548-
549- resourceTypes := map [string ]xdsclient.ResourceType {xdsresource .V3ListenerURL : listenerType }
550- si := clients.ServerIdentifier {
551- ServerURI : mgmtServer .Address ,
552- Extensions : grpctransport.ServerIdentifierExtension {ConfigName : "insecure" },
553- }
554-
555- configs := map [string ]grpctransport.Config {"insecure" : {Credentials : insecure .NewBundle ()}}
556- xdsClientConfig := xdsclient.Config {
557- Servers : []xdsclient.ServerConfig {{ServerIdentifier : si }},
558- Node : clients.Node {ID : nodeID },
559- TransportBuilder : grpctransport .NewBuilder (configs ),
560- ResourceTypes : resourceTypes ,
561- // Xdstp resource names used in this test do not specify an
562- // authority. These will end up looking up an entry with the
563- // empty key in the authorities map. Having an entry with an
564- // empty key and empty configuration, results in these
565- // resources also using the top-level configuration.
566- Authorities : map [string ]xdsclient.Authority {
567- "" : {XDSServers : []xdsclient.ServerConfig {}},
568- },
569- }
570-
571- // Create an xDS client with the above config.
572- client , err := xdsclient .New (xdsClientConfig )
573- if err != nil {
574- t .Fatalf ("Failed to create xDS client: %v" , err )
575- }
576- defer client .Close ()
577-
578- const ldsResourceName1 = "test-listener-resource1"
579- const ldsResourceName2 = "test-listener-resource2"
580- const rdsName1 = "test-route-configuration-resource1"
581- const rdsName2 = "test-route-configuration-resource2"
582- listenerResource1 := e2e .DefaultClientListener (ldsResourceName1 , rdsName1 )
583- listenerResource2 := e2e .DefaultClientListener (ldsResourceName2 , rdsName2 )
584-
585- // Watch ldsResourceName1 with a regular watcher to ensure it's in cache
586- // and ACKed.
587- watcherInitial := newListenerWatcher ()
588- cancelInitial := client .WatchResource (xdsresource .V3ListenerURL , ldsResourceName1 , watcherInitial )
589- if err := mgmtServer .Update (ctx , e2e.UpdateOptions {NodeID : nodeID , Listeners : []* v3listenerpb.Listener {listenerResource1 }, SkipValidation : true }); err != nil {
590- t .Fatalf ("mgmtServer.Update() for %s failed: %v" , ldsResourceName1 , err )
591- }
592- if err := verifyListenerUpdate (ctx , watcherInitial .updateCh , listenerUpdateErrTuple {update : listenerUpdate {RouteConfigName : rdsName1 }}); err != nil {
593- t .Fatalf ("watcherR1Initial did not receive update for %s: %v" , ldsResourceName1 , err )
594- }
595- cancelInitial ()
596-
597- // Watch ldsResourceName1 and ldsResourceName2 using blocking watchers.
598- // - Server sends {ldsResourceName1, ldsResourceName2}.
599- // - Watchers for both resources get the update but we HOLD on to their
600- // onDone callbacks.
601- blockingWatcherR1 := newBLockingListenerWatcher ()
602- cancelR1 := client .WatchResource (xdsresource .V3ListenerURL , ldsResourceName1 , blockingWatcherR1 )
603- // defer cancelR1 later to create the race
604-
605- blockingWatcherR2 := newBLockingListenerWatcher ()
606- cancelR2 := client .WatchResource (xdsresource .V3ListenerURL , ldsResourceName2 , blockingWatcherR2 )
607- defer cancelR2 ()
608-
609- // Configure the listener resources on the management server.
610- resources := e2e.UpdateOptions {
611- NodeID : nodeID ,
612- Listeners : []* v3listenerpb.Listener {listenerResource1 , listenerResource2 },
613- SkipValidation : true }
614- if err := mgmtServer .Update (ctx , resources ); err != nil {
615- t .Fatalf ("mgmtServer.Update() for %s and %s failed: %v" , ldsResourceName1 , ldsResourceName2 , err )
616- }
617-
618- var onDoneR1 , onDoneR2 func ()
619- select {
620- case <- blockingWatcherR1 .updateCh :
621- onDoneR1 = <- blockingWatcherR1 .doneNotifierCh
622- case <- ctx .Done ():
623- t .Fatalf ("Timeout waiting for update for %s on blockingWatcherR1: %v" , ldsResourceName1 , ctx .Err ())
624- }
625- select {
626- case <- blockingWatcherR2 .updateCh :
627- onDoneR2 = <- blockingWatcherR2 .doneNotifierCh
628- case <- ctx .Done ():
629- t .Fatalf ("Timeout waiting for update for %s on blockingWatcherR2: %v" , ldsResourceName2 , ctx .Err ())
630- }
631-
632- // At this point, ACK for {listenerResource1,listenerResource2} has been
633- // sent by the client but s.fc.pending.Load() is true because onDoneR1 and
634- // onDoneR2 are held.
635- //
636- // Unsubscribe listenerResource1. This request should be buffered by
637- // adsStreamImpl because s.fc.pending.Load() is true.
638- cancelR1 ()
639-
640- // Resubscribe listenerResource1 with a new regular watcher, which should
641- // be served from cache.
642- watcherR1New := newListenerWatcher ()
643- cancelR1New := client .WatchResource (xdsresource .V3ListenerURL , ldsResourceName1 , watcherR1New )
644- defer cancelR1New ()
645-
646- if err := verifyListenerUpdate (ctx , watcherR1New .updateCh , listenerUpdateErrTuple {update : listenerUpdate {RouteConfigName : rdsName1 }}); err != nil {
647- t .Fatalf ("watcherR1New did not receive cached update for %s: %v" , ldsResourceName1 , err )
648- }
649-
650- // Release the onDone callbacks.
651- if onDoneR1 != nil { // onDoneR1 might be nil if cancelR1() completed very fast.
652- onDoneR1 ()
653- }
654- onDoneR2 ()
655-
656- // Verify watcherR1New does not get a "resource does not exist" error.
657- sCtx , sCancel := context .WithTimeout (ctx , defaultTestShortTimeout * 10 ) // Slightly longer to catch delayed errors
658- defer sCancel ()
659- if err := verifyNoListenerUpdate (sCtx , watcherR1New .resourceErrCh ); err != nil {
660- t .Fatalf ("watcherR1New received unexpected resource error for %s: %v" , ldsResourceName1 , err )
661- }
662- if err := verifyNoListenerUpdate (sCtx , watcherR1New .ambientErrCh ); err != nil {
663- t .Fatalf ("watcherR1New received unexpected ambient error for %s: %v" , ldsResourceName1 , err )
664- }
665- }
0 commit comments