Skip to content

Commit 477bd62

Browse files
authored
xds/internal/resolver: switch to generic xDS API for LDS/RDS (#6729)
1 parent a03c7f1 commit 477bd62

File tree

5 files changed

+442
-395
lines changed

5 files changed

+442
-395
lines changed

test/xds/xds_client_federation_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) {
188188

189189
target = fmt.Sprintf("xds://unknown-authority/%s", serviceName)
190190
t.Logf("Dialing target %q with unknown authority which is expected to fail", target)
191-
const wantErr = `authority "unknown-authority" is not found in the bootstrap file`
191+
wantErr := fmt.Sprintf("authority \"unknown-authority\" specified in dial target %q is not found in the bootstrap file", target)
192192
_, err = grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
193193
if err == nil || !strings.Contains(err.Error(), wantErr) {
194194
t.Fatalf("grpc.Dial(%q) returned %v, want: %s", target, err, wantErr)

xds/internal/resolver/serviceconfig.go

Lines changed: 6 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
4040
"google.golang.org/grpc/xds/internal/balancer/ringhash"
4141
"google.golang.org/grpc/xds/internal/httpfilter"
42-
rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
4342
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
4443
)
4544

@@ -72,16 +71,6 @@ type xdsClusterManagerConfig struct {
7271
Children map[string]xdsChildConfig `json:"children"`
7372
}
7473

75-
// pruneActiveClusters deletes entries in r.activeClusters with zero
76-
// references.
77-
func (r *xdsResolver) pruneActiveClusters() {
78-
for cluster, ci := range r.activeClusters {
79-
if atomic.LoadInt32(&ci.refCount) == 0 {
80-
delete(r.activeClusters, cluster)
81-
}
82-
}
83-
}
84-
8574
// serviceConfigJSON produces a service config in JSON format representing all
8675
// the clusters referenced in activeClusters. This includes clusters with zero
8776
// references, so they must be pruned first.
@@ -193,10 +182,9 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
193182
if v := atomic.AddInt32(ref, -1); v == 0 {
194183
// This entry will be removed from activeClusters when
195184
// producing the service config for the empty update.
196-
select {
197-
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
198-
default:
199-
}
185+
cs.r.serializer.Schedule(func(context.Context) {
186+
cs.r.onClusterRefDownToZero()
187+
})
200188
}
201189
},
202190
Interceptor: interceptor,
@@ -338,97 +326,10 @@ func (cs *configSelector) stop() {
338326
// selector; we need another update to delete clusters from the config (if
339327
// we don't have another update pending already).
340328
if needUpdate {
341-
select {
342-
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
343-
default:
344-
}
345-
}
346-
}
347-
348-
// newConfigSelector creates the config selector for su; may add entries to
349-
// r.activeClusters for previously-unseen clusters.
350-
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
351-
cs := &configSelector{
352-
r: r,
353-
virtualHost: virtualHost{
354-
httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
355-
retryConfig: su.virtualHost.RetryConfig,
356-
},
357-
routes: make([]route, len(su.virtualHost.Routes)),
358-
clusters: make(map[string]*clusterInfo),
359-
httpFilterConfig: su.ldsConfig.httpFilterConfig,
329+
cs.r.serializer.Schedule(func(context.Context) {
330+
cs.r.onClusterRefDownToZero()
331+
})
360332
}
361-
362-
for i, rt := range su.virtualHost.Routes {
363-
clusters := rinternal.NewWRR.(func() wrr.WRR)()
364-
if rt.ClusterSpecifierPlugin != "" {
365-
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
366-
clusters.Add(&routeCluster{
367-
name: clusterName,
368-
}, 1)
369-
cs.initializeCluster(clusterName, xdsChildConfig{
370-
ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]),
371-
})
372-
} else {
373-
for cluster, wc := range rt.WeightedClusters {
374-
clusterName := clusterPrefix + cluster
375-
clusters.Add(&routeCluster{
376-
name: clusterName,
377-
httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
378-
}, int64(wc.Weight))
379-
cs.initializeCluster(clusterName, xdsChildConfig{
380-
ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
381-
})
382-
}
383-
}
384-
cs.routes[i].clusters = clusters
385-
386-
var err error
387-
cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
388-
if err != nil {
389-
return nil, err
390-
}
391-
cs.routes[i].actionType = rt.ActionType
392-
if rt.MaxStreamDuration == nil {
393-
cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
394-
} else {
395-
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
396-
}
397-
398-
cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
399-
cs.routes[i].retryConfig = rt.RetryConfig
400-
cs.routes[i].hashPolicies = rt.HashPolicies
401-
}
402-
403-
// Account for this config selector's clusters. Do this after no further
404-
// errors may occur. Note: cs.clusters are pointers to entries in
405-
// activeClusters.
406-
for _, ci := range cs.clusters {
407-
atomic.AddInt32(&ci.refCount, 1)
408-
}
409-
410-
return cs, nil
411-
}
412-
413-
// initializeCluster initializes entries in cs.clusters map, creating entries in
414-
// r.activeClusters as necessary. Any created entries will have a ref count set
415-
// to zero as their ref count will be incremented by incRefs.
416-
func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) {
417-
ci := cs.r.activeClusters[clusterName]
418-
if ci == nil {
419-
ci = &clusterInfo{refCount: 0}
420-
cs.r.activeClusters[clusterName] = ci
421-
}
422-
cs.clusters[clusterName] = ci
423-
cs.clusters[clusterName].cfg = cfg
424-
}
425-
426-
type clusterInfo struct {
427-
// number of references to this cluster; accessed atomically
428-
refCount int32
429-
// cfg is the child configuration for this cluster, containing either the
430-
// csp config or the cds cluster config.
431-
cfg xdsChildConfig
432333
}
433334

