Skip to content

Commit c76442c

Browse files
authored
xds/resolver: move service watching tests to resolver_test package (#6682)
1 parent 5a6773c commit c76442c

File tree

4 files changed

+426
-477
lines changed

4 files changed

+426
-477
lines changed

xds/internal/resolver/cluster_specifier_plugin_test.go

Lines changed: 46 additions & 184 deletions
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,18 @@ import (
2323
"encoding/json"
2424
"fmt"
2525
"testing"
26-
"time"
2726

2827
"github.com/golang/protobuf/proto"
29-
"github.com/google/go-cmp/cmp"
3028
"github.com/google/uuid"
3129
"google.golang.org/grpc/balancer"
32-
"google.golang.org/grpc/internal"
3330
"google.golang.org/grpc/internal/envconfig"
34-
"google.golang.org/grpc/internal/grpctest"
3531
iresolver "google.golang.org/grpc/internal/resolver"
3632
"google.golang.org/grpc/internal/testutils"
37-
xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
3833
"google.golang.org/grpc/internal/testutils/xds/e2e"
3934
"google.golang.org/grpc/resolver"
4035
"google.golang.org/grpc/serviceconfig"
4136
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
4237
"google.golang.org/grpc/xds/internal/clusterspecifier"
43-
xdsresolver "google.golang.org/grpc/xds/internal/resolver"
4438
protov2 "google.golang.org/protobuf/proto"
4539
"google.golang.org/protobuf/types/known/anypb"
4640
"google.golang.org/protobuf/types/known/wrapperspb"
@@ -49,80 +43,6 @@ import (
4943
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
5044
)
5145

52-
const (
53-
defaultTestTimeout = 10 * time.Second
54-
defaultTestShortTimeout = 100 * time.Microsecond
55-
)
56-
57-
type s struct {
58-
grpctest.Tester
59-
}
60-
61-
func Test(t *testing.T) {
62-
grpctest.RunSubTests(t, s{})
63-
}
64-
65-
// verifyUpdateFromResolver waits for the resolver to push an update to the fake
66-
// resolver.ClientConn and verifies that update matches the provided service
67-
// config.
68-
//
69-
// Tests that want to skip verifying the contents of the service config can pass
70-
// an empty string.
71-
//
72-
// Returns the config selector from the state update pushed by the resolver.
73-
// Tests that don't need the config selector can ignore the return value.
74-
func verifyUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State, wantSC string) iresolver.ConfigSelector {
75-
t.Helper()
76-
77-
var state resolver.State
78-
select {
79-
case <-ctx.Done():
80-
t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
81-
case state = <-stateCh:
82-
if err := state.ServiceConfig.Err; err != nil {
83-
t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
84-
}
85-
if wantSC == "" {
86-
break
87-
}
88-
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantSC)
89-
if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
90-
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
91-
}
92-
}
93-
cs := iresolver.GetConfigSelector(state)
94-
if cs == nil {
95-
t.Fatal("Received nil config selector in update from resolver")
96-
}
97-
return cs
98-
}
99-
100-
// buildResolverForTarget builds an xDS resolver for the given target. It
101-
// returns the following:
102-
// - a channel to read updates from the resolver
103-
// - the newly created xDS resolver
104-
func buildResolverForTarget(t *testing.T, target resolver.Target) (chan resolver.State, resolver.Resolver) {
105-
t.Helper()
106-
107-
builder := resolver.Get(xdsresolver.Scheme)
108-
if builder == nil {
109-
t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
110-
}
111-
112-
stateCh := make(chan resolver.State, 1)
113-
updateStateF := func(s resolver.State) error {
114-
stateCh <- s
115-
return nil
116-
}
117-
tcc := &testutils.ResolverClientConn{Logger: t, UpdateStateF: updateStateF}
118-
r, err := builder.Build(target, tcc, resolver.BuildOptions{})
119-
if err != nil {
120-
t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err)
121-
}
122-
t.Cleanup(r.Close)
123-
return stateCh, r
124-
}
125-
12646
func init() {
12747
balancer.Register(cspBalancerBuilder{})
12848
clusterspecifier.Register(testClusterSpecifierPlugin{})
@@ -201,46 +121,24 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
201121
envconfig.XDSRLS = oldRLS
202122
}()
203123

