Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
336 changes: 336 additions & 0 deletions stats/opentelemetry/csm/pluginoption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package csm contains the implementation of the CSM Plugin Option.
package csm

import (
"context"
"encoding/base64"
"net/url"
"os"
"strings"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats/opentelemetry/internal"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
)

var logger = grpclog.Component("csm-observability-plugin")

// pluginOption emits CSM Labels from the environment and metadata exchange
// for csm channels and all servers.
//
// Do not use this directly; use newPluginOption instead.
type pluginOption struct {
// localLabels are the labels that identify the local environment a binary
// is run in, and will be emitted from the CSM Plugin Option.
localLabels map[string]string
// metadataExchangeLabelsEncoded are the metadata exchange labels to be sent
// as the value of metadata key "x-envoy-peer-metadata" in proto wire format
// and base 64 encoded. This gets sent out from all the servers running in
// this process and for csm channels.
metadataExchangeLabelsEncoded string
}

// newPluginOption returns a new pluginOption with local labels and metadata
// exchange labels derived from the environment.
func newPluginOption(ctx context.Context) internal.PluginOption {
localLabels, metadataExchangeLabelsEncoded := constructMetadataFromEnv(ctx)

return &pluginOption{
localLabels: localLabels,
metadataExchangeLabelsEncoded: metadataExchangeLabelsEncoded,
}
}

// NewLabelsMD returns a metadata.MD with the CSM labels as an encoded protobuf
// Struct as the value of "x-envoy-peer-metadata".
func (cpo *pluginOption) GetMetadata() metadata.MD {
return metadata.New(map[string]string{
metadataExchangeKey: cpo.metadataExchangeLabelsEncoded,
})
}

