Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/xds/xds_client_federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) {

target = fmt.Sprintf("xds://unknown-authority/%s", serviceName)
t.Logf("Dialing target %q with unknown authority which is expected to fail", target)
const wantErr = `authority "unknown-authority" is not found in the bootstrap file`
wantErr := fmt.Sprintf("authority \"unknown-authority\" specified in dial target %q is not found in the bootstrap file", target)
_, err = grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Fatalf("grpc.Dial(%q) returned %v, want: %s", target, err, wantErr)
Expand Down
111 changes: 6 additions & 105 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/httpfilter"
rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

Expand Down Expand Up @@ -73,16 +72,6 @@ type xdsClusterManagerConfig struct {
Children map[string]xdsChildConfig `json:"children"`
}

// pruneActiveClusters deletes entries in r.activeClusters with zero
// references.
func (r *xdsResolver) pruneActiveClusters() {
for cluster, ci := range r.activeClusters {
if atomic.LoadInt32(&ci.refCount) == 0 {
delete(r.activeClusters, cluster)
}
}
}

// serviceConfigJSON produces a service config in JSON format representing all
// the clusters referenced in activeClusters. This includes clusters with zero
// references, so they must be pruned first.
Expand Down Expand Up @@ -197,10 +186,9 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
if v := atomic.AddInt32(ref, -1); v == 0 {
// This entry will be removed from activeClusters when
// producing the service config for the empty update.
select {
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
default:
}
cs.r.serializer.Schedule(func(context.Context) {
cs.r.onClusterRefDownToZero()
})
}
},
Interceptor: interceptor,
Expand Down Expand Up @@ -342,97 +330,10 @@ func (cs *configSelector) stop() {
// selector; we need another update to delete clusters from the config (if
// we don't have another update pending already).
if needUpdate {
select {
case cs.r.updateCh <- suWithError{emptyUpdate: true}:
default:
}
}
}

// newConfigSelector creates the config selector for su; may add entries to
// r.activeClusters for previously-unseen clusters.
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
virtualHost: virtualHost{
httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
retryConfig: su.virtualHost.RetryConfig,
},
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: su.ldsConfig.httpFilterConfig,
cs.r.serializer.Schedule(func(context.Context) {
cs.r.onClusterRefDownToZero()
})
}

for i, rt := range su.virtualHost.Routes {
clusters := rinternal.NewWRR.(func() wrr.WRR)()
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{
name: clusterName,
}, 1)
cs.initializeCluster(clusterName, xdsChildConfig{
ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]),
})
} else {
for cluster, wc := range rt.WeightedClusters {
clusterName := clusterPrefix + cluster
clusters.Add(&routeCluster{
name: clusterName,
httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
}, int64(wc.Weight))
cs.initializeCluster(clusterName, xdsChildConfig{
ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}),
})
}
}
cs.routes[i].clusters = clusters

var err error
cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].actionType = rt.ActionType
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
} else {
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
}

cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
cs.routes[i].retryConfig = rt.RetryConfig
cs.routes[i].hashPolicies = rt.HashPolicies
}

// Account for this config selector's clusters. Do this after no further
// errors may occur. Note: cs.clusters are pointers to entries in
// activeClusters.
for _, ci := range cs.clusters {
atomic.AddInt32(&ci.refCount, 1)
}

return cs, nil
}

// initializeCluster initializes entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Any created entries will have a ref count set
// to zero as their ref count will be incremented by incRefs.
func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) {
ci := cs.r.activeClusters[clusterName]
if ci == nil {
ci = &clusterInfo{refCount: 0}
cs.r.activeClusters[clusterName] = ci
}
cs.clusters[clusterName] = ci
cs.clusters[clusterName].cfg = cfg
}

type clusterInfo struct {
// number of references to this cluster; accessed atomically
refCount int32
// cfg is the child configuration for this cluster, containing either the
// csp config or the cds cluster config.
cfg xdsChildConfig
}

type interceptorList struct {
Expand Down
Loading