Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
var labels *istats.Labels
if labels = istats.GetLabels(ctx); labels == nil {
labels = &istats.Labels{
TelemetryLabels: make(map[string]string),
// The defaults for all the per call optional labels that this
// OpenTelemetry component currently supports.
TelemetryLabels: map[string]string{
"grpc.lb.locality": "",
},
}
ctx = istats.SetLabels(ctx, labels)
}
Expand Down Expand Up @@ -232,8 +236,11 @@ func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo,
}

for _, o := range h.options.MetricsOptions.OptionalLabels {
// TODO: Add a filter for converting to unknown if not present in the
// CSM Plugin Option layer by adding an optional labels API.
if val, ok := ai.xdsLabels[o]; ok {
attributes = append(attributes, otelattribute.String(o, val))
continue
}
}

Expand Down
4 changes: 4 additions & 0 deletions stats/opentelemetry/csm/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (o *perTargetDialOption) DialOptionForTarget(parsedTarget url.URL) grpc.Dia

func dialOptionWithCSMPluginOption(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption {
options.MetricsOptions.OptionalLabels = []string{"csm.service_name", "csm.service_namespace_name"} // Attach the two xDS Optional Labels for this component to not filter out.
return dialOptionSetCSM(options, po)
}

func dialOptionSetCSM(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption {
otelinternal.SetPluginOption.(func(options *opentelemetry.Options, po otelinternal.PluginOption))(&options, po)
return opentelemetry.DialOption(options)
}
Expand Down
16 changes: 11 additions & 5 deletions stats/opentelemetry/csm/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,12 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) {
func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = istats.SetLabels(ctx, &istats.Labels{
TelemetryLabels: map[string]string{
// mock what the cluster impl would write here ("csm." xDS Labels)
// mock what the cluster impl would write here ("csm." xDS Labels
// and locality label)
"csm.service_name": "service_name_val",
"csm.service_namespace_name": "service_namespace_val",

"grpc.lb.locality": "grpc.lb.locality_val",
},
})

Expand All @@ -441,8 +444,9 @@ func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, re
// Optional Labels turned on. It then configures an interceptor to attach
// labels, representing the cluster_impl picker. It then makes a unary RPC, and
// expects xDS Labels labels to be attached to emitted relevant metrics. Full
// xDS System alongside OpenTelemetry will be tested with interop. (there is
// a test for xDS -> Stats handler and this tests -> OTel -> emission).
// xDS System alongside OpenTelemetry will be tested with interop. (there is a
// test for xDS -> Stats handler and this tests -> OTel -> emission). It also
// tests the optional per call locality label in the same manner.
func (s) TestXDSLabels(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
Expand All @@ -457,11 +461,11 @@ func (s) TestXDSLabels(t *testing.T) {
}

po := newPluginOption(ctx)
dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{
dopts := []grpc.DialOption{dialOptionSetCSM(opentelemetry.Options{
MetricsOptions: opentelemetry.MetricsOptions{
MeterProvider: provider,
Metrics: opentelemetry.DefaultMetrics(),
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"},
OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality"},
},
}, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)}
if err := ss.Start(nil, dopts...); err != nil {
Expand Down Expand Up @@ -489,6 +493,7 @@ func (s) TestXDSLabels(t *testing.T) {

serviceNameAttr := attribute.String("csm.service_name", "service_name_val")
serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val")
localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val")
meshIDAttr := attribute.String("csm.mesh_id", "unknown")
workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown")
remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown")
Expand All @@ -500,6 +505,7 @@ func (s) TestXDSLabels(t *testing.T) {
unaryStatusAttr,
serviceNameAttr,
serviceNamespaceAttr,
localityAttr,
meshIDAttr,
workloadCanonicalServiceAttr,
remoteWorkloadTypeAttr,
Expand Down
7 changes: 6 additions & 1 deletion test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const serviceNamespaceKeyCSM = "csm.service_namespace_name"
const serviceNameValue = "grpc-service"
const serviceNamespaceValue = "grpc-service-namespace"

const localityKey = "grpc.lb.locality"
const localityValue = "{\"region\":\"region-1\",\"zone\":\"zone-1\",\"subZone\":\"subzone-1\"}"

// TestTelemetryLabels tests that telemetry labels from CDS make their way to
// the stats handler. The stats handler sets the mutable context value that the
// cluster impl picker will write telemetry labels to, and then the stats
Expand Down Expand Up @@ -132,7 +135,9 @@ func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKeyCSM]; !ok || label != serviceNamespaceValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKeyCSM, serviceNamespaceValue, label)
}

if label, ok := fsh.labels.TelemetryLabels[localityKey]; !ok || label != localityValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", localityKey, localityValue, label)
}
default:
// Nothing to assert for the other stats.Handler callouts.
}
Expand Down
6 changes: 6 additions & 0 deletions xds/internal/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return pr, err
}

if info.Ctx != nil {
if labels := stats.GetLabels(info.Ctx); labels != nil && labels.TelemetryLabels != nil {
labels.TelemetryLabels["grpc.lb.locality"] = lIDStr
}
}

if d.loadStore != nil {
d.loadStore.CallStarted(lIDStr)
oldDone := pr.Done
Expand Down