Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,11 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
resource := state.cache
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
}
// If the metadata field is updated to indicate that the management
// server does not have this resource, notify the new watcher.
if state.md.Status == xdsresource.ServiceStatusNotExist {
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnResourceDoesNotExist(func() {}) })
}
cleanup = a.unwatchResource(rType, resourceName, watcher)
}, func() {
if a.logger.V(2) {
Expand Down
81 changes: 81 additions & 0 deletions xds/internal/xdsclient/tests/lds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,87 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
}
}

// TestLDSWatch_NewWatcherForRemovedResource covers the case where a new
// watcher registers for a resource that has been removed. The test verifies
// the following scenarios:
// 1. When a resource is deleted by the management server, any active
// watchers of that resource should be notified with a "resource removed"
// error through their watch callback.
// 2. If a new watcher attempts to register for a resource that has already
// been deleted, its watch callback should be immediately invoked with a
// "resource removed" error.
func (s) TestLDSWatch_NewWatcherForRemovedResource(t *testing.T) {
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

nodeID := uuid.New().String()
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)

// Create an xDS client with the above bootstrap contents.
client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{
Name: t.Name(),
Contents: bc,
})
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer close()

// Register watch for the listener resource and have the
// callbacks push the received updates on to a channel.
lw1 := newListenerWatcher()
ldsCancel1 := xdsresource.WatchListener(client, ldsName, lw1)
defer ldsCancel1()

// Configure the management server to return listener resource,
// corresponding to the registered watch.
resource := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resource); err != nil {
t.Fatalf("Failed to update management server with resource: %v, err: %v", resource, err)
}

// Verify the contents of the received update for existing watch.
wantUpdate := listenerUpdateErrTuple{
update: xdsresource.ListenerUpdate{
RouteConfigName: rdsName,
HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}},
},
}
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Remove the listener resource on the management server.
resource = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resource); err != nil {
t.Fatalf("Failed to update management server with resource: %v, err: %v", resource, err)
}

// The existing watcher should receive a resource removed error.
updateError := listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "")}
if err := verifyListenerUpdate(ctx, lw1.updateCh, updateError); err != nil {
t.Fatal(err)
}

// New watchers attempting to register for a deleted resource should also
// receive a "resource removed" error.
lw2 := newListenerWatcher()
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
defer ldsCancel2()
if err := verifyListenerUpdate(ctx, lw2.updateCh, updateError); err != nil {
t.Fatal(err)
}
}

// TestLDSWatch_NACKError covers the case where an update from the management
// server is NACK'ed by the xdsclient. The test verifies that the error is
// propagated to the watcher.
Expand Down