Skip to content

Commit d1730c5

Browse files
committed
add initial code to support multiple mirror filters
1 parent d60959e commit d1730c5

File tree

8 files changed

+75
-58
lines changed

8 files changed

+75
-58
lines changed

internal/dag/accessors.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,10 @@ func (d *DAG) GetClusters() []*Cluster {
202202
for _, route := range vhost.Routes {
203203
res = append(res, route.Clusters...)
204204

205-
if route.MirrorPolicy != nil && route.MirrorPolicy.Cluster != nil {
206-
res = append(res, route.MirrorPolicy.Cluster)
205+
for _, mp := range route.MirrorPolicies {
206+
if mp.Cluster != nil {
207+
res = append(res, mp.Cluster)
208+
}
207209
}
208210
}
209211
}
@@ -212,8 +214,10 @@ func (d *DAG) GetClusters() []*Cluster {
212214
for _, route := range vhost.Routes {
213215
res = append(res, route.Clusters...)
214216

215-
if route.MirrorPolicy != nil && route.MirrorPolicy.Cluster != nil {
216-
res = append(res, route.MirrorPolicy.Cluster)
217+
for _, mp := range route.MirrorPolicies {
218+
if mp.Cluster != nil {
219+
res = append(res, mp.Cluster)
220+
}
217221
}
218222
}
219223

internal/dag/builder_test.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3840,7 +3840,7 @@ func TestDAGInsertGatewayAPI(t *testing.T) {
38403840
&Listener{
38413841
Name: "http-80",
38423842
VirtualHosts: virtualhosts(virtualhost("test.projectcontour.io",
3843-
withMirror(prefixrouteHTTPRoute("/", service(kuardService)), service(kuardService2), 100))),
3843+
withMirror(prefixrouteHTTPRoute("/", service(kuardService)), []*Service{service(kuardService2)}, 100))),
38443844
},
38453845
),
38463846
},
@@ -3882,8 +3882,8 @@ func TestDAGInsertGatewayAPI(t *testing.T) {
38823882
&Listener{
38833883
Name: "http-80",
38843884
VirtualHosts: virtualhosts(virtualhost("test.projectcontour.io",
3885-
withMirror(prefixrouteHTTPRoute("/", service(kuardService)), service(kuardService2), 100),
3886-
withMirror(segmentPrefixHTTPRoute("/another-match", service(kuardService)), service(kuardService2), 100),
3885+
withMirror(prefixrouteHTTPRoute("/", service(kuardService)), []*Service{service(kuardService2)}, 100),
3886+
withMirror(segmentPrefixHTTPRoute("/another-match", service(kuardService)), []*Service{service(kuardService2)}, 100),
38873887
)),
38883888
},
38893889
),
@@ -5702,7 +5702,7 @@ func TestDAGInsertGatewayAPI(t *testing.T) {
57025702
&Listener{
57035703
Name: "http-80",
57045704
VirtualHosts: virtualhosts(virtualhost("test.projectcontour.io",
5705-
withMirror(exactrouteGRPCRoute("/io.projectcontour/Login", grpcService(kuardService, "h2c")), grpcService(kuardService2, "h2c"), 100))),
5705+
withMirror(exactrouteGRPCRoute("/io.projectcontour/Login", grpcService(kuardService, "h2c")), []*Service{grpcService(kuardService2, "h2c")}, 100))),
57065706
},
57075707
),
57085708
},
@@ -11169,7 +11169,7 @@ func TestDAGInsert(t *testing.T) {
1116911169
Port: 8080,
1117011170
VirtualHosts: virtualhosts(
1117111171
virtualhost("example.com",
11172-
withMirror(prefixroute("/", service(s1)), service(s2), 100),
11172+
withMirror(prefixroute("/", service(s1)), []*Service{service(s2)}, 100),
1117311173
),
1117411174
),
1117511175
},
@@ -16223,12 +16223,14 @@ func prefixSegment(prefix string) MatchCondition {
1622316223
func exact(path string) MatchCondition { return &ExactMatchCondition{Path: path} }
1622416224
func regex(regex string) MatchCondition { return &RegexMatchCondition{Regex: regex} }
1622516225

16226-
func withMirror(r *Route, mirror *Service, weight int64) *Route {
16227-
r.MirrorPolicy = &MirrorPolicy{
16228-
Cluster: &Cluster{
16229-
Upstream: mirror,
16230-
},
16231-
Weight: weight,
16226+
func withMirror(r *Route, mirrors []*Service, weight int64) *Route {
16227+
for _, mirror := range mirrors {
16228+
r.MirrorPolicies = append(r.MirrorPolicies, &MirrorPolicy{
16229+
Cluster: &Cluster{
16230+
Upstream: mirror,
16231+
},
16232+
Weight: weight,
16233+
})
1623216234
}
1623316235
return r
1623416236
}

internal/dag/dag.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,8 @@ type Route struct {
319319
// Indicates that during forwarding, the matched prefix (or path) should be swapped with this value
320320
PathRewritePolicy *PathRewritePolicy
321321

322-
// Mirror Policy defines the mirroring policy for this Route.
323-
MirrorPolicy *MirrorPolicy
322+
// MirrorPolicies is a list defining the mirroring policies for this Route.
323+
MirrorPolicies []*MirrorPolicy
324324

325325
// RequestHeadersPolicy defines how headers are managed during forwarding
326326
RequestHeadersPolicy *HeadersPolicy

internal/dag/gatewayapi_processor.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,7 @@ func (p *GatewayAPIProcessor) computeHTTPRouteForListener(route *gatewayapi_v1be
11241124
requestHeaderPolicy *HeadersPolicy
11251125
responseHeaderPolicy *HeadersPolicy
11261126
redirect *Redirect
1127-
mirrorPolicy *MirrorPolicy
1127+
mirrorPolicies []*MirrorPolicy
11281128
pathRewritePolicy *PathRewritePolicy
11291129
urlRewriteHostname string
11301130
)
@@ -1222,7 +1222,7 @@ func (p *GatewayAPIProcessor) computeHTTPRouteForListener(route *gatewayapi_v1be
12221222
PathRewritePolicy: pathRewritePolicy,
12231223
}
12241224
case gatewayapi_v1beta1.HTTPRouteFilterRequestMirror:
1225-
if filter.RequestMirror == nil || mirrorPolicy != nil {
1225+
if filter.RequestMirror == nil {
12261226
continue
12271227
}
12281228

@@ -1231,12 +1231,12 @@ func (p *GatewayAPIProcessor) computeHTTPRouteForListener(route *gatewayapi_v1be
12311231
routeAccessor.AddCondition(gatewayapi_v1beta1.RouteConditionType(cond.Type), cond.Status, gatewayapi_v1beta1.RouteConditionReason(cond.Reason), cond.Message)
12321232
continue
12331233
}
1234-
mirrorPolicy = &MirrorPolicy{
1234+
mirrorPolicies = append(mirrorPolicies, &MirrorPolicy{
12351235
Cluster: &Cluster{
12361236
Upstream: mirrorService,
12371237
},
12381238
Weight: 100,
1239-
}
1239+
})
12401240
case gatewayapi_v1beta1.HTTPRouteFilterURLRewrite:
12411241
if filter.URLRewrite == nil || pathRewritePolicy != nil {
12421242
continue
@@ -1337,7 +1337,7 @@ func (p *GatewayAPIProcessor) computeHTTPRouteForListener(route *gatewayapi_v1be
13371337
matchconditions,
13381338
requestHeaderPolicy,
13391339
responseHeaderPolicy,
1340-
mirrorPolicy,
1340+
mirrorPolicies,
13411341
clusters,
13421342
totalWeight,
13431343
priority,
@@ -1403,7 +1403,7 @@ func (p *GatewayAPIProcessor) computeGRPCRouteForListener(route *gatewayapi_v1al
14031403
// Process rule-level filters.
14041404
var (
14051405
requestHeaderPolicy, responseHeaderPolicy *HeadersPolicy
1406-
mirrorPolicy *MirrorPolicy
1406+
mirrorPolicies []*MirrorPolicy
14071407
)
14081408

14091409
// Per Gateway API docs: "Specifying a core filter multiple times
@@ -1433,7 +1433,8 @@ func (p *GatewayAPIProcessor) computeGRPCRouteForListener(route *gatewayapi_v1al
14331433
routeAccessor.AddCondition(gatewayapi_v1beta1.RouteConditionResolvedRefs, metav1.ConditionFalse, status.ReasonDegraded, fmt.Sprintf("%s on response headers", err))
14341434
}
14351435
case gatewayapi_v1alpha2.GRPCRouteFilterRequestMirror:
1436-
if filter.RequestMirror == nil || mirrorPolicy != nil {
1436+
// If more than one, we only take the first RequestMirror filter.
1437+
if filter.RequestMirror == nil || len(mirrorPolicies) > 0 {
14371438
continue
14381439
}
14391440

@@ -1444,12 +1445,12 @@ func (p *GatewayAPIProcessor) computeGRPCRouteForListener(route *gatewayapi_v1al
14441445
}
14451446
// If protocol is not set on the service, need to set a default one based on listener's protocol type.
14461447
setDefaultServiceProtocol(mirrorService, listener.listener.Protocol)
1447-
mirrorPolicy = &MirrorPolicy{
1448+
mirrorPolicies = append(mirrorPolicies, &MirrorPolicy{
14481449
Cluster: &Cluster{
14491450
Upstream: mirrorService,
14501451
},
14511452
Weight: 100,
1452-
}
1453+
})
14531454
default:
14541455
routeAccessor.AddCondition(
14551456
gatewayapi_v1beta1.RouteConditionAccepted,
@@ -1481,7 +1482,7 @@ func (p *GatewayAPIProcessor) computeGRPCRouteForListener(route *gatewayapi_v1al
14811482
matchconditions,
14821483
requestHeaderPolicy,
14831484
responseHeaderPolicy,
1484-
mirrorPolicy,
1485+
mirrorPolicies,
14851486
clusters,
14861487
totalWeight,
14871488
priority,
@@ -2043,7 +2044,7 @@ func (p *GatewayAPIProcessor) clusterRoutes(
20432044
matchConditions []*matchConditions,
20442045
requestHeaderPolicy *HeadersPolicy,
20452046
responseHeaderPolicy *HeadersPolicy,
2046-
mirrorPolicy *MirrorPolicy,
2047+
mirrorPolicies []*MirrorPolicy,
20472048
clusters []*Cluster,
20482049
totalWeight uint32,
20492050
priority uint8,
@@ -2071,7 +2072,7 @@ func (p *GatewayAPIProcessor) clusterRoutes(
20712072
QueryParamMatchConditions: mc.queryParams,
20722073
RequestHeadersPolicy: requestHeaderPolicy,
20732074
ResponseHeadersPolicy: responseHeaderPolicy,
2074-
MirrorPolicy: mirrorPolicy,
2075+
MirrorPolicies: mirrorPolicies,
20752076
Priority: priority,
20762077
PathRewritePolicy: pathRewritePolicy,
20772078
}

internal/dag/httpproxy_processor.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,7 @@ func (p *HTTPProxyProcessor) computeRoutes(
10131013
MaxRequestsPerConnection: p.MaxRequestsPerConnection,
10141014
PerConnectionBufferLimitBytes: p.PerConnectionBufferLimitBytes,
10151015
}
1016-
if service.Mirror && r.MirrorPolicy != nil {
1016+
if service.Mirror && len(r.MirrorPolicies) != 0 {
10171017
validCond.AddError(contour_api_v1.ConditionTypeServiceError, "OnlyOneMirror",
10181018
"only one service per route may be nominated as mirror")
10191019
return nil
@@ -1024,15 +1024,15 @@ func (p *HTTPProxyProcessor) computeRoutes(
10241024
// EDGE CASE: This means that explicitly setting Weight to 0 will also result in 100%
10251025
// mirroring. The Mirror field must be set to false or removed to disable the mirror.
10261026
if service.Weight == 0 {
1027-
r.MirrorPolicy = &MirrorPolicy{
1027+
r.MirrorPolicies = []*MirrorPolicy{{
10281028
Cluster: c,
10291029
Weight: 100,
1030-
}
1030+
}}
10311031
} else {
1032-
r.MirrorPolicy = &MirrorPolicy{
1032+
r.MirrorPolicies = []*MirrorPolicy{{
10331033
Cluster: c,
10341034
Weight: service.Weight,
1035-
}
1035+
}}
10361036
}
10371037
} else {
10381038
r.Clusters = append(r.Clusters, c)

internal/debug/dot.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,10 @@ func collectDag(b DagBuilder) (nodeCollection, edgeCollection) {
6969
nodes[route] = true
7070

7171
clusters := route.Clusters
72-
if route.MirrorPolicy != nil && route.MirrorPolicy.Cluster != nil {
73-
clusters = append(clusters, route.MirrorPolicy.Cluster)
72+
for _, mp := range route.MirrorPolicies {
73+
if mp.Cluster != nil {
74+
clusters = append(clusters, mp.Cluster)
75+
}
7476
}
7577
for _, cluster := range clusters {
7678
edges[pair{route, cluster}] = true
@@ -93,8 +95,10 @@ func collectDag(b DagBuilder) (nodeCollection, edgeCollection) {
9395
nodes[route] = true
9496

9597
clusters := route.Clusters
96-
if route.MirrorPolicy != nil && route.MirrorPolicy.Cluster != nil {
97-
clusters = append(clusters, route.MirrorPolicy.Cluster)
98+
for _, mp := range route.MirrorPolicies {
99+
if mp.Cluster != nil {
100+
clusters = append(clusters, mp.Cluster)
101+
}
98102
}
99103
for _, cluster := range clusters {
100104
edges[pair{route, cluster}] = true

internal/envoy/v3/route.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -491,19 +491,23 @@ func hashPolicy(requestHashPolicies []dag.RequestHashPolicy) []*envoy_route_v3.R
491491
}
492492

493493
func mirrorPolicy(r *dag.Route) []*envoy_route_v3.RouteAction_RequestMirrorPolicy {
494-
if r.MirrorPolicy == nil {
494+
if len(r.MirrorPolicies) == 0 {
495495
return nil
496496
}
497497

498-
return []*envoy_route_v3.RouteAction_RequestMirrorPolicy{{
499-
Cluster: envoy.Clustername(r.MirrorPolicy.Cluster),
500-
RuntimeFraction: &envoy_core_v3.RuntimeFractionalPercent{
501-
DefaultValue: &envoy_type_v3.FractionalPercent{
502-
Numerator: uint32(r.MirrorPolicy.Weight),
503-
Denominator: envoy_type_v3.FractionalPercent_HUNDRED,
498+
mirrorPolicies := []*envoy_route_v3.RouteAction_RequestMirrorPolicy{}
499+
for _, mp := range r.MirrorPolicies {
500+
mirrorPolicies = append(mirrorPolicies, &envoy_route_v3.RouteAction_RequestMirrorPolicy{
501+
Cluster: envoy.Clustername(mp.Cluster),
502+
RuntimeFraction: &envoy_core_v3.RuntimeFractionalPercent{
503+
DefaultValue: &envoy_type_v3.FractionalPercent{
504+
Numerator: uint32(mp.Weight),
505+
Denominator: envoy_type_v3.FractionalPercent_HUNDRED,
506+
},
504507
},
505-
},
506-
}}
508+
})
509+
}
510+
return mirrorPolicies
507511
}
508512

509513
func retryPolicy(r *dag.Route) *envoy_route_v3.RetryPolicy {

internal/envoy/v3/route_test.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,7 @@ func TestRouteRoute(t *testing.T) {
677677
},
678678
},
679679

680+
//TODO(liorlieberman) add multiple mirror test
680681
"mirror": {
681682
route: &dag.Route{
682683
Clusters: []*dag.Cluster{{
@@ -690,20 +691,21 @@ func TestRouteRoute(t *testing.T) {
690691
},
691692
Weight: 90,
692693
}},
693-
MirrorPolicy: &dag.MirrorPolicy{
694-
Cluster: &dag.Cluster{
695-
Upstream: &dag.Service{
696-
Weighted: dag.WeightedService{
697-
Weight: 1,
698-
ServiceName: s1.Name,
699-
ServiceNamespace: s1.Namespace,
700-
ServicePort: s1.Spec.Ports[0],
694+
MirrorPolicies: []*dag.MirrorPolicy{
695+
{
696+
Cluster: &dag.Cluster{
697+
Upstream: &dag.Service{
698+
Weighted: dag.WeightedService{
699+
Weight: 1,
700+
ServiceName: s1.Name,
701+
ServiceNamespace: s1.Namespace,
702+
ServicePort: s1.Spec.Ports[0],
703+
},
701704
},
702705
},
706+
Weight: 100,
703707
},
704-
Weight: 100,
705-
},
706-
},
708+
}},
707709
want: &envoy_route_v3.Route_Route{
708710
Route: &envoy_route_v3.RouteAction{
709711
ClusterSpecifier: &envoy_route_v3.RouteAction_Cluster{

0 commit comments

Comments
 (0)