Skip to content

Commit 87f0254

Browse files
authored
xdsclient: fix new watcher hang when registering for removed resource (#7853)
1 parent c63aeef commit 87f0254

File tree

2 files changed

+86
-0
lines changed

2 files changed

+86
-0
lines changed

xds/internal/xdsclient/authority.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,11 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
641641
resource := state.cache
642642
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
643643
}
644+
// If the metadata field is updated to indicate that the management
645+
// server does not have this resource, notify the new watcher.
646+
if state.md.Status == xdsresource.ServiceStatusNotExist {
647+
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) })
648+
}
644649
cleanup = a.unwatchResource(rType, resourceName, watcher)
645650
}, func() {
646651
if a.logger.V(2) {

xds/internal/xdsclient/tests/lds_watchers_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,87 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
871871
}
872872
}
873873

874+
// TestLDSWatch_NewWatcherForRemovedResource covers the case where a new
875+
// watcher registers for a resource that has been removed. The test verifies
876+
// the following scenarios:
877+
// 1. When a resource is deleted by the management server, any active
878+
// watchers of that resource should be notified with a "resource removed"
879+
// error through their watch callback.
880+
// 2. If a new watcher attempts to register for a resource that has already
881+
// been deleted, its watch callback should be immediately invoked with a
882+
// "resource removed" error.
883+
func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) {
884+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
885+
886+
nodeID := uuid.New().String()
887+
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
888+
889+
// Create an xDS client with the above bootstrap contents.
890+
client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
891+
Name: t.Name(),
892+
Contents: bc,
893+
})
894+
if err != nil {
895+
t.Fatalf("Failed to create xDS client: %v", err)
896+
}
897+
defer close()
898+
899+
// Register watch for the listener resource and have the
900+
// callbacks push the received updates on to a channel.
901+
lw1 := newListenerWatcher()
902+
ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1)
903+
defer ldsCancel1()
904+
905+
// Configure the management server to return listener resource,
906+
// corresponding to the registered watch.
907+
resource := e2e.UpdateOptions{
908+
NodeID: nodeID,
909+
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
910+
SkipValidation: true,
911+
}
912+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
913+
defer cancel()
914+
if err := mgmtServer.Update(ctx, resource); err != nil {
915+
t.Fatalf("Failed to update management server with resource: %v, err: %v", resource, err)
916+
}
917+
918+
// Verify the contents of the received update for existing watch.
919+
wantUpdate := listenerUpdateErrTuple{
920+
update: xdsresource.ListenerUpdate{
921+
RouteConfigName: rdsName,
922+
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
923+
},
924+
}
925+
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
926+
t.Fatal(err)
927+
}
928+
929+
// Remove the listener resource on the management server.
930+
resource = e2e.UpdateOptions{
931+
NodeID: nodeID,
932+
Listeners: []*v3listenerpb.Listener{},
933+
SkipValidation: true,
934+
}
935+
if err := mgmtServer.Update(ctx, resource); err != nil {
936+
t.Fatalf("Failed to update management server with resource: %v, err: %v", resource, err)
937+
}
938+
939+
// The existing watcher should receive a resource removed error.
940+
updateError := listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}
941+
if err := verifyListenerUpdate(ctx, lw1.updateCh, updateError); err != nil {
942+
t.Fatal(err)
943+
}
944+
945+
// New watchers attempting to register for a deleted resource should also
946+
// receive a "resource removed" error.
947+
lw2 := newListenerWatcher()
948+
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
949+
defer ldsCancel2()
950+
if err := verifyListenerUpdate(ctx, lw2.updateCh, updateError); err != nil {
951+
t.Fatal(err)
952+
}
953+
}
954+
874955
// TestLDSWatch_NACKError covers the case where an update from the management
875956
// server is NACK'ed by the xdsclient. The test verifies that the error is
876957
// propagated to the watcher.

0 commit comments

Comments
 (0)