Skip to content

Commit f1aceb0

Browse files
authored
stats/opentelemetry: CSM Observability server side component changes (#7264)
1 parent 8138555 commit f1aceb0

File tree

3 files changed

+191
-70
lines changed

3 files changed

+191
-70
lines changed

stats/opentelemetry/client_metrics.go

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -37,36 +37,36 @@ type clientStatsHandler struct {
3737
clientMetrics clientMetrics
3838
}
3939

40-
func (csh *clientStatsHandler) initializeMetrics() {
40+
func (h *clientStatsHandler) initializeMetrics() {
4141
// Will set no metrics to record, logically making this stats handler a
4242
// no-op.
43-
if csh.o.MetricsOptions.MeterProvider == nil {
43+
if h.o.MetricsOptions.MeterProvider == nil {
4444
return
4545
}
4646

47-
meter := csh.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version)
47+
meter := h.o.MetricsOptions.MeterProvider.Meter("grpc-go " + grpc.Version)
4848
if meter == nil {
4949
return
5050
}
5151

52-
setOfMetrics := csh.o.MetricsOptions.Metrics.metrics
52+
setOfMetrics := h.o.MetricsOptions.Metrics.metrics
5353

54-
csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
55-
csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
56-
csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
57-
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
58-
csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
54+
h.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
55+
h.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
56+
h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
57+
h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
58+
h.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
5959
}
6060

61-
func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
61+
func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
6262
ci := &callInfo{
63-
target: csh.determineTarget(cc),
64-
method: csh.determineMethod(method, opts...),
63+
target: h.determineTarget(cc),
64+
method: h.determineMethod(method, opts...),
6565
}
6666
ctx = setCallInfo(ctx, ci)
6767

68-
if csh.o.MetricsOptions.pluginOption != nil {
69-
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
68+
if h.o.MetricsOptions.pluginOption != nil {
69+
md := h.o.MetricsOptions.pluginOption.GetMetadata()
7070
for k, vs := range md {
7171
for _, v := range vs {
7272
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
@@ -76,16 +76,16 @@ func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method stri
7676

7777
startTime := time.Now()
7878
err := invoker(ctx, method, req, reply, cc, opts...)
79-
csh.perCallMetrics(ctx, err, startTime, ci)
79+
h.perCallMetrics(ctx, err, startTime, ci)
8080
return err
8181
}
8282

8383
// determineTarget determines the target to record attributes with. This will be
8484
// "other" if target filter is set and specifies, the target name as is
8585
// otherwise.
86-
func (csh *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string {
86+
func (h *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string {
8787
target := cc.CanonicalTarget()
88-
if f := csh.o.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) {
88+
if f := h.o.MetricsOptions.TargetAttributeFilter; f != nil && !f(target) {
8989
target = "other"
9090
}
9191
return target
@@ -94,7 +94,7 @@ func (csh *clientStatsHandler) determineTarget(cc *grpc.ClientConn) string {
9494
// determineMethod determines the method to record attributes with. This will be
9595
// "other" if StaticMethod isn't specified or if method filter is set and
9696
// specifies, the method name as is otherwise.
97-
func (csh *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOption) string {
97+
func (h *clientStatsHandler) determineMethod(method string, opts ...grpc.CallOption) string {
9898
for _, opt := range opts {
9999
if _, ok := opt.(grpc.StaticMethodCallOption); ok {
100100
return removeLeadingSlash(method)
@@ -103,15 +103,15 @@ func (csh *clientStatsHandler) determineMethod(method string, opts ...grpc.CallO
103103
return "other"
104104
}
105105

106-
func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
106+
func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
107107
ci := &callInfo{
108-
target: csh.determineTarget(cc),
109-
method: csh.determineMethod(method, opts...),
108+
target: h.determineTarget(cc),
109+
method: h.determineMethod(method, opts...),
110110
}
111111
ctx = setCallInfo(ctx, ci)
112112

113-
if csh.o.MetricsOptions.pluginOption != nil {
114-
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
113+
if h.o.MetricsOptions.pluginOption != nil {
114+
md := h.o.MetricsOptions.pluginOption.GetMetadata()
115115
for k, vs := range md {
116116
for _, v := range vs {
117117
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
@@ -122,28 +122,28 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
122122
startTime := time.Now()
123123

124124
callback := func(err error) {
125-
csh.perCallMetrics(ctx, err, startTime, ci)
125+
h.perCallMetrics(ctx, err, startTime, ci)
126126
}
127127
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
128128
return streamer(ctx, desc, cc, method, opts...)
129129
}
130130

131-
func (csh *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
131+
func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
132132
s := status.Convert(err)
133133
callLatency := float64(time.Since(startTime)) / float64(time.Second)
134-
csh.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code()))))
134+
h.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code()))))
135135
}
136136

137137
// TagConn exists to satisfy stats.Handler.
138-
func (csh *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
138+
func (h *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
139139
return ctx
140140
}
141141

142142
// HandleConn exists to satisfy stats.Handler.
143-
func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
143+
func (h *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
144144

145145
// TagRPC implements per RPC attempt context management.
146-
func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
146+
func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
147147
// Numerous stats handlers can be used for the same channel. The cluster
148148
// impl balancer which writes to this will only write once, thus have this
149149
// stats handler's per attempt scoped context point to the same optional
@@ -166,16 +166,16 @@ func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf
166166
return setRPCInfo(ctx, ri)
167167
}
168168

169-
func (csh *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
169+
func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
170170
ri := getRPCInfo(ctx)
171171
if ri == nil {
172172
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
173173
return
174174
}
175-
csh.processRPCEvent(ctx, rs, ri.ai)
175+
h.processRPCEvent(ctx, rs, ri.ai)
176176
}
177177

178-
func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
178+
func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
179179
switch st := s.(type) {
180180
case *stats.Begin:
181181
ci := getCallInfo(ctx)
@@ -184,32 +184,32 @@ func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCS
184184
return
185185
}
186186

187-
csh.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target)))
187+
h.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target)))
188188
case *stats.OutPayload:
189189
atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength))
190190
case *stats.InPayload:
191191
atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength))
192192
case *stats.InHeader:
193-
csh.setLabelsFromPluginOption(ai, st.Header)
193+
h.setLabelsFromPluginOption(ai, st.Header)
194194
case *stats.InTrailer:
195-
csh.setLabelsFromPluginOption(ai, st.Trailer)
195+
h.setLabelsFromPluginOption(ai, st.Trailer)
196196
case *stats.End:
197-
csh.processRPCEnd(ctx, ai, st)
197+
h.processRPCEnd(ctx, ai, st)
198198
default:
199199
}
200200
}
201201

