Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions xds/internal/clients/xdsclient/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type adsStreamEventHandler interface {
onStreamError(error) // Called when the ADS stream breaks.
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
onRequest(typeURL string) // Called when a request is about to be sent on the ADS stream.
}

// state corresponding to a resource type.
Expand Down Expand Up @@ -444,6 +445,11 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
}
}

// Notify the event handler so that cache entries for unsubscribed resources
// are removed. This is done before the request is sent so that the entries
// are deleted even if the discovery request fails; in that case the nonce is
// reset anyway when the stream restarts.
s.eventHandler.onRequest(url)

msg, err := proto.Marshal(req)
if err != nil {
s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err)
Expand All @@ -460,6 +466,7 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
} else if s.logger.V(2) {
s.logger.Warningf("ADS request sent for type %q, resources: %v, version: %q, nonce: %q", url, names, version, nonce)
}

return nil
}

Expand Down
80 changes: 59 additions & 21 deletions xds/internal/clients/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ func (a *authority) fallbackToServer(xc *xdsChannelWithConfig) bool {
// Subscribe to all existing resources from the new management server.
for typ, resources := range a.resources {
for name, state := range resources {
if len(state.watchers) == 0 {
continue
}
if a.logger.V(2) {
a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName, name)
}
Expand Down Expand Up @@ -655,6 +658,17 @@ func (a *authority) watchResource(rType ResourceType, resourceName string, watch
}
resources[resourceName] = state
xdsChannel.channel.subscribe(rType, resourceName)
} else if len(state.watchers) == 0 {
if a.logger.V(2) {
a.logger.Infof("Re-watch for type %q, resource name %q before unsubscription", rType.TypeName, resourceName)
}
// Add the active channel to the resource's channel configs if not
// already present.
state.xdsChannelConfigs[xdsChannel] = true
// Ensure the resource is subscribed on the active channel. We do this
// even if resource is present in cache as re-watches might occur
// after unsubscribes or channel changes.
xdsChannel.channel.subscribe(rType, resourceName)
}
// Always add the new watcher to the set of watchers.
state.watchers[watcher] = true
Expand Down Expand Up @@ -732,32 +746,16 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat
}

// There are no more watchers for this resource. Unsubscribe this
// resource from all channels where it was subscribed to and delete
// the state associated with it.
// resource from all channels where it was subscribed to but do not
// delete the state associated with it in case the resource is
// re-requested later before un-subscription request is completed by
// the management server.
if a.logger.V(2) {
a.logger.Infof("Removing last watch for resource name %q", resourceName)
}
for xcc := range state.xdsChannelConfigs {
xcc.channel.unsubscribe(rType, resourceName)
}
delete(resources, resourceName)

