Skip to content

Commit 7eb5727

Browse files
authored
xds: switch EDS watch to new generic xdsClient API (#6414)
1 parent e859984 commit 7eb5727

File tree

8 files changed

+156
-193
lines changed

8 files changed

+156
-193
lines changed

xds/internal/balancer/clusterresolver/resource_resolver_eds.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,8 @@ import (
2525
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
2626
)
2727

28-
type edsResourceWatcher interface {
29-
WatchEndpoints(string, func(xdsresource.EndpointsUpdate, error)) func()
30-
}
31-
3228
type edsDiscoveryMechanism struct {
29+
nameToWatch string
3330
cancelWatch func()
3431
topLevelResolver topLevelResolver
3532
stopped *grpcsync.Event
@@ -64,31 +61,44 @@ func (er *edsDiscoveryMechanism) stop() {
6461
er.cancelWatch()
6562
}
6663

67-
func (er *edsDiscoveryMechanism) handleEndpointsUpdate(update xdsresource.EndpointsUpdate, err error) {
68-
if er.stopped.HasFired() {
69-
return
64+
// newEDSResolver returns an implementation of the endpointsResolver interface
65+
// that uses EDS to resolve the given name to endpoints.
66+
func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver) *edsDiscoveryMechanism {
67+
ret := &edsDiscoveryMechanism{
68+
nameToWatch: nameToWatch,
69+
topLevelResolver: topLevelResolver,
70+
stopped: grpcsync.NewEvent(),
7071
}
72+
ret.cancelWatch = xdsresource.WatchEndpoints(producer, nameToWatch, ret)
73+
return ret
74+
}
7175

72-
if err != nil {
73-
er.topLevelResolver.onError(err)
76+
// OnUpdate is invoked to report an update for the resource being watched.
77+
func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) {
78+
if er.stopped.HasFired() {
7479
return
7580
}
7681

7782
er.mu.Lock()
78-
er.update = update
83+
er.update = update.Resource
7984
er.updateReceived = true
8085
er.mu.Unlock()
8186

8287
er.topLevelResolver.onUpdate()
8388
}
8489

85-
// newEDSResolver returns an implementation of the endpointsResolver interface
86-
// that uses EDS to resolve the given name to endpoints.
87-
func newEDSResolver(nameToWatch string, watcher edsResourceWatcher, topLevelResolver topLevelResolver) *edsDiscoveryMechanism {
88-
ret := &edsDiscoveryMechanism{
89-
topLevelResolver: topLevelResolver,
90-
stopped: grpcsync.NewEvent(),
90+
func (er *edsDiscoveryMechanism) OnError(err error) {
91+
if er.stopped.HasFired() {
92+
return
9193
}
92-
ret.cancelWatch = watcher.WatchEndpoints(nameToWatch, ret.handleEndpointsUpdate)
93-
return ret
94+
95+
er.topLevelResolver.onError(err)
96+
}
97+
98+
func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
99+
if er.stopped.HasFired() {
100+
return
101+
}
102+
103+
er.topLevelResolver.onError(xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Endpoints not found in received response", er.nameToWatch))
94104
}

xds/internal/xdsclient/client.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ type XDSClient interface {
3333
WatchListener(string, func(xdsresource.ListenerUpdate, error)) func()
3434
WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func()
3535
WatchCluster(string, func(xdsresource.ClusterUpdate, error)) func()
36-
WatchEndpoints(string, func(xdsresource.EndpointsUpdate, error)) func()
3736

3837
// WatchResource uses xDS to discover the resource associated with the
3938
// provided resource name. The resource type implementation determines how

xds/internal/xdsclient/clientimpl_watchers.go

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -112,37 +112,6 @@ func (c *clientImpl) WatchCluster(resourceName string, cb func(xdsresource.Clust
112112
return xdsresource.WatchCluster(c, resourceName, watcher)
113113
}
114114

115-
// This is only required temporarily, while we modify the
116-
// clientImpl.WatchEndpoints API to be implemented via the wrapper
117-
// WatchEndpoints() API which calls the WatchResource() API.
118-
type endpointsWatcher struct {
119-
resourceName string
120-
cb func(xdsresource.EndpointsUpdate, error)
121-
}
122-
123-
func (c *endpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {
124-
c.cb(update.Resource, nil)
125-
}
126-
127-
func (c *endpointsWatcher) OnError(err error) {
128-
c.cb(xdsresource.EndpointsUpdate{}, err)
129-
}
130-
131-
func (c *endpointsWatcher) OnResourceDoesNotExist() {
132-
err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Endpoints not found in received response", c.resourceName)
133-
c.cb(xdsresource.EndpointsUpdate{}, err)
134-
}
135-
136-
// WatchEndpoints uses EDS to discover information about the
137-
// ClusterLoadAssignment resource identified by resourceName.
138-
//
139-
// WatchEndpoints can be called multiple times, with same or different
140-
// clusterNames. Each call will start an independent watcher for the resource.
141-
func (c *clientImpl) WatchEndpoints(resourceName string, cb func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
142-
watcher := &endpointsWatcher{resourceName: resourceName, cb: cb}
143-
return xdsresource.WatchEndpoints(c, resourceName, watcher)
144-
}
145-
146115
// WatchResource uses xDS to discover the resource associated with the provided
147116
// resource name. The resource type implementation determines how xDS requests
148117
// are sent out and how responses are deserialized and validated. Upon receipt

xds/internal/xdsclient/tests/dump_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ func compareDump(ctx context.Context, client xdsclient.XDSClient, want map[strin
6060
}
6161
}
6262

63+
type noopEndpointsWatcher struct{}
64+
65+
func (noopEndpointsWatcher) OnUpdate(update *xdsresource.EndpointsResourceData) {}
66+
func (noopEndpointsWatcher) OnError(err error) {}
67+
func (noopEndpointsWatcher) OnResourceDoesNotExist() {}
68+
6369
func (s) TestDumpResources(t *testing.T) {
6470
// Initialize the xDS resources to be used in this test.
6571
ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
@@ -122,7 +128,7 @@ func (s) TestDumpResources(t *testing.T) {
122128
client.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {})
123129
}
124130
for _, target := range edsTargets {
125-
client.WatchEndpoints(target, func(xdsresource.EndpointsUpdate, error) {})
131+
xdsresource.WatchEndpoints(client, target, noopEndpointsWatcher{})
126132
}
127133
want := map[string]map[string]xdsresource.UpdateWithMD{
128134
"type.googleapis.com/envoy.config.listener.v3.Listener": {

0 commit comments

Comments
 (0)