434335
type interceptorList struct {

xds/internal/resolver/watch_service.go

Lines changed: 51 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -19,185 +19,77 @@
1919
package resolver
2020

2121
import (
22-
"fmt"
23-
"sync"
24-
"time"
22+
"context"
2523

26-
"google.golang.org/grpc/internal/grpclog"
27-
"google.golang.org/grpc/internal/pretty"
28-
"google.golang.org/grpc/xds/internal/clusterspecifier"
29-
"google.golang.org/grpc/xds/internal/xdsclient"
3024
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
3125
)
3226

33-
// serviceUpdate contains information received from the LDS/RDS responses which
34-
// are of interest to the xds resolver. The RDS request is built by first
35-
// making a LDS to get the RouteConfig name.
36-
type serviceUpdate struct {
37-
// virtualHost contains routes and other configuration to route RPCs.
38-
virtualHost *xdsresource.VirtualHost
39-
// clusterSpecifierPlugins contains the configurations for any cluster
40-
// specifier plugins emitted by the xdsclient.
41-
clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig
42-
// ldsConfig contains configuration that applies to all routes.
43-
ldsConfig ldsConfig
27+
type listenerWatcher struct {
28+
resourceName string
29+
cancel func()
30+
parent *xdsResolver
4431
}
4532

46-
// ldsConfig contains information received from the LDS responses which are of
47-
// interest to the xds resolver.
48-
type ldsConfig struct {
49-
// maxStreamDuration is from the HTTP connection manager's
50-
// common_http_protocol_options field.
51-
maxStreamDuration time.Duration
52-
httpFilterConfig []xdsresource.HTTPFilter
33+
func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatcher {
34+
lw := &listenerWatcher{resourceName: resourceName, parent: parent}
35+
lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw)
36+
return lw
5337
}
5438

55-
// watchService uses LDS and RDS to discover information about the provided
56-
// serviceName.
57-
//
58-
// Note that during race (e.g. an xDS response is received while the user is
59-
// calling cancel()), there's a small window where the callback can be called
60-
// after the watcher is canceled. The caller needs to handle this case.
61-
//
62-
// TODO(easwars): Make this function a method on the xdsResolver type.
63-
// Currently, there is a single call site for this function, and all arguments
64-
// passed to it are fields of the xdsResolver type.
65-
func watchService(c xdsclient.XDSClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) {
66-
w := &serviceUpdateWatcher{
67-
logger: logger,
68-
c: c,
69-
serviceName: serviceName,
70-
serviceCb: cb,
71-
}
72-
w.ldsCancel = c.WatchListener(serviceName, w.handleLDSResp)
73-
74-
return w.close
39+
func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
40+
l.parent.serializer.Schedule(func(context.Context) {
41+
l.parent.onListenerResourceUpdate(update.Resource)
42+
})
7543
}
7644

77-
// serviceUpdateWatcher handles LDS and RDS response, and calls the service
78-
// callback at the right time.
79-
type serviceUpdateWatcher struct {
80-
logger *grpclog.PrefixLogger
81-
c xdsclient.XDSClient
82-
serviceName string
83-
ldsCancel func()
84-
serviceCb func(serviceUpdate, error)
85-
lastUpdate serviceUpdate
86-
87-
mu sync.Mutex
88-
closed bool
89-
rdsName string
90-
rdsCancel func()
45+
func (l *listenerWatcher) OnError(err error) {
46+
l.parent.serializer.Schedule(func(context.Context) {
47+
l.parent.onListenerResourceError(err)
48+
})
9149
}
9250

93-
func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, err error) {
94-
w.logger.Infof("received LDS update: %+v, err: %v", pretty.ToJSON(update), err)
95-
w.mu.Lock()
96-
defer w.mu.Unlock()
97-
if w.closed {
98-
return
99-
}
100-
if err != nil {
101-
// We check the error type and do different things. For now, the only
102-
// type we check is ResourceNotFound, which indicates the LDS resource
103-
// was removed, and besides sending the error to callback, we also
104-
// cancel the RDS watch.
105-
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound && w.rdsCancel != nil {
106-
w.rdsCancel()
107-
w.rdsName = ""
108-
w.rdsCancel = nil
109-
w.lastUpdate = serviceUpdate{}
110-
}
111-
// The other error cases still return early without canceling the
112-
// existing RDS watch.
113-
w.serviceCb(serviceUpdate{}, err)
114-
return
115-
}
116-
117-
w.lastUpdate.ldsConfig = ldsConfig{
118-
maxStreamDuration: update.MaxStreamDuration,
119-
httpFilterConfig: update.HTTPFilters,
120-
}
121-
122-
if update.InlineRouteConfig != nil {
123-
// If there was an RDS watch, cancel it.
124-
w.rdsName = ""
125-
if w.rdsCancel != nil {
126-
w.rdsCancel()
127-
w.rdsCancel = nil
128-
}
51+
func (l *listenerWatcher) OnResourceDoesNotExist() {
52+
l.parent.serializer.Schedule(func(context.Context) {
53+
l.parent.onListenerResourceNotFound()
54+
})
55+
}
12956

130-
// Handle the inline RDS update as if it's from an RDS watch.
131-
w.applyRouteConfigUpdate(*update.InlineRouteConfig)
132-
return
133-
}
57+
func (l *listenerWatcher) stop() {
58+
l.cancel()
59+
l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
60+
}
13461

135-
// RDS name from update is not an empty string, need RDS to fetch the
136-
// routes.
62+
type routeConfigWatcher struct {
63+
resourceName string
64+
cancel func()
65+
parent *xdsResolver
66+
}
13767

138-
if w.rdsName == update.RouteConfigName {
139-
// If the new RouteConfigName is same as the previous, don't cancel and
140-
// restart the RDS watch.
141-
//
142-
// If the route name did change, then we must wait until the first RDS
143-
// update before reporting this LDS config.
144-
if w.lastUpdate.virtualHost != nil {
145-
// We want to send an update with the new fields from the new LDS
146-
// (e.g. max stream duration), and old fields from the previous
147-
// RDS.
148-
//
149-
// But note that this should only happen when virtual host is set,
150-
// which means an RDS was received.
151-
w.serviceCb(w.lastUpdate, nil)
152-
}
153-
return
154-
}
155-
w.rdsName = update.RouteConfigName
156-
if w.rdsCancel != nil {
157-
w.rdsCancel()
158-
}
159-
w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
68+
func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfigWatcher {
69+
rw := &routeConfigWatcher{resourceName: resourceName, parent: parent}
70+
rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw)
71+
return rw
16072
}
16173

162-
func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
163-
matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
164-
if matchVh == nil {
165-
// No matching virtual host found.
166-
w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
167-
return
168-
}
74+
func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
75+
r.parent.serializer.Schedule(func(context.Context) {
76+
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
77+
})
78+
}
16979

170-
w.lastUpdate.virtualHost = matchVh
171-
w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins
172-
w.serviceCb(w.lastUpdate, nil)
80+
func (r *routeConfigWatcher) OnError(err error) {
81+
r.parent.serializer.Schedule(func(context.Context) {
82+
r.parent.onRouteConfigResourceError(r.resourceName, err)
83+
})
17384
}
17485

175-
func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdate, err error) {
176-
w.logger.Infof("received RDS update: %+v, err: %v", pretty.ToJSON(update), err)
177-
w.mu.Lock()
178-
defer w.mu.Unlock()
179-
if w.closed {
180-
return
181-
}
182-
if w.rdsCancel == nil {
183-
// This mean only the RDS watch is canceled, can happen if the LDS
184-
// resource is removed.
185-
return
186-
}
187-
if err != nil {
188-
w.serviceCb(serviceUpdate{}, err)
189-
return
190-
}
191-
w.applyRouteConfigUpdate(update)
86+
func (r *routeConfigWatcher) OnResourceDoesNotExist() {
87+
r.parent.serializer.Schedule(func(context.Context) {
88+
r.parent.onRouteConfigResourceNotFound(r.resourceName)
89+
})
19290
}
19391

194-
func (w *serviceUpdateWatcher) close() {
195-
w.mu.Lock()
196-
defer w.mu.Unlock()
197-
w.closed = true
198-
w.ldsCancel()
199-
if w.rdsCancel != nil {
200-
w.rdsCancel()
201-
w.rdsCancel = nil
202-
}
92+
func (r *routeConfigWatcher) stop() {
93+
r.cancel()
94+
r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
20395
}

0 commit comments

Comments
 (0)