// If there are no more watchers for this resource type, delete the
// resource type from the top-level map.
if len(resources) == 0 {
if a.logger.V(2) {
a.logger.Infof("Removing last watch for resource type %q", rType.TypeName)
}
delete(a.resources, rType)
}
// If there are no more watchers for any resource type, release the
// reference to the xdsChannels.
if len(a.resources) == 0 {
if a.logger.V(2) {
a.logger.Infof("Removing last watch for for any resource type, releasing reference to the xdsChannel")
}
a.closeXDSChannels()
}
}, func() { close(done) })
<-done
})
Expand Down Expand Up @@ -809,7 +807,7 @@ func (a *authority) closeXDSChannels() {
func (a *authority) watcherExistsForUncachedResource() bool {
for _, resourceStates := range a.resources {
for _, state := range resourceStates {
if state.md.Status == xdsresource.ServiceStatusRequested {
if len(state.watchers) > 0 && state.md.Status == xdsresource.ServiceStatusRequested {
return true
}
}
Expand Down Expand Up @@ -841,6 +839,9 @@ func (a *authority) resourceConfig() []*v3statuspb.ClientConfig_GenericXdsConfig
for rType, resourceStates := range a.resources {
typeURL := rType.TypeURL
for name, state := range resourceStates {
if len(state.watchers) == 0 {
continue
}
var raw *anypb.Any
if state.cache != nil {
raw = &anypb.Any{TypeUrl: typeURL, Value: state.cache.Bytes()}
Expand Down Expand Up @@ -874,6 +875,43 @@ func (a *authority) close() {
}
}

// removeUnsubscribedCacheEntries drops the cached state of every resource of
// type rType that currently has no watchers.
//
// If that leaves no resources of rType, the type itself is removed from the
// authority's resource map; if that in turn leaves the authority with no
// resources of any type, the authority's xdsChannels are closed via
// closeXDSChannels.
//
// It is a no-op when the authority holds no state for rType.
//
// This method is only executed in the context of a serializer callback.
func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) {
	byName := a.resources[rType]
	if byName == nil {
		return
	}

	// Deleting from a map while ranging over it is permitted in Go.
	for resourceName, st := range byName {
		if len(st.watchers) != 0 {
			continue
		}
		if a.logger.V(2) {
			a.logger.Infof("Removing resource state for %q of type %q as it has no watchers", resourceName, rType.TypeName)
		}
		delete(byName, resourceName)
	}

	// Watched resources of this type remain, so the type entry (and hence
	// the authority's resource map) is still non-empty.
	if len(byName) != 0 {
		return
	}
	if a.logger.V(2) {
		a.logger.Infof("Removing resource type %q from cache as it has no more resources", rType.TypeName)
	}
	delete(a.resources, rType)

	// Other resource types are still tracked; keep the channels open.
	if len(a.resources) != 0 {
		return
	}
	if a.logger.V(2) {
		a.logger.Infof("Removing last watch for any resource type, releasing reference to the xdsChannels")
	}
	a.closeXDSChannels()
}

func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
switch serviceStatus {
case xdsresource.ServiceStatusUnknown:
Expand Down
48 changes: 48 additions & 0 deletions xds/internal/clients/xdsclient/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type xdsChannelEventHandler interface {
// adsResourceDoesNotExist is called when the xdsChannel determines that a
// requested ADS resource does not exist.
adsResourceDoesNotExist(ResourceType, string)

// adsResourceRemoveUnsubscribedCacheEntries is called when the xdsChannel
// needs to remove unsubscribed cache entries.
adsResourceRemoveUnsubscribedCacheEntries(ResourceType)
}

// xdsChannelOpts holds the options for creating a new xdsChannel.
Expand Down Expand Up @@ -136,8 +140,32 @@ type xdsChannel struct {
}

// close shuts down the channel. If the closed event has already fired, it
// returns without doing anything.
//
// Shutdown order: fire the closed event, snapshot the resource types tracked
// by the ADS stream, stop the stream, ask the event handler to remove
// unsubscribed cache entries for each snapshotted type, and finally close
// the transport.
//
// NOTE(review): the HasFired check and the Fire call are two separate steps;
// confirm that close is never invoked concurrently.
func (xc *xdsChannel) close() {
	if xc.closed.HasFired() {
		return
	}
	xc.closed.Fire()

	// Snapshot, under the ADS stream's lock, the resource types the stream
	// was handling before it is stopped.
	//
	// TODO: Revisit if we can avoid acquiring the lock of ads (another type).
	streamTypes := func() []ResourceType {
		xc.ads.mu.Lock()
		defer xc.ads.mu.Unlock()

		types := make([]ResourceType, 0, len(xc.ads.resourceTypeState))
		for rType := range xc.ads.resourceTypeState {
			types = append(types, rType)
		}
		return types
	}()

	xc.ads.Stop()

	// Request cache cleanup for every type the stream was handling. This is
	// only done when the event handler is a *channelState.
	if _, isChannelState := xc.eventHandler.(*channelState); isChannelState {
		for _, rType := range streamTypes {
			xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(rType)
		}
	}

	xc.transport.Close()
	xc.logger.Infof("Shutdown")
}
Expand Down Expand Up @@ -228,6 +256,26 @@ func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error)
return names, err
}

// onRequest is invoked when a request for the given resource type URL is
// about to be sent on the ADS stream. It asks the event handler to remove the
// cache entries of that resource type that are no longer subscribed to.
//
// It does nothing if the channel has already been closed, or if typeURL is
// not present in the client configuration's resource types (a warning is
// logged in the latter case).
func (xc *xdsChannel) onRequest(typeURL string) {
	if xc.closed.HasFired() {
		if xc.logger.V(2) {
			xc.logger.Infof("Ignoring request notification for type URL %q on closed ADS stream", typeURL)
		}
		return
	}

	// Look up the resource type registered for this type URL.
	rType, ok := xc.clientConfig.ResourceTypes[typeURL]
	if !ok {
		// Use the channel's own logger, like the rest of this method.
		xc.logger.Warningf("Resource type URL %q unknown in request about to be sent to server", typeURL)
		return
	}

	xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(rType)
}

// decodeResponse decodes the resources in the given ADS response.
//
// The opts parameter provides configuration options for decoding the resources.
Expand Down
3 changes: 3 additions & 0 deletions xds/internal/clients/xdsclient/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,6 @@ func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (Re
}
return typ, name, nil
}

// adsResourceRemoveUnsubscribedCacheEntries is a no-op on the test event
// handler: it accepts a resource type and does nothing with it.
func (*testEventHandler) adsResourceRemoveUnsubscribedCacheEntries(ResourceType) {
}
136 changes: 136 additions & 0 deletions xds/internal/clients/xdsclient/test/misc_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,139 @@ func (s) TestWatchErrorsContainNodeID_ChannelCreationFailure(t *testing.T) {
}
}
}

// TestRaceUnsubscribeResubscribe tests the scenario where the client is busy
// processing a response (simulating a pending ACK at a higher level by holding
// the onDone callback from watchers). During this busy state, a resource is
// unsubscribed and then immediately resubscribed which causes the
// unsubscription and new subscription requests to be buffered due to flow
// control.
//
// The test verifies the following:
// - The resubscribed resource is served from the cache.
// - No "resource does not exist" error is generated for the resubscribed
// resource.
func (s) TestRaceUnsubscribeResubscribe(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
nodeID := uuid.New().String()

resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
si := clients.ServerIdentifier{
ServerURI: mgmtServer.Address,
Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
}

configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
xdsClientConfig := xdsclient.Config{
Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}},
Node: clients.Node{ID: nodeID},
TransportBuilder: grpctransport.NewBuilder(configs),
ResourceTypes: resourceTypes,
// Xdstp resource names used in this test do not specify an
// authority. These will end up looking up an entry with the
// empty key in the authorities map. Having an entry with an
// empty key and empty configuration, results in these
// resources also using the top-level configuration.
Authorities: map[string]xdsclient.Authority{
"": {XDSServers: []xdsclient.ServerConfig{}},
},
}

