Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 46 additions & 184 deletions xds/internal/resolver/cluster_specifier_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,18 @@ import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/clusterspecifier"
xdsresolver "google.golang.org/grpc/xds/internal/resolver"
protov2 "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -49,80 +43,6 @@ import (
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
)

const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 100 * time.Microsecond
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// verifyUpdateFromResolver waits for the resolver to push an update to the fake
// resolver.ClientConn and verifies that update matches the provided service
// config.
//
// Tests that want to skip verifying the contents of the service config can pass
// an empty string.
//
// Returns the config selector from the state update pushed by the resolver.
// Tests that don't need the config selector can ignore the return value.
func verifyUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State, wantSC string) iresolver.ConfigSelector {
t.Helper()

var state resolver.State
select {
case <-ctx.Done():
t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
case state = <-stateCh:
if err := state.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
}
if wantSC == "" {
break
}
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantSC)
if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
}
}
cs := iresolver.GetConfigSelector(state)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
return cs
}

// buildResolverForTarget builds an xDS resolver for the given target. It
// returns the following:
// - a channel to read updates from the resolver
// - the newly created xDS resolver
func buildResolverForTarget(t *testing.T, target resolver.Target) (chan resolver.State, resolver.Resolver) {
t.Helper()

builder := resolver.Get(xdsresolver.Scheme)
if builder == nil {
t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
}

stateCh := make(chan resolver.State, 1)
updateStateF := func(s resolver.State) error {
stateCh <- s
return nil
}
tcc := &testutils.ResolverClientConn{Logger: t, UpdateStateF: updateStateF}
r, err := builder.Build(target, tcc, resolver.BuildOptions{})
if err != nil {
t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err)
}
t.Cleanup(r.Close)
return stateCh, r
}

func init() {
balancer.Register(cspBalancerBuilder{})
clusterspecifier.Register(testClusterSpecifierPlugin{})
Expand Down Expand Up @@ -201,46 +121,24 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
envconfig.XDSRLS = oldRLS
}()

mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatalf("Failed to start xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

// Configure listener and route configuration resources on the management
// server.
const serviceName = "my-service-client-side-xds"
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})},
SkipValidation: true,
}
// Spin up an xDS management server for the test.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
nodeID := uuid.New().String()
mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)

// Configure resources on the management server.
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})

// Wait for an update from the resolver, and verify the service config.
wantSC := `
Expand Down Expand Up @@ -276,21 +174,14 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
}

// Change the cluster specifier plugin configuration.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
})},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

// Wait for an update from the resolver, and verify the service config.
wantSC = `
Expand Down Expand Up @@ -328,46 +219,24 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
envconfig.XDSRLS = oldRLS
}()

mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatalf("Failed to start xDS management server: %v", err)
}
defer mgmtServer.Stop()

// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()

// Configure listener and route configuration resources on the management
// server.
const serviceName = "my-service-client-side-xds"
rdsName := "route-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
})},
SkipValidation: true,
}
// Spin up an xDS management server for the test.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
nodeID := uuid.New().String()
mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)

// Configure resources on the management server.
listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})

// Wait for an update from the resolver, and verify the service config.
wantSC := `
Expand Down Expand Up @@ -407,21 +276,14 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
// clusters, they still appear in the service config.

// Change the cluster specifier plugin configuration.
resources = e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: rdsName,
ListenerName: serviceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspB",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
})},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspB",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
})}
configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

// Wait for an update from the resolver, and verify the service config.
wantSC = `
Expand Down
Loading