204-
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
205-
if err != nil {
206-
t.Fatalf("Failed to start xDS management server: %v", err)
207-
}
208-
defer mgmtServer.Stop()
209-
210-
// Create a bootstrap configuration specifying the above management server.
211-
nodeID := uuid.New().String()
212-
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
213-
NodeID: nodeID,
214-
ServerURI: mgmtServer.Address,
215-
})
216-
if err != nil {
217-
t.Fatal(err)
218-
}
219-
defer cleanup()
220-
221-
// Configure listener and route configuration resources on the management
222-
// server.
223-
const serviceName = "my-service-client-side-xds"
224-
rdsName := "route-" + serviceName
225-
resources := e2e.UpdateOptions{
226-
NodeID: nodeID,
227-
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
228-
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
229-
RouteConfigName: rdsName,
230-
ListenerName: serviceName,
231-
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
232-
ClusterSpecifierPluginName: "cspA",
233-
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
234-
})},
235-
SkipValidation: true,
236-
}
124+
// Spin up an xDS management server for the test.
237125
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
238126
defer cancel()
239-
if err := mgmtServer.Update(ctx, resources); err != nil {
240-
t.Fatal(err)
241-
}
127+
nodeID := uuid.New().String()
128+
mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
129+
130+
// Configure resources on the management server.
131+
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
132+
routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
133+
RouteConfigName: defaultTestRouteConfigName,
134+
ListenerName: defaultTestServiceName,
135+
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
136+
ClusterSpecifierPluginName: "cspA",
137+
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
138+
})}
139+
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
242140

243-
stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
141+
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
244142

245143
// Wait for an update from the resolver, and verify the service config.
246144
wantSC := `
@@ -276,21 +174,14 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
276174
}
277175

278176
// Change the cluster specifier plugin configuration.
279-
resources = e2e.UpdateOptions{
280-
NodeID: nodeID,
281-
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
282-
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
283-
RouteConfigName: rdsName,
284-
ListenerName: serviceName,
285-
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
286-
ClusterSpecifierPluginName: "cspA",
287-
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
288-
})},
289-
SkipValidation: true,
290-
}
291-
if err := mgmtServer.Update(ctx, resources); err != nil {
292-
t.Fatal(err)
293-
}
177+
routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
178+
RouteConfigName: defaultTestRouteConfigName,
179+
ListenerName: defaultTestServiceName,
180+
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
181+
ClusterSpecifierPluginName: "cspA",
182+
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
183+
})}
184+
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
294185

295186
// Wait for an update from the resolver, and verify the service config.
296187
wantSC = `
@@ -328,46 +219,24 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
328219
envconfig.XDSRLS = oldRLS
329220
}()
330221

331-
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
332-
if err != nil {
333-
t.Fatalf("Failed to start xDS management server: %v", err)
334-
}
335-
defer mgmtServer.Stop()
336-
337-
// Create a bootstrap configuration specifying the above management server.
338-
nodeID := uuid.New().String()
339-
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
340-
NodeID: nodeID,
341-
ServerURI: mgmtServer.Address,
342-
})
343-
if err != nil {
344-
t.Fatal(err)
345-
}
346-
defer cleanup()
347-
348-
// Configure listener and route configuration resources on the management
349-
// server.
350-
const serviceName = "my-service-client-side-xds"
351-
rdsName := "route-" + serviceName
352-
resources := e2e.UpdateOptions{
353-
NodeID: nodeID,
354-
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
355-
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
356-
RouteConfigName: rdsName,
357-
ListenerName: serviceName,
358-
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
359-
ClusterSpecifierPluginName: "cspA",
360-
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
361-
})},
362-
SkipValidation: true,
363-
}
222+
// Spin up an xDS management server for the test.
364223
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
365224
defer cancel()
366-
if err := mgmtServer.Update(ctx, resources); err != nil {
367-
t.Fatal(err)
368-
}
225+
nodeID := uuid.New().String()
226+
mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
227+
228+
// Configure resources on the management server.
229+
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
230+
routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
231+
RouteConfigName: defaultTestRouteConfigName,
232+
ListenerName: defaultTestServiceName,
233+
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
234+
ClusterSpecifierPluginName: "cspA",
235+
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
236+
})}
237+
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
369238

370-
stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
239+
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
371240

372241
// Wait for an update from the resolver, and verify the service config.
373242
wantSC := `
@@ -407,21 +276,14 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
407276
// clusters, they still appear in the service config.
408277

409278
// Change the cluster specifier plugin configuration.
410-
resources = e2e.UpdateOptions{
411-
NodeID: nodeID,
412-
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
413-
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
414-
RouteConfigName: rdsName,
415-
ListenerName: serviceName,
416-
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
417-
ClusterSpecifierPluginName: "cspB",
418-
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
419-
})},
420-
SkipValidation: true,
421-
}
422-
if err := mgmtServer.Update(ctx, resources); err != nil {
423-
t.Fatal(err)
424-
}
279+
routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
280+
RouteConfigName: defaultTestRouteConfigName,
281+
ListenerName: defaultTestServiceName,
282+
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
283+
ClusterSpecifierPluginName: "cspB",
284+
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
285+
})}
286+
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
425287

426288
// Wait for an update from the resolver, and verify the service config.
427289
wantSC = `

0 commit comments

Comments
 (0)