Skip to content

Commit 6f853db

Browse files
committed
Process telemetry labels from CDS in xDS Balancer's
1 parent 308dbc4 commit 6f853db

File tree

8 files changed

+222
-19
lines changed

8 files changed

+222
-19
lines changed

internal/stats/labels.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package stats
20+
21+
import "context"
22+
23+
// Labels are the labels for metrics
24+
type Labels struct {
25+
// TelemetryLabels are the telemetry labels to record.
26+
TelemetryLabels map[string]string
27+
}
28+
29+
type labelsKey struct{}
30+
31+
// GetLabels returns the Labels stored in theo context, or nil if there is one
32+
func GetLabels(ctx context.Context) *Labels {
33+
labels, _ := ctx.Value(labelsKey{}).(*Labels)
34+
return labels
35+
}
36+
37+
// SetLabels sets the Labels
38+
func SetLabels(ctx context.Context, labels *Labels) context.Context {
39+
// could also append
40+
return context.WithValue(ctx, labelsKey{}, labels)
41+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
package xds_test
19+
20+
import (
21+
"context"
22+
"fmt"
23+
"testing"
24+
25+
"google.golang.org/grpc"
26+
"google.golang.org/grpc/credentials/insecure"
27+
istats "google.golang.org/grpc/internal/stats"
28+
"google.golang.org/grpc/internal/stubserver"
29+
"google.golang.org/grpc/internal/testutils"
30+
"google.golang.org/grpc/internal/testutils/xds/e2e"
31+
testgrpc "google.golang.org/grpc/interop/grpc_testing"
32+
testpb "google.golang.org/grpc/interop/grpc_testing"
33+
"google.golang.org/grpc/stats"
34+
35+
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
36+
"google.golang.org/protobuf/types/known/structpb"
37+
)
38+
39+
const serviceNameKey = "service_name"
40+
const serviceNamespaceKey = "service_namespace"
41+
const serviceNameValue = "grpc-service"
42+
const serviceNamespaceValue = "grpc-service-namespace"
43+
44+
// TestTelemetryLabels tests that telemetry labels from CDS make their way to
45+
// the stats handler. The stats handler sets the mutable context value that the
46+
// cluster impl picker will write telemetry labels to, and then the stats
47+
// handler asserts that subsequent HandleRPC calls from the RPC lifecycle
48+
// contain telemetry labels that it can see.
49+
func (s) TestTelemetryLabels(t *testing.T) {
50+
managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
51+
defer cleanup1()
52+
53+
server := stubserver.StartTestService(t, nil)
54+
defer server.Stop()
55+
56+
const xdsServiceName = "my-service-client-side-xds"
57+
resources := e2e.DefaultClientResources(e2e.ResourceParams{
58+
DialTarget: xdsServiceName,
59+
NodeID: nodeID,
60+
Host: "localhost",
61+
Port: testutils.ParsePort(t, server.Address),
62+
SecLevel: e2e.SecurityLevelNone,
63+
})
64+
65+
resources.Clusters[0].Metadata = &v3corepb.Metadata{
66+
FilterMetadata: map[string]*structpb.Struct{
67+
"com.google.csm.telemetry_labels": {
68+
Fields: map[string]*structpb.Value{
69+
serviceNameKey: structpb.NewStringValue(serviceNameValue),
70+
serviceNamespaceKey: structpb.NewStringValue(serviceNamespaceValue),
71+
},
72+
},
73+
},
74+
}
75+
76+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
77+
defer cancel()
78+
if err := managementServer.Update(ctx, resources); err != nil {
79+
t.Fatal(err)
80+
}
81+
82+
fsh := &fakeStatsHandler{
83+
t: t,
84+
}
85+
86+
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", xdsServiceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver), grpc.WithStatsHandler(fsh))
87+
if err != nil {
88+
t.Fatalf("failed to create a new client to local test server: %v", err)
89+
}
90+
defer cc.Close()
91+
92+
client := testgrpc.NewTestServiceClient(cc)
93+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
94+
t.Fatalf("rpc EmptyCall() failed: %v", err)
95+
}
96+
}
97+
98+
type fakeStatsHandler struct {
99+
labels *istats.Labels
100+
101+
t *testing.T
102+
}
103+
104+
func (fsh *fakeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
105+
return ctx
106+
}
107+
108+
func (fsh *fakeStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
109+
110+
func (fsh *fakeStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
111+
labels := &istats.Labels{
112+
TelemetryLabels: make(map[string]string),
113+
}
114+
fsh.labels = labels
115+
ctx = istats.SetLabels(ctx, labels) // ctx passed is immutable, however cluster_impl writes to the map of Telemetry Labels on the heap.
116+
return ctx
117+
}
118+
119+
func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
120+
switch rs.(type) {
121+
// stats.Begin won't get Telemetry Labels because happens after picker
122+
// picks.
123+
124+
// These three stats callouts trigger all metrics for OpenTelemetry that
125+
// aren't started. All of these should have access to the desired telemetry
126+
// labels.
127+
case *stats.OutPayload:
128+
case *stats.InPayload:
129+
case *stats.End:
130+
if label, ok := fsh.labels.TelemetryLabels[serviceNameKey]; !ok || label != serviceNameValue {
131+
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKey, serviceNameValue, label)
132+
}
133+
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKey]; !ok || label != serviceNamespaceValue {
134+
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKey, serviceNamespaceValue, label)
135+
}
136+
137+
default:
138+
// Nothing to assert for the other stats.Handler callouts.
139+
}
140+
141+
}