// Create an xDS client with the above config.
client, err := xdsclient.New(xdsClientConfig)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()

const ldsResourceName1 = "test-listener-resource1"
const ldsResourceName2 = "test-listener-resource2"
const rdsName1 = "test-route-configuration-resource1"
const rdsName2 = "test-route-configuration-resource2"
listenerResource1 := e2e.DefaultClientListener(ldsResourceName1, rdsName1)
listenerResource2 := e2e.DefaultClientListener(ldsResourceName2, rdsName2)

// Watch ldsResourceName1 with a regular watcher to ensure it's in cache
// and ACKed. The watch is cancelled once the update has been observed.
watcherInitial := newListenerWatcher()
cancelInitial := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, watcherInitial)
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listenerResource1}, SkipValidation: true}); err != nil {
t.Fatalf("mgmtServer.Update() for %s failed: %v", ldsResourceName1, err)
}
// NOTE(review): the failure message below refers to "watcherR1Initial",
// but the watcher variable in this test is named watcherInitial.
if err := verifyListenerUpdate(ctx, watcherInitial.updateCh, listenerUpdateErrTuple{update: listenerUpdate{RouteConfigName: rdsName1}}); err != nil {
t.Fatalf("watcherR1Initial did not receive update for %s: %v", ldsResourceName1, err)
}
cancelInitial()

