Skip to content

Commit 82a568d

Browse files
authored
cdsbalancer: switch cluster watch to generic xDS client API (#6600)
1 parent 0317200 commit 82a568d

File tree

4 files changed

+564
-639
lines changed

4 files changed

+564
-639
lines changed

xds/internal/balancer/cdsbalancer/cluster_handler_test.go renamed to xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go

Lines changed: 124 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clus
6767
// cluster resource. The test verifies that the load balancing configuration
6868
// pushed to the cluster_resolver LB policy is contains the expected discovery
6969
// mechanism corresponding to the leaf cluster, on both occasions.
70-
func (s) TestClusterHandlerSuccess_LeafNode(t *testing.T) {
70+
func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
7171
tests := []struct {
7272
name string
7373
firstClusterResource *v3clusterpb.Cluster
@@ -657,9 +657,11 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
657657
// Tests the scenario where the aggregate cluster graph has a node that has
658658
// child node of itself. The case for this is A -> A, and since there is no base
659659
// cluster (EDS or Logical DNS), no configuration should be pushed to the child
660-
// policy. Then the test updates A -> B, where B is a leaf EDS cluster. Verifies
661-
// that configuration is pushed to the child policy and that an RPC can be
662-
// successfully made.
660+
// policy. The channel is expected to move to TRANSIENT_FAILURE and RPCs are
661+
// expected to fail with code UNAVAILABLE and an error message specifying that
662+
// the aggregate cluster grpah no leaf clusters. Then the test updates A -> B,
663+
// where B is a leaf EDS cluster. Verifies that configuration is pushed to the
664+
// child policy and that an RPC can be successfully made.
663665
func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
664666
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
665667
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
@@ -687,6 +689,19 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
687689
case <-time.After(defaultTestShortTimeout):
688690
}
689691

692+
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
693+
694+
// Verify that the RPC fails with expected code.
695+
client := testgrpc.NewTestServiceClient(cc)
696+
_, err := client.EmptyCall(ctx, &testpb.Empty{})
697+
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
698+
t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
699+
}
700+
const wantErr = "aggregate cluster graph has no leaf clusters"
701+
if !strings.Contains(err.Error(), wantErr) {
702+
t.Fatalf("EmptyCall() failed with err: %v, want error containing %s", err, wantErr)
703+
}
704+
690705
// Start a test service backend.
691706
server := stubserver.StartTestService(t, nil)
692707
t.Cleanup(server.Stop)
@@ -719,6 +734,111 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
719734
t.Fatal(err)
720735
}
721736

737+
// Verify that a successful RPC can be made.
738+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
739+
t.Fatalf("EmptyCall() failed: %v", err)
740+
}
741+
}
742+
743+
// Tests the scenario where the aggregate cluster graph contains a cycle and
744+
// contains no leaf clusters. The case used here is [A -> B, B -> A]. As there
745+
// are no leaf clusters in this graph, no configuration should be pushed to the
746+
// child policy. The channel is expected to move to TRANSIENT_FAILURE and RPCs
747+
// are expected to fail with code UNAVAILABLE and an error message specifying
748+
// that the aggregate cluster graph has no leaf clusters.
749+
func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
750+
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
751+
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
752+
753+
const (
754+
clusterNameA = clusterName // cluster name in cds LB policy config
755+
clusterNameB = clusterName + "-B"
756+
)
757+
// Configure the management server with an aggregate cluster resource graph
758+
// that contains a cycle and no leaf clusters.
759+
resources := e2e.UpdateOptions{
760+
NodeID: nodeID,
761+
Clusters: []*v3clusterpb.Cluster{
762+
makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
763+
makeAggregateClusterResource(clusterNameB, []string{clusterNameA}),
764+
},
765+
SkipValidation: true,
766+
}
767+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
768+
defer cancel()
769+
if err := mgmtServer.Update(ctx, resources); err != nil {
770+
t.Fatal(err)
771+
}
772+
773+
select {
774+
case cfg := <-lbCfgCh:
775+
t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
776+
case <-time.After(defaultTestShortTimeout):
777+
}
778+
779+
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
780+
781+
// Verify that the RPC fails with expected code.
782+
client := testgrpc.NewTestServiceClient(cc)
783+
_, err := client.EmptyCall(ctx, &testpb.Empty{})
784+
if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
785+
t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
786+
}
787+
const wantErr = "aggregate cluster graph has no leaf clusters"
788+
if !strings.Contains(err.Error(), wantErr) {
789+
t.Fatalf("EmptyCall() failed with err: %v, want %s", err, wantErr)
790+
}
791+
}
792+
793+
// Tests the scenario where the aggregate cluster graph contains a cycle and
794+
// also contains a leaf cluster. The case used here is [A -> B, B -> A, C]. As
795+
// there is a leaf cluster in this graph , configuration should be pushed to the
796+
// child policy and RPCs should get routed to that leaf cluster.
797+
func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
798+
lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
799+
mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
800+
801+
// Start a test service backend.
802+
server := stubserver.StartTestService(t, nil)
803+
t.Cleanup(server.Stop)
804+
805+
const (
806+
clusterNameA = clusterName // cluster name in cds LB policy config
807+
clusterNameB = clusterName + "-B"
808+
clusterNameC = clusterName + "-C"
809+
)
810+
// Configure the management server with an aggregate cluster resource graph
811+
// that contains a cycle, but also contains a leaf cluster.
812+
resources := e2e.UpdateOptions{
813+
NodeID: nodeID,
814+
Clusters: []*v3clusterpb.Cluster{
815+
makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
816+
makeAggregateClusterResource(clusterNameB, []string{clusterNameA, clusterNameC}),
817+
e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone),
818+
},
819+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
820+
SkipValidation: true,
821+
}
822+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
823+
defer cancel()
824+
if err := mgmtServer.Update(ctx, resources); err != nil {
825+
t.Fatal(err)
826+
}
827+
828+
// Verify the configuration pushed to the child policy.
829+
wantChildCfg := &clusterresolver.LBConfig{
830+
DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
831+
Cluster: clusterNameC,
832+
Type: clusterresolver.DiscoveryMechanismTypeEDS,
833+
EDSServiceName: serviceName,
834+
OutlierDetection: json.RawMessage(`{}`),
835+
}},
836+
XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
837+
}
838+
if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
839+
t.Fatal(err)
840+
}
841+
722842
// Verify that a successful RPC can be made.
723843
client := testgrpc.NewTestServiceClient(cc)
724844
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {

0 commit comments

Comments
 (0)