xds/internal/balancer/cdsbalancer/cdsbalancer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,8 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste
645645
}
646646
dm.OutlierDetection = odJSON
647647

648+
dm.TelemetryLabels = cluster.TelemetryLabels
649+
648650
return append(dms, dm), true, nil
649651
}
650652

xds/internal/balancer/clusterimpl/clusterimpl.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ type clusterImplBalancer struct {
123123
requestCounterService string // The service name for the request counter.
124124
requestCounter *xdsclient.ClusterRequestsCounter
125125
requestCountMax uint32
126+
telemetryLabels map[string]string
126127
pickerUpdateCh *buffer.Unbounded
127128
}
128129

@@ -469,14 +470,15 @@ func (b *clusterImplBalancer) run() {
469470
drops: b.drops,
470471
requestCounter: b.requestCounter,
471472
requestCountMax: b.requestCountMax,
472-
}, b.loadWrapper),
473+
}, b.loadWrapper, b.telemetryLabels),
473474
})
474475
case *LBConfig:
476+
b.telemetryLabels = u.TelemetryLabels
475477
dc := b.handleDropAndRequestCount(u)
476478
if dc != nil && b.childState.Picker != nil {
477479
b.ClientConn.UpdateState(balancer.State{
478480
ConnectivityState: b.childState.ConnectivityState,
479-
Picker: newPicker(b.childState, dc, b.loadWrapper),
481+
Picker: newPicker(b.childState, dc, b.loadWrapper, b.telemetryLabels),
480482
})
481483
}
482484
}

xds/internal/balancer/clusterimpl/config.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ type LBConfig struct {
4040
EDSServiceName string `json:"edsServiceName,omitempty"`
4141
// LoadReportingServer is the LRS server to send load reports to. If not
4242
// present, load reporting will be disabled.
43-
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
44-
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
45-
DropCategories []DropConfig `json:"dropCategories,omitempty"`
46-
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
43+
LoadReportingServer *bootstrap.ServerConfig `json:"lrsLoadReportingServer,omitempty"`
44+
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
45+
DropCategories []DropConfig `json:"dropCategories,omitempty"`
46+
// TelemetryLabels are the telemetry Labels associated with this cluster.
47+
TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"`
48+
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
4749
}
4850

4951
func parseConfig(c json.RawMessage) (*LBConfig, error) {

xds/internal/balancer/clusterimpl/picker.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"google.golang.org/grpc/balancer"
2323
"google.golang.org/grpc/codes"
2424
"google.golang.org/grpc/connectivity"
25+
"google.golang.org/grpc/internal/stats"
2526
"google.golang.org/grpc/internal/wrr"
2627
"google.golang.org/grpc/status"
2728
"google.golang.org/grpc/xds/internal/xdsclient"
@@ -78,24 +79,34 @@ type loadReporter interface {
7879

7980
// Picker implements RPC drop, circuit breaking drop and load reporting.
8081
type picker struct {
81-
drops []*dropper
82-
s balancer.State
83-
loadStore loadReporter
84-
counter *xdsclient.ClusterRequestsCounter
85-
countMax uint32
82+
drops []*dropper
83+
s balancer.State
84+
loadStore loadReporter
85+
counter *xdsclient.ClusterRequestsCounter
86+
countMax uint32
87+
telemetryLabels map[string]string
8688
}
8789

88-
func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker {
90+
func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter, telemetryLabels map[string]string) *picker {
8991
return &picker{
90-
drops: config.drops,
91-
s: s,
92-
loadStore: loadStore,
93-
counter: config.requestCounter,
94-
countMax: config.requestCountMax,
92+
drops: config.drops,
93+
s: s,
94+
loadStore: loadStore,
95+
counter: config.requestCounter,
96+
countMax: config.requestCountMax,
97+
telemetryLabels: telemetryLabels,
9598
}
9699
}
97100

98101
func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
102+
if info.Ctx != nil {
103+
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
104+
for key, value := range d.telemetryLabels {
105+
labels.TelemetryLabels[key] = value
106+
}
107+
} // Unconditionally set, even dropped or queued RPC's can use this label.
108+
}
109+
99110
// Don't drop unless the inner picker is READY. Similar to
100111
// https://github.com/grpc/grpc-go/issues/2622.
101112
if d.s.ConnectivityState == connectivity.Ready {

xds/internal/balancer/clusterresolver/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ type DiscoveryMechanism struct {
103103
// OutlierDetection is the Outlier Detection LB configuration for this
104104
// priority.
105105
OutlierDetection json.RawMessage `json:"outlierDetection,omitempty"`
106+
// TelemetryLabels are the telemetry labels associated with this cluster.
107+
TelemetryLabels map[string]string `json:"telemetryLabels,omitempty"`
106108
outlierDetection outlierdetection.LBConfig
107109
}
108110

xds/internal/balancer/clusterresolver/configbuilder.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,9 @@ func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism
146146
retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
147147
}
148148
return pName, &clusterimpl.LBConfig{
149-
Cluster: mechanism.Cluster,
150-
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
149+
Cluster: mechanism.Cluster,
150+
TelemetryLabels: mechanism.TelemetryLabels,
151+
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
151152
}, retAddrs
152153
}
153154

@@ -283,6 +284,7 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
283284
EDSServiceName: mechanism.EDSServiceName,
284285
LoadReportingServer: mechanism.LoadReportingServer,
285286
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
287+
TelemetryLabels: mechanism.TelemetryLabels,
286288
DropCategories: drops,
287289
ChildPolicy: xdsLBPolicy,
288290
}, addrs, nil

0 commit comments

Comments
 (0)