// GetLabels gets the CSM peer labels from the metadata provided. It returns
// "unknown" for labels not found. Labels returned depend on the remote type.
// Additionally, local labels determined at initialization time are appended to
// labels returned, in addition to the optionalLabels provided.
func (cpo *pluginOption) GetLabels(md metadata.MD, optionalLabels map[string]string) map[string]string {
labels := map[string]string{ // Remote labels if type is unknown (i.e. unset or error processing x-envoy-peer-metadata)
"csm.remote_workload_type": "unknown",
"csm.remote_workload_canonical_service": "unknown",
}
// Append the local labels.
for k, v := range cpo.localLabels {
labels[k] = v
}

// Append the optional labels. To avoid string comparisons, assume the
// caller only passes in two potential xDS Optional Labels: service_name and
// service_namespace.
for k, v := range optionalLabels {
labels[k] = v
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we merge these labels externally, rather than passing them in here and merging them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this confused me too. This is a built in function of how this API should work (one API call that emits labels from the three sources described above). Thus, it is intended for this layer to receive this arbitrary map of optional labels and make the determination as to which it is interested in. For this case, we assume the caller has processed these service_name and service_namespace for the sake of avoiding string comparisons.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why not remove it from the API then, if we're just going to use whatever is passed in?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be optional labels other than service_name and service_namespace. Immediately, we have "locality". Additionally, the CSM Plugin Options changes the name of the label from "service_name" to "csm.service_name".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh whoops I didn't prepend csm. for the two. Let me do that now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Immediately, we have "locality".

Then why doesn't this need to filter out locality or ensure it only passes through service_name[space]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmm ok. I do plan on using the same API that I set xDS Labels with for the locality, and even though there will be orthogonal stats handlers for GCS it will still be in same call which will get merged with this, and call out into this. So I'll add a filter here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just talked to Yash about this, the intended behavior is to log "unknown" for the xDS labels that don't show up. Even though we want to make this general purpose, C++ actually just takes a struct here with two xDS Labels and a service_name label. Would you prefer that to avoid filtering?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline with Yash; decided to do this in the OTel layer with an additional API which we will need for GCS SetOptionalLabel, and also prepend csm. in the xDS Client.

val := md.Get("x-envoy-peer-metadata")
// This can't happen if corresponding csm client because of proto wire
// format encoding, but since it is arbitrary off the wire be safe.
if len(val) != 1 {
logger.Warningf("length of md values of \"x-envoy-peer-metadata\" is not 1, is %v", len(val))
return labels
}

protoWireFormat, err := base64.RawStdEncoding.DecodeString(val[0])
if err != nil {
logger.Warningf("error base 64 decoding value of \"x-envoy-peer-metadata\": %v", err)
return labels
}

spb := &structpb.Struct{}
if err := proto.Unmarshal(protoWireFormat, spb); err != nil {
logger.Warningf("error unmarshalling value of \"x-envoy-peer-metadata\" into proto: %v", err)
return labels
}

fields := spb.GetFields()

labels["csm.remote_workload_type"] = getFromMetadata("type", fields)
// The value of “csm.remote_workload_canonical_service” comes from
// MetadataExchange with the key “canonical_service”. (Note that this should
// be read even if the remote type is unknown.)
labels["csm.remote_workload_canonical_service"] = getFromMetadata("canonical_service", fields)

// Unset/unknown types, and types that aren't GKE or GCP return early with
// just local labels, remote_workload_type and
// remote_workload_canonical_service labels.
workloadType := labels["csm.remote_workload_type"]
if workloadType != "gcp_kubernetes_engine" && workloadType != "gcp_compute_engine" {
return labels
}
// GKE and GCE labels.
labels["csm.remote_workload_project_id"] = getFromMetadata("project_id", fields)
labels["csm.remote_workload_location"] = getFromMetadata("location", fields)
labels["csm.remote_workload_name"] = getFromMetadata("workload_name", fields)
if workloadType == "gcp_compute_engine" {
return labels
}

// GKE only labels.
labels["csm.remote_workload_cluster_name"] = getFromMetadata("cluster_name", fields)
labels["csm.remote_workload_namespace_name"] = getFromMetadata("namespace_name", fields)
return labels
}

// getFromMetadata gets the value for the metadata key from the protobuf
// metadata. Returns "unknown" if the metadata is not found in the protobuf
// metadata, or if the value is not a string value. Returns the string value
// otherwise.
func getFromMetadata(metadataKey string, metadata map[string]*structpb.Value) string {
ret := "unknown"
if metadata != nil {
if metadataVal, ok := metadata[metadataKey]; ok {
if _, ok := metadataVal.GetKind().(*structpb.Value_StringValue); ok {
ret = metadataVal.GetStringValue()
}
}
}
return ret
}

// getFromResource gets the value for the resource key from the attribute set.
// Returns "unknown" if the resourceKey is not found in the attribute set or is
// not a string value, the string value otherwise.
func getFromResource(resourceKey attribute.Key, set *attribute.Set) string {
ret := "unknown"
if set != nil {
if resourceVal, ok := set.Value(resourceKey); ok && resourceVal.Type() == attribute.STRING {
ret = resourceVal.AsString()
}
}
return ret
}

// getEnv returns "unknown" if environment variable is unset, the environment
// variable otherwise.
func getEnv(name string) string {
ret := "unknown"
if val, ok := os.LookupEnv(name); ok {
ret = val
}
return ret
}

var (
// This function will be overridden in unit tests.
getAttrSetFromResourceDetector = func(ctx context.Context) *attribute.Set {
r, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector()))

if err != nil {
logger.Errorf("error reading OpenTelemetry resource: %v", err)
}
var set *attribute.Set
if r != nil {
set = r.Set()
}
return set
}
)

// constructMetadataFromEnv creates local labels and labels to send to the peer
// using metadata exchange based off resource detection and environment
// variables.
//
// Returns local labels, and base 64 encoded protobuf.Struct containing metadata
// exchange labels.
func constructMetadataFromEnv(ctx context.Context) (map[string]string, string) {
set := getAttrSetFromResourceDetector(ctx)

labels := make(map[string]string)
labels["type"] = getFromResource("cloud.platform", set)
labels["canonical_service"] = getEnv("CSM_CANONICAL_SERVICE_NAME")

// If type is not GCE or GKE only metadata exchange labels are "type" and
// "canonical_service".
cloudPlatformVal := labels["type"]
if cloudPlatformVal != "gcp_kubernetes_engine" && cloudPlatformVal != "gcp_compute_engine" {
return initializeLocalAndMetadataLabels(labels)
}

// GCE and GKE labels:
labels["workload_name"] = getEnv("CSM_WORKLOAD_NAME")

locationVal := "unknown"
if resourceVal, ok := set.Value("cloud.availability_zone"); ok && resourceVal.Type() == attribute.STRING {
locationVal = resourceVal.AsString()
} else if resourceVal, ok = set.Value("cloud.region"); ok && resourceVal.Type() == attribute.STRING {
locationVal = resourceVal.AsString()
}
labels["location"] = locationVal

labels["project_id"] = getFromResource("cloud.account.id", set)
if cloudPlatformVal == "gcp_compute_engine" {
return initializeLocalAndMetadataLabels(labels)
}

// GKE specific labels:
labels["namespace_name"] = getFromResource("k8s.namespace.name", set)
labels["cluster_name"] = getFromResource("k8s.cluster.name", set)
return initializeLocalAndMetadataLabels(labels)
}

