Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions internal/dag/accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ func (d *DAG) GetClusters() []*Cluster {
for _, route := range vhost.Routes {
res = append(res, route.Clusters...)

if route.MirrorPolicy != nil && route.MirrorPolicy.Cluster != nil {
res = append(res, route.MirrorPolicy.Cluster)
for _, mp := range route.MirrorPolicies {
if mp.Cluster != nil {
res = append(res, mp.Cluster)
}
}
}
}
Expand All @@ -212,8 +214,10 @@ func (d *DAG) GetClusters() []*Cluster {
for _, route := range vhost.Routes {
res = append(res, route.Clusters...)

if route.MirrorPolicy != nil && route.MirrorPolicy.Cluster != nil {
res = append(res, route.MirrorPolicy.Cluster)
for _, mp := range route.MirrorPolicies {
if mp.Cluster != nil {
res = append(res, mp.Cluster)
}
}
}

Expand Down
24 changes: 13 additions & 11 deletions internal/dag/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3840,7 +3840,7 @@ func TestDAGInsertGatewayAPI(t *testing.T) {
&Listener{
Name: "http-80",
VirtualHosts: virtualhosts(virtualhost("test.projectcontour.io",
withMirror(prefixrouteHTTPRoute("/", service(kuardService)), service(kuardService2), 100))),
withMirror(prefixrouteHTTPRoute("/", service(kuardService)), []*Service{service(kuardService2)}, 100))),
},
),
},
Expand Down Expand Up @@ -3882,8 +3882,8 @@ func TestDAGInsertGatewayAPI(t *testing.T) {
&Listener{
Name: "http-80",
VirtualHosts: virtualhosts(virtualhost("test.projectcontour.io",
withMirror(prefixrouteHTTPRoute("/", service(kuardService)), service(kuardService2), 100),
withMirror(segmentPrefixHTTPRoute("/another-match", service(kuardService)), service(kuardService2), 100),
withMirror(prefixrouteHTTPRoute("/", service(kuardService)), []*Service{service(kuardService2)}, 100),
withMirror(segmentPrefixHTTPRoute("/another-match", service(kuardService)), []*Service{service(kuardService2)}, 100),
)),
},
),
Expand Down Expand Up @@ -5702,7 +5702,7 @@ func TestDAGInsertGatewayAPI(t *testing.T) {
&Listener{
Name: "http-80",
VirtualHosts: virtualhosts(virtualhost("test.projectcontour.io",
withMirror(exactrouteGRPCRoute("/io.projectcontour/Login", grpcService(kuardService, "h2c")), grpcService(kuardService2, "h2c"), 100))),
withMirror(exactrouteGRPCRoute("/io.projectcontour/Login", grpcService(kuardService, "h2c")), []*Service{grpcService(kuardService2, "h2c")}, 100))),
},
),
},
Expand Down Expand Up @@ -11169,7 +11169,7 @@ func TestDAGInsert(t *testing.T) {
Port: 8080,
VirtualHosts: virtualhosts(
virtualhost("example.com",
withMirror(prefixroute("/", service(s1)), service(s2), 100),
withMirror(prefixroute("/", service(s1)), []*Service{service(s2)}, 100),
),
),
},
Expand Down Expand Up @@ -16223,12 +16223,14 @@ func prefixSegment(prefix string) MatchCondition {
func exact(path string) MatchCondition { return &ExactMatchCondition{Path: path} }
func regex(regex string) MatchCondition { return &RegexMatchCondition{Regex: regex} }

func withMirror(r *Route, mirror *Service, weight int64) *Route {
r.MirrorPolicy = &MirrorPolicy{
Cluster: &Cluster{
Upstream: mirror,
},
Weight: weight,
func withMirror(r *Route, mirrors []*Service, weight int64) *Route {
for _, mirror := range mirrors {
r.MirrorPolicies = append(r.MirrorPolicies, &MirrorPolicy{
Cluster: &Cluster{
Upstream: mirror,
},
Weight: weight,
})
}
return r
}
4 changes: 2 additions & 2 deletions internal/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ type Route struct {
// Indicates that during forwarding, the matched prefix (or path) should be swapped with this value
PathRewritePolicy *PathRewritePolicy

// Mirror Policy defines the mirroring policy for this Route.
MirrorPolicy *MirrorPolicy
// MirrorPolicies is a list defining the mirroring policies for this Route.
MirrorPolicies []*MirrorPolicy

// RequestHeadersPolicy defines how headers are managed during forwarding
RequestHeadersPolicy *HeadersPolicy
Expand Down
25 changes: 13 additions & 12 deletions internal/dag/gatewayapi_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ func (p *GatewayAPIProcessor) computeHTTPRouteForListener(route *gatewayapi_v1be
requestHeaderPolicy *HeadersPolicy
responseHeaderPolicy *HeadersPolicy
redirect *Redirect
mirrorPolicy *MirrorPolicy
mirrorPolicies []*MirrorPolicy
pathRewritePolicy *PathRewritePolicy
urlRewriteHostname string
)
Expand Down Expand Up @@ -1222,7 +1222,7 @@ func (p *GatewayAPIProcessor) computeHTTPRouteForListener(route *gatewayapi_v1be
PathRewritePolicy: pathRewritePolicy,
}
case gatewayapi_v1beta1.HTTPRouteFilterRequestMirror:
if filter.RequestMirror == nil || mirrorPolicy != nil {
if filter.RequestMirror == nil {
continue
}

Expand All @@ -1231,12 +1231,12 @@ func (p *GatewayAPIProcessor) computeHTTPRouteForListener(route *gatewayapi_v1be
routeAccessor.AddCondition(gatewayapi_v1beta1.RouteConditionType(cond.Type), cond.Status, gatewayapi_v1beta1.RouteConditionReason(cond.Reason), cond.Message)
continue
}
mirrorPolicy = &MirrorPolicy{
mirrorPolicies = append(mirrorPolicies, &MirrorPolicy{
Cluster: &Cluster{
Upstream: mirrorService,
},
Weight: 100,
}
})
case gatewayapi_v1beta1.HTTPRouteFilterURLRewrite:
if filter.URLRewrite == nil || pathRewritePolicy != nil {
continue
Expand Down Expand Up @@ -1337,7 +1337,7 @@ func (p *GatewayAPIProcessor) computeHTTPRouteForListener(route *gatewayapi_v1be
matchconditions,
requestHeaderPolicy,
responseHeaderPolicy,
mirrorPolicy,
mirrorPolicies,
clusters,
totalWeight,
priority,
Expand Down Expand Up @@ -1403,7 +1403,7 @@ func (p *GatewayAPIProcessor) computeGRPCRouteForListener(route *gatewayapi_v1al
// Process rule-level filters.
var (
requestHeaderPolicy, responseHeaderPolicy *HeadersPolicy
mirrorPolicy *MirrorPolicy
mirrorPolicies []*MirrorPolicy
)

// Per Gateway API docs: "Specifying a core filter multiple times
Expand Down Expand Up @@ -1433,7 +1433,8 @@ func (p *GatewayAPIProcessor) computeGRPCRouteForListener(route *gatewayapi_v1al
routeAccessor.AddCondition(gatewayapi_v1beta1.RouteConditionResolvedRefs, metav1.ConditionFalse, status.ReasonDegraded, fmt.Sprintf("%s on response headers", err))
}
case gatewayapi_v1alpha2.GRPCRouteFilterRequestMirror:
if filter.RequestMirror == nil || mirrorPolicy != nil {
// If more than one, we only take the first RequestMirror filter.
if filter.RequestMirror == nil || len(mirrorPolicies) > 0 {
continue
}

Expand All @@ -1444,12 +1445,12 @@ func (p *GatewayAPIProcessor) computeGRPCRouteForListener(route *gatewayapi_v1al
}
// If protocol is not set on the service, need to set a default one based on listener's protocol type.
setDefaultServiceProtocol(mirrorService, listener.listener.Protocol)
mirrorPolicy = &MirrorPolicy{
mirrorPolicies = append(mirrorPolicies, &MirrorPolicy{
Cluster: &Cluster{
Upstream: mirrorService,
},
Weight: 100,
}
})
default:
routeAccessor.AddCondition(
gatewayapi_v1beta1.RouteConditionAccepted,
Expand Down Expand Up @@ -1481,7 +1482,7 @@ func (p *GatewayAPIProcessor) computeGRPCRouteForListener(route *gatewayapi_v1al
matchconditions,
requestHeaderPolicy,
responseHeaderPolicy,
mirrorPolicy,
mirrorPolicies,
clusters,
totalWeight,
priority,
Expand Down Expand Up @@ -2043,7 +2044,7 @@ func (p *GatewayAPIProcessor) clusterRoutes(
matchConditions []*matchConditions,
requestHeaderPolicy *HeadersPolicy,
responseHeaderPolicy *HeadersPolicy,
mirrorPolicy *MirrorPolicy,
mirrorPolicies []*MirrorPolicy,
clusters []*Cluster,
totalWeight uint32,
priority uint8,
Expand Down Expand Up @@ -2071,7 +2072,7 @@ func (p *GatewayAPIProcessor) clusterRoutes(
QueryParamMatchConditions: mc.queryParams,
RequestHeadersPolicy: requestHeaderPolicy,
ResponseHeadersPolicy: responseHeaderPolicy,
MirrorPolicy: mirrorPolicy,
MirrorPolicies: mirrorPolicies,
Priority: priority,
PathRewritePolicy: pathRewritePolicy,
}
Expand Down
10 changes: 5 additions & 5 deletions internal/dag/httpproxy_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ func (p *HTTPProxyProcessor) computeRoutes(
MaxRequestsPerConnection: p.MaxRequestsPerConnection,
PerConnectionBufferLimitBytes: p.PerConnectionBufferLimitBytes,
}
if service.Mirror && r.MirrorPolicy != nil {
if service.Mirror && len(r.MirrorPolicies) > 0 {
validCond.AddError(contour_api_v1.ConditionTypeServiceError, "OnlyOneMirror",
"only one service per route may be nominated as mirror")
return nil
Expand All @@ -1024,15 +1024,15 @@ func (p *HTTPProxyProcessor) computeRoutes(
// EDGE CASE: This means that explicitly setting Weight to 0 will also result in 100%
// mirroring. The Mirror field must be set to false or removed to disable the mirror.
if service.Weight == 0 {
r.MirrorPolicy = &MirrorPolicy{
r.MirrorPolicies = []*MirrorPolicy{{
Cluster: c,
Weight: 100,
}
}}
} else {
r.MirrorPolicy = &MirrorPolicy{
r.MirrorPolicies = []*MirrorPolicy{{
Cluster: c,
Weight: service.Weight,
}
}}
}
} else {
r.Clusters = append(r.Clusters, c)
Expand Down
12 changes: 8 additions & 4 deletions internal/debug/dot.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ func collectDag(b DagBuilder) (nodeCollection, edgeCollection) {
nodes[route] = true

clusters := route.Clusters
if route.MirrorPolicy != nil && route.MirrorPolicy.Cluster != nil {
clusters = append(clusters, route.MirrorPolicy.Cluster)
for _, mp := range route.MirrorPolicies {
if mp.Cluster != nil {
clusters = append(clusters, mp.Cluster)
}
}
for _, cluster := range clusters {
edges[pair{route, cluster}] = true
Expand All @@ -93,8 +95,10 @@ func collectDag(b DagBuilder) (nodeCollection, edgeCollection) {
nodes[route] = true

clusters := route.Clusters
if route.MirrorPolicy != nil && route.MirrorPolicy.Cluster != nil {
clusters = append(clusters, route.MirrorPolicy.Cluster)
for _, mp := range route.MirrorPolicies {
if mp.Cluster != nil {
clusters = append(clusters, mp.Cluster)
}
}
for _, cluster := range clusters {
edges[pair{route, cluster}] = true
Expand Down
22 changes: 13 additions & 9 deletions internal/envoy/v3/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,19 +491,23 @@ func hashPolicy(requestHashPolicies []dag.RequestHashPolicy) []*envoy_route_v3.R
}

func mirrorPolicy(r *dag.Route) []*envoy_route_v3.RouteAction_RequestMirrorPolicy {
if r.MirrorPolicy == nil {
if len(r.MirrorPolicies) == 0 {
return nil
}

return []*envoy_route_v3.RouteAction_RequestMirrorPolicy{{
Cluster: envoy.Clustername(r.MirrorPolicy.Cluster),
RuntimeFraction: &envoy_core_v3.RuntimeFractionalPercent{
DefaultValue: &envoy_type_v3.FractionalPercent{
Numerator: uint32(r.MirrorPolicy.Weight),
Denominator: envoy_type_v3.FractionalPercent_HUNDRED,
mirrorPolicies := []*envoy_route_v3.RouteAction_RequestMirrorPolicy{}
for _, mp := range r.MirrorPolicies {
mirrorPolicies = append(mirrorPolicies, &envoy_route_v3.RouteAction_RequestMirrorPolicy{
Cluster: envoy.Clustername(mp.Cluster),
RuntimeFraction: &envoy_core_v3.RuntimeFractionalPercent{
DefaultValue: &envoy_type_v3.FractionalPercent{
Numerator: uint32(mp.Weight),
Denominator: envoy_type_v3.FractionalPercent_HUNDRED,
},
},
},
}}
})
}
return mirrorPolicies
}

func retryPolicy(r *dag.Route) *envoy_route_v3.RetryPolicy {
Expand Down
24 changes: 13 additions & 11 deletions internal/envoy/v3/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ func TestRouteRoute(t *testing.T) {
},
},

//TODO(liorlieberman) add multiple mirror test
"mirror": {
route: &dag.Route{
Clusters: []*dag.Cluster{{
Expand All @@ -690,20 +691,21 @@ func TestRouteRoute(t *testing.T) {
},
Weight: 90,
}},
MirrorPolicy: &dag.MirrorPolicy{
Cluster: &dag.Cluster{
Upstream: &dag.Service{
Weighted: dag.WeightedService{
Weight: 1,
ServiceName: s1.Name,
ServiceNamespace: s1.Namespace,
ServicePort: s1.Spec.Ports[0],
MirrorPolicies: []*dag.MirrorPolicy{
{
Cluster: &dag.Cluster{
Upstream: &dag.Service{
Weighted: dag.WeightedService{
Weight: 1,
ServiceName: s1.Name,
ServiceNamespace: s1.Namespace,
ServicePort: s1.Spec.Ports[0],
},
},
},
Weight: 100,
},
Weight: 100,
},
},
}},
want: &envoy_route_v3.Route_Route{
Route: &envoy_route_v3.RouteAction{
ClusterSpecifier: &envoy_route_v3.RouteAction_Cluster{
Expand Down