diff --git a/xds/internal/httpfilter/fault/fault_test.go b/xds/internal/httpfilter/fault/fault_test.go index 20dd0a2c95cd..48fac43c46d7 100644 --- a/xds/internal/httpfilter/fault/fault_test.go +++ b/xds/internal/httpfilter/fault/fault_test.go @@ -53,8 +53,9 @@ import ( testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" - _ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers. - _ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver. + _ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers. + _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter. + _ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver. ) const defaultTestTimeout = 10 * time.Second diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index d1dd79354ae0..913f6b3ae7c9 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -39,7 +39,6 @@ import ( "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/httpfilter" - "google.golang.org/grpc/xds/internal/httpfilter/router" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -121,6 +120,7 @@ type routeCluster struct { type route struct { m *xdsresource.CompositeMatcher // converted from route matchers + actionType xdsresource.RouteActionType // holds route action type clusters wrr.WRR // holds *routeCluster entries maxStreamDuration time.Duration // map from filter name to its config @@ -142,6 +142,7 @@ type configSelector struct { } var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") +var errUnsupportedClientRouteAction = status.Errorf(codes.Unavailable, "matched route does not have a supported route action type") func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) { if cs == nil { @@ -155,10 +156,15 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP break } } + if rt == nil || rt.clusters == nil { return nil, errNoMatchedRouteFound } + if rt.actionType != xdsresource.RouteActionRoute { + return nil, errUnsupportedClientRouteAction + } + cluster, ok := rt.clusters.Next().(*routeCluster) if !ok { return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) @@ -280,11 +286,6 @@ func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (ires } interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig)) for _, filter := range cs.httpFilterConfig { - if router.IsRouterFilter(filter.Filter) { - // Ignore any filters after the router filter. The router itself - // is currently a nop. - return &interceptorList{interceptors: interceptors}, nil - } override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority if override == nil { override = rt.httpFilterConfigOverride[filter.Name] // route is second priority @@ -305,7 +306,7 @@ func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (ires interceptors = append(interceptors, i) } } - return nil, fmt.Errorf("error in xds config: no router filter present") + return &interceptorList{interceptors: interceptors}, nil } // stop decrements refs of all clusters referenced by this config selector. @@ -381,6 +382,7 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro if err != nil { return nil, err } + cs.routes[i].actionType = rt.ActionType if rt.MaxStreamDuration == nil { cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration } else { diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 43b835fada93..4e0782646f4c 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -79,9 +79,13 @@ const ( defaultTestShortTimeout = 100 * time.Microsecond ) -var target = resolver.Target{URL: *testutils.MustParseURL("xds:///" + targetStr)} +var ( + target = resolver.Target{URL: *testutils.MustParseURL("xds:///" + targetStr)} -var routerFilter = xdsresource.HTTPFilter{Name: "rtr", Filter: httpfilter.Get(router.TypeURL)} + routerHTTPFilter = httpfilter.Get(router.TypeURL) + routerConfig, _ = routerHTTPFilter.ParseFilterConfig(testutils.MarshalAny(&v3routerpb.Router{})) + routerFilter = xdsresource.HTTPFilter{Name: "rtr", Filter: routerHTTPFilter, Config: routerConfig} +) type s struct { grpctest.Tester @@ -1802,42 +1806,65 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { testCases := []struct { name string ldsFilters []xdsresource.HTTPFilter - vhOverrides map[string]httpfilter.FilterConfig - rtOverrides map[string]httpfilter.FilterConfig - clOverrides map[string]httpfilter.FilterConfig + rtCfgUpdate xdsresource.RouteConfigUpdate rpcRes map[string][][]string selectErr string newStreamErr string }{ + { - name: "no router filter", + name: "route type RouteActionUnsupported invalid for client", ldsFilters: []xdsresource.HTTPFilter{ {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, }, + rtCfgUpdate: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("1"), + WeightedClusters: map[string]xdsresource.WeightedCluster{ + "A": {Weight: 1}, + "B": {Weight: 1}, + }, + ActionType: xdsresource.RouteActionUnsupported, + }}, + }, + }, + }, rpcRes: map[string][][]string{ "1": { {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, }, }, - selectErr: "no router filter present", + selectErr: errUnsupportedClientRouteAction.Error(), }, { - name: "ignored after router filter", + name: "route type RouteActionNonForwardingAction invalid for client", ldsFilters: []xdsresource.HTTPFilter{ {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, - routerFilter, - {Name: "foo2", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo2"}}, + }, + rtCfgUpdate: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("1"), + WeightedClusters: map[string]xdsresource.WeightedCluster{ + "A": {Weight: 1}, + "B": {Weight: 1}, + }, + ActionType: xdsresource.RouteActionNonForwardingAction, + }}, + }, + }, }, rpcRes: map[string][][]string{ "1": { - {"build:foo1", "newstream:foo1", "done:foo1"}, - }, - "2": { - {"build:foo1", "newstream:foo1", "done:foo1"}, - {"build:foo1", "newstream:foo1", "done:foo1"}, - {"build:foo1", "newstream:foo1", "done:foo1"}, + {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, }, }, + selectErr: errUnsupportedClientRouteAction.Error(), }, { name: "NewStream error; ensure earlier interceptor Done is still called", @@ -1846,13 +1873,25 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { {Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1", newStreamErr: errors.New("bar newstream err")}}, routerFilter, }, + rtCfgUpdate: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("1"), + WeightedClusters: map[string]xdsresource.WeightedCluster{ + "A": {Weight: 1}, + "B": {Weight: 1}, + }, + ActionType: xdsresource.RouteActionRoute, + }}, + }, + }, + }, rpcRes: map[string][][]string{ "1": { {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* */, "done:foo1"}, }, - "2": { - {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* */, "done:foo1"}, - }, }, newStreamErr: "bar newstream err", }, @@ -1863,9 +1902,30 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { {Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1"}}, routerFilter, }, - vhOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo2"}, "bar": filterCfg{s: "bar2"}}, - rtOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo3"}, "bar": filterCfg{s: "bar3"}}, - clOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo4"}, "bar": filterCfg{s: "bar4"}}, + rtCfgUpdate: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{ + Prefix: newStringP("1"), + WeightedClusters: map[string]xdsresource.WeightedCluster{ + "A": {Weight: 1}, + "B": {Weight: 1}, + }, + ActionType: xdsresource.RouteActionRoute, + }, { + Prefix: newStringP("2"), + WeightedClusters: map[string]xdsresource.WeightedCluster{ + "A": {Weight: 1}, + "B": {Weight: 1, HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo4"}, "bar": filterCfg{s: "bar4"}}}, + }, + HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo3"}, "bar": filterCfg{s: "bar3"}}, + ActionType: xdsresource.RouteActionRoute, + }}, + HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo2"}, "bar": filterCfg{s: "bar2"}}, + }, + }, + }, rpcRes: map[string][][]string{ "1": { {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, @@ -1904,26 +1964,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { // Invoke the watchAPI callback with a good service update and wait for the // UpdateState method to be called on the ClientConn. - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("1"), WeightedClusters: map[string]xdsresource.WeightedCluster{ - "A": {Weight: 1}, - "B": {Weight: 1}, - }, - }, { - Prefix: newStringP("2"), WeightedClusters: map[string]xdsresource.WeightedCluster{ - "A": {Weight: 1}, - "B": {Weight: 1, HTTPFilterConfigOverride: tc.clOverrides}, - }, - HTTPFilterConfigOverride: tc.rtOverrides, - }}, - HTTPFilterConfigOverride: tc.vhOverrides, - }, - }, - }, nil) + xdsC.InvokeWatchRouteConfigCallback("", tc.rtCfgUpdate, nil) gotState, err := tcc.stateCh.Receive(ctx) if err != nil {