// parseMeshIDString parses the mesh id from the node id according to the format
// "projects/[GCP Project number]/networks/mesh:[Mesh ID]/nodes/[UUID]". Returns
// "unknown" if there is a syntax error in the node ID.
func parseMeshIDFromNodeID(nodeID string) string {
meshSplit := strings.Split(nodeID, "/")
if len(meshSplit) != 6 {
return "unknown"
}
if meshSplit[0] != "projects" || meshSplit[2] != "networks" || meshSplit[4] != "nodes" {
return "unknown"
}
meshID, ok := strings.CutPrefix(meshSplit[3], "mesh:")
if !ok { // errors become "unknown"
return "unknown"
}
return meshID
}

// initializeLocalAndMetadataLabels csm local labels for a CSM Plugin Option to
// record. It also builds out a base 64 encoded protobuf.Struct containing the
// metadata exchange labels to be sent as part of metadata exchange from a CSM
// Plugin Option.
func initializeLocalAndMetadataLabels(labels map[string]string) (map[string]string, string) {
// The value of “csm.workload_canonical_service” comes from
// “CSM_CANONICAL_SERVICE_NAME” env var, “unknown” if unset.
val := labels["canonical_service"]
localLabels := make(map[string]string)
localLabels["csm.workload_canonical_service"] = val
// Get the CSM Mesh ID from the bootstrap file.
nodeID := getNodeID()
localLabels["csm.mesh_id"] = parseMeshIDFromNodeID(nodeID)

// Metadata exchange labels - can go ahead and encode into proto, and then
// base64.
pbLabels := &structpb.Struct{
Fields: map[string]*structpb.Value{},
}
for k, v := range labels {
pbLabels.Fields[k] = structpb.NewStringValue(v)
}
protoWireFormat, err := proto.Marshal(pbLabels)
metadataExchangeLabelsEncoded := ""
if err == nil {
metadataExchangeLabelsEncoded = base64.RawStdEncoding.EncodeToString(protoWireFormat)
}
// else - This behavior triggers server side to reply (if sent from a gRPC
// Client within this binary) with the metadata exchange labels. Even if
// client side has a problem marshaling proto into wire format, it can
// still use server labels so send an empty string as the value of
// x-envoy-peer-metadata. The presence of this metadata exchange header
// will cause server side to respond with metadata exchange labels.

return localLabels, metadataExchangeLabelsEncoded
}

// getNodeID gets the Node ID from the bootstrap data.
func getNodeID() string {
cfg, err := bootstrap.NewConfig()
if err != nil {
return "" // will become "unknown"
}
if cfg.NodeProto == nil {
return ""
}
return cfg.NodeProto.GetId()
}

// metadataExchangeKey is the key for HTTP metadata exchange.
const metadataExchangeKey = "x-envoy-peer-metadata"

func determineTargetCSM(target string) bool {
// On the client-side, the channel target is used to determine if a channel is a
// CSM channel or not. CSM channels need to have an “xds” scheme and a
// "traffic-director-global.xds.googleapis.com" authority. In the cases where no
// authority is mentioned, the authority is assumed to be CSM. MetadataExchange
// is performed only for CSM channels. Non-metadata exchange labels are detected
// as described below.
parsedTarget, err := url.Parse(target)
if err != nil {
// Shouldn't happen as Dial would fail if target couldn't be parsed, but
// log just in case to inform user.
logger.Errorf("passed in target %v failed to parse: %v", parsedTarget, err)
return false
}

if parsedTarget.Scheme == "xds" {
if parsedTarget.Host == "" {
return true // "In the cases where no authority is mentioned, the authority is assumed to be csm"
}
return parsedTarget.Host == "traffic-director-global.xds.googleapis.com"
}
return false
}
Loading