202-
func (csh *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
203-
if ai.pluginOptionLabels == nil && csh.o.MetricsOptions.pluginOption != nil {
204-
labels := csh.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
202+
func (h *clientStatsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) {
203+
if ai.pluginOptionLabels == nil && h.o.MetricsOptions.pluginOption != nil {
204+
labels := h.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
205205
if labels == nil {
206206
labels = map[string]string{} // Shouldn't return a nil map. Make it empty if so to ignore future Get Calls for this Attempt.
207207
}
208208
ai.pluginOptionLabels = labels
209209
}
210210
}
211211

212-
func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
212+
func (h *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInfo, e *stats.End) {
213213
ci := getCallInfo(ctx)
214214
if ci == nil {
215215
logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present")
@@ -232,16 +232,16 @@ func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, ai *attemptInf
232232
attributes = append(attributes, otelattribute.String(k, v))
233233
}
234234

235-
for _, o := range csh.o.MetricsOptions.OptionalLabels {
235+
for _, o := range h.o.MetricsOptions.OptionalLabels {
236236
if val, ok := ai.xdsLabels[o]; ok {
237237
attributes = append(attributes, otelattribute.String(o, val))
238238
}
239239
}
240240

241241
clientAttributeOption := otelmetric.WithAttributes(attributes...)
242-
csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
243-
csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), clientAttributeOption)
244-
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), clientAttributeOption)
242+
h.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
243+
h.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.sentCompressedBytes), clientAttributeOption)
244+
h.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&ai.recvCompressedBytes), clientAttributeOption)
245245
}
246246

247247
const (

stats/opentelemetry/opentelemetry.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ func DialOption(o Options) grpc.DialOption {
154154
return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
155155
}
156156

157+
var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
158+
157159
// ServerOption returns a server option which enables OpenTelemetry
158160
// instrumentation code for a grpc.Server.
159161
//
@@ -169,7 +171,7 @@ func DialOption(o Options) grpc.DialOption {
169171
func ServerOption(o Options) grpc.ServerOption {
170172
ssh := &serverStatsHandler{o: o}
171173
ssh.initializeMetrics()
172-
return grpc.StatsHandler(ssh)
174+
return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh))
173175
}
174176

175177
// callInfo is information pertaining to the lifespan of the RPC client side.

0 commit comments

Comments
 (0)