diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 27abb64ef6d5..04bd278d2c47 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -641,6 +641,11 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w resource := state.cache a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) }) } + // If the metadata field is updated to indicate that the management + // server does not have this resource, notify the new watcher. + if state.md.Status == xdsresource.ServiceStatusNotExist { + a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) }) + } cleanup = a.unwatchResource(rType, resourceName, watcher) }, func() { if a.logger.V(2) { diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index 2ea2c50ce18b..38e1f1760383 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -871,6 +871,87 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { } } +// TestLDSWatch_NewWatcherForRemovedResource covers the case where a new +// watcher registers for a resource that has been removed. The test verifies +// the following scenarios: +// 1. When a resource is deleted by the management server, any active +// watchers of that resource should be notified with a "resource removed" +// error through their watch callback. +// 2. If a new watcher attempts to register for a resource that has already +// been deleted, its watch callback should be immediately invoked with a +// "resource removed" error. +func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) { + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + + nodeID := uuid.New().String() + bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + // Create an xDS client with the above bootstrap contents. + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: t.Name(), + Contents: bc, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Register watch for the listener resource and have the + // callbacks push the received updates on to a channel. + lw1 := newListenerWatcher() + ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1) + defer ldsCancel1() + + // Configure the management server to return listener resource, + // corresponding to the registered watch. + resource := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resource); err != nil { + t.Fatalf("Failed to update management server with resource: %v, err: %v", resource, err) + } + + // Verify the contents of the received update for existing watch. + wantUpdate := listenerUpdateErrTuple{ + update: xdsresource.ListenerUpdate{ + RouteConfigName: rdsName, + HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}, + }, + } + if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Remove the listener resource on the management server. + resource = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resource); err != nil { + t.Fatalf("Failed to update management server with resource: %v, err: %v", resource, err) + } + + // The existing watcher should receive a resource removed error. + updateError := listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")} + if err := verifyListenerUpdate(ctx, lw1.updateCh, updateError); err != nil { + t.Fatal(err) + } + + // New watchers attempting to register for a deleted resource should also + // receive a "resource removed" error. + lw2 := newListenerWatcher() + ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2) + defer ldsCancel2() + if err := verifyListenerUpdate(ctx, lw2.updateCh, updateError); err != nil { + t.Fatal(err) + } +} + // TestLDSWatch_NACKError covers the case where an update from the management // server is NACK'ed by the xdsclient. The test verifies that the error is // propagated to the watcher.