Skip to content

Commit a21e374

Browse files
xds/cdsbalancer: correctly remove the unwanted cds watchers (#8428)
1 parent 64a6b62 commit a21e374

File tree

3 files changed

+58
-7
lines changed

3 files changed

+58
-7
lines changed

xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,3 +866,53 @@ func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
866866
t.Fatalf("EmptyCall() failed: %v", err)
867867
}
868868
}
869+
870+
// Tests the scenario where the cluster tree changes, and verifies that the
871+
// watchers for the cds balancer are updated accordingly. That is the cluster
872+
// removed from the tree no longer has a watcher and the new cluster added has a
873+
// new watcher.
874+
func (s) TestWatchers(t *testing.T) {
875+
mgmtServer, nodeID, _, _, _, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
876+
877+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
878+
defer cancel()
879+
880+
const (
881+
clusterA = clusterName
882+
clusterB = clusterName + "-B"
883+
clusterC = clusterName + "-C"
884+
clusterD = clusterName + "-D"
885+
)
886+
887+
// Initial CDS resources: A -> B, C
888+
initialResources := e2e.UpdateOptions{
889+
NodeID: nodeID,
890+
Clusters: []*v3clusterpb.Cluster{
891+
makeAggregateClusterResource(clusterA, []string{clusterB, clusterC}),
892+
},
893+
SkipValidation: true,
894+
}
895+
if err := mgmtServer.Update(ctx, initialResources); err != nil {
896+
t.Fatalf("Update failed: %v", err)
897+
}
898+
wantNames := []string{clusterA, clusterB, clusterC}
899+
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
900+
t.Fatal(err)
901+
}
902+
903+
// Update the CDS resources to remove cluster C and add cluster D.
904+
updatedResources := e2e.UpdateOptions{
905+
NodeID: nodeID,
906+
Clusters: []*v3clusterpb.Cluster{
907+
makeAggregateClusterResource(clusterA, []string{clusterB, clusterD}),
908+
},
909+
SkipValidation: true,
910+
}
911+
if err := mgmtServer.Update(ctx, updatedResources); err != nil {
912+
t.Fatalf("Update failed: %v", err)
913+
}
914+
wantNames = []string{clusterA, clusterB, clusterD}
915+
if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
916+
t.Fatal(err)
917+
}
918+
}

xds/internal/balancer/cdsbalancer/cdsbalancer.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -530,13 +530,11 @@ func (b *cdsBalancer) onClusterUpdate(name string, update xdsresource.ClusterUpd
530530
}
531531
// We no longer need the clusters that we did not see in this iteration of
532532
// generateDMsForCluster().
533-
for cluster := range clustersSeen {
534-
state, ok := b.watchers[cluster]
535-
if ok {
536-
continue
533+
for cluster, state := range b.watchers {
534+
if !clustersSeen[cluster] {
535+
state.cancelWatch()
536+
delete(b.watchers, cluster)
537537
}
538-
state.cancelWatch()
539-
delete(b.watchers, cluster)
540538
}
541539
}
542540

xds/internal/balancer/cdsbalancer/cdsbalancer_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
"github.com/google/go-cmp/cmp"
29+
"github.com/google/go-cmp/cmp/cmpopts"
2930
"github.com/google/uuid"
3031
"google.golang.org/grpc"
3132
"google.golang.org/grpc/balancer"
@@ -85,7 +86,9 @@ func waitForResourceNames(ctx context.Context, resourceNamesCh chan []string, wa
8586
select {
8687
case <-ctx.Done():
8788
case gotNames := <-resourceNamesCh:
88-
if cmp.Equal(gotNames, wantNames) {
89+
// Sort both slices before comparing them, as the order of clusters
90+
// does not matter.
91+
if cmp.Equal(gotNames, wantNames, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
8992
return nil
9093
}
9194
}

0 commit comments

Comments
 (0)