// Watch ldsResourceName1 and ldsResourceName2 using blocking watchers.
// - Server sends {ldsResourceName1, ldsResourceName2}.
// - Watchers for both resources get the update but we HOLD on to their
// onDone callbacks.
blockingWatcherR1 := newBLockingListenerWatcher()
cancelR1 := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, blockingWatcherR1)
// cancelR1 is intentionally not deferred; it is called explicitly further
// down, while the onDone callbacks are still held, to create the race.

blockingWatcherR2 := newBLockingListenerWatcher()
cancelR2 := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName2, blockingWatcherR2)
defer cancelR2()

// Configure the listener resources on the management server.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listenerResource1, listenerResource2},
SkipValidation: true}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("mgmtServer.Update() for %s and %s failed: %v", ldsResourceName1, ldsResourceName2, err)
}

// Wait for each blocking watcher to report an update, then take the onDone
// callback it hands over on doneNotifierCh without invoking it yet.
// NOTE(review): the receives from doneNotifierCh are not guarded by ctx.
var onDoneR1, onDoneR2 func()
select {
case <-blockingWatcherR1.updateCh:
onDoneR1 = <-blockingWatcherR1.doneNotifierCh
case <-ctx.Done():
t.Fatalf("Timeout waiting for update for %s on blockingWatcherR1: %v", ldsResourceName1, ctx.Err())
}
select {
case <-blockingWatcherR2.updateCh:
onDoneR2 = <-blockingWatcherR2.doneNotifierCh
case <-ctx.Done():
t.Fatalf("Timeout waiting for update for %s on blockingWatcherR2: %v", ldsResourceName2, ctx.Err())
}

// At this point, ACK for {listenerResource1,listenerResource2} has been
// sent by the client but s.fc.pending.Load() is true because onDoneR1 and
// onDoneR2 are held.
//
// Unsubscribe listenerResource1. This request should be buffered by
// adsStreamImpl because s.fc.pending.Load() is true.
cancelR1()

// Resubscribe listenerResource1 with a new regular watcher, which should
// be served from cache.
watcherR1New := newListenerWatcher()
cancelR1New := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, watcherR1New)
defer cancelR1New()

if err := verifyListenerUpdate(ctx, watcherR1New.updateCh, listenerUpdateErrTuple{update: listenerUpdate{RouteConfigName: rdsName1}}); err != nil {
t.Fatalf("watcherR1New did not receive cached update for %s: %v", ldsResourceName1, err)
}

// Release the onDone callbacks.
if onDoneR1 != nil { // onDoneR1 might be nil if cancelR1() completed very fast.
onDoneR1()
}
onDoneR2()

// Verify watcherR1New does not get a "resource does not exist" error.
// NOTE(review): both checks below share the same short-lived sCtx.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout*10) // Slightly longer to catch delayed errors
defer sCancel()
if err := verifyNoListenerUpdate(sCtx, watcherR1New.resourceErrCh); err != nil {
t.Fatalf("watcherR1New received unexpected resource error for %s: %v", ldsResourceName1, err)
}
if err := verifyNoListenerUpdate(sCtx, watcherR1New.ambientErrCh); err != nil {
t.Fatalf("watcherR1New received unexpected ambient error for %s: %v", ldsResourceName1, err)
}
}
15 changes: 15 additions & 0 deletions xds/internal/clients/xdsclient/xdsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,21 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s
}
}

// adsResourceRemoveUnsubscribedCacheEntries asks every authority interested
// in this channel to remove its cache entries of type rType that have no
// watchers. The removal is scheduled on each authority's serializer rather
// than run inline.
//
// It does nothing once the parent client's done event has fired. The parent's
// channelsMu is held while the interested authorities are iterated.
func (cs *channelState) adsResourceRemoveUnsubscribedCacheEntries(rType ResourceType) {
	client := cs.parent
	if client.done.HasFired() {
		return
	}

	client.channelsMu.Lock()
	defer client.channelsMu.Unlock()

	for a := range cs.interestedAuthorities {
		cleanup := func(context.Context) {
			a.removeUnsubscribedCacheEntries(rType)
		}
		a.xdsClientSerializer.TrySchedule(cleanup)
	}
}

func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) {
c.channelsMu.Lock()
defer c.channelsMu.Unlock()
Expand Down