Skip to content

Commit 1fecaf9

Browse files
authored
Allow request control plugins to return ext_proc dynamic metadata (#2156)
1 parent 684adde commit 1fecaf9

8 files changed

Lines changed: 119 additions & 28 deletions

File tree

pkg/epp/handlers/request.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package handlers
1818

1919
import (
2020
"context"
21+
"maps"
2122
"strconv"
2223
"time"
2324

@@ -98,6 +99,14 @@ func (s *StreamingServer) generateRequestHeaderResponse(ctx context.Context, req
9899
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
99100
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
100101
// options for gateway providers.
102+
dynamicMetadata := s.generateMetadata(reqCtx.TargetEndpoint)
103+
if reqCtx.Response.DynamicMetadata != nil {
104+
if dynamicMetadata.Fields == nil {
105+
dynamicMetadata.Fields = make(map[string]*structpb.Value)
106+
}
107+
maps.Copy(dynamicMetadata.Fields, reqCtx.Response.DynamicMetadata.Fields)
108+
}
109+
101110
return &extProcPb.ProcessingResponse{
102111
Response: &extProcPb.ProcessingResponse_RequestHeaders{
103112
RequestHeaders: &extProcPb.HeadersResponse{
@@ -109,7 +118,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(ctx context.Context, req
109118
},
110119
},
111120
},
112-
DynamicMetadata: s.generateMetadata(reqCtx.TargetEndpoint),
121+
DynamicMetadata: dynamicMetadata,
113122
}
114123
}
115124

pkg/epp/handlers/request_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2424
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2525
"github.com/stretchr/testify/assert"
26+
"google.golang.org/protobuf/types/known/structpb"
2627
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
2728
)
2829

@@ -106,3 +107,46 @@ func TestGenerateHeaders_Sanitization(t *testing.T) {
106107
assert.Equal(t, "1.2.3.4:8080", gotHeaders[metadata.DestinationEndpointKey])
107108
assert.Equal(t, "123", gotHeaders["Content-Length"])
108109
}
110+
111+
func TestGenerateRequestHeaderResponse_MergeMetadata(t *testing.T) {
112+
t.Parallel()
113+
114+
server := &StreamingServer{}
115+
reqCtx := &RequestContext{
116+
TargetEndpoint: "1.2.3.4:8080",
117+
Request: &Request{
118+
Headers: make(map[string]string),
119+
},
120+
Response: &Response{
121+
DynamicMetadata: &structpb.Struct{
122+
Fields: map[string]*structpb.Value{
123+
"existing_namespace": {
124+
Kind: &structpb.Value_StructValue{
125+
StructValue: &structpb.Struct{
126+
Fields: map[string]*structpb.Value{
127+
"existing_key": {Kind: &structpb.Value_StringValue{StringValue: "existing_value"}},
128+
},
129+
},
130+
},
131+
},
132+
},
133+
},
134+
},
135+
}
136+
137+
resp := server.generateRequestHeaderResponse(context.Background(), reqCtx)
138+
139+
// Check that the existing metadata is preserved
140+
existingNamespace, ok := resp.DynamicMetadata.Fields["existing_namespace"]
141+
assert.True(t, ok, "Expected existing_namespace to be in DynamicMetadata")
142+
existingKey, ok := existingNamespace.GetStructValue().Fields["existing_key"]
143+
assert.True(t, ok, "Expected existing_key to be in existing_namespace")
144+
assert.Equal(t, "existing_value", existingKey.GetStringValue(), "Unexpected value for existing_key")
145+
146+
// Check that the new metadata is added
147+
endpointNamespace, ok := resp.DynamicMetadata.Fields[metadata.DestinationEndpointNamespace]
148+
assert.True(t, ok, "Expected DestinationEndpointNamespace to be in DynamicMetadata")
149+
endpointKey, ok := endpointNamespace.GetStructValue().Fields[metadata.DestinationEndpointKey]
150+
assert.True(t, ok, "Expected DestinationEndpointKey to be in DestinationEndpointNamespace")
151+
assert.Equal(t, "1.2.3.4:8080", endpointKey.GetStringValue(), "Unexpected value for DestinationEndpointKey")
152+
}

pkg/epp/handlers/response.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
3030
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging"
31+
handlerstypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers/types"
3132
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3233
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
3334
)
@@ -46,15 +47,15 @@ func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *Reques
4647
}
4748
if response["usage"] != nil {
4849
usg := response["usage"].(map[string]any)
49-
usage := Usage{
50+
usage := handlerstypes.Usage{
5051
PromptTokens: int(usg["prompt_tokens"].(float64)),
5152
CompletionTokens: int(usg["completion_tokens"].(float64)),
5253
TotalTokens: int(usg["total_tokens"].(float64)),
5354
}
5455
if usg["prompt_token_details"] != nil {
5556
detailsMap := usg["prompt_token_details"].(map[string]any)
5657
if cachedTokens, ok := detailsMap["cached_tokens"]; ok {
57-
usage.PromptTokenDetails = &PromptTokenDetails{
58+
usage.PromptTokenDetails = &handlerstypes.PromptTokenDetails{
5859
CachedTokens: int(cachedTokens.(float64)),
5960
}
6061
}
@@ -203,14 +204,7 @@ func parseRespForUsage(ctx context.Context, responseText string) ResponseBody {
203204
}
204205

205206
type ResponseBody struct {
206-
Usage Usage `json:"usage"`
207-
}
208-
209-
type Usage struct {
210-
PromptTokens int `json:"prompt_tokens"`
211-
CompletionTokens int `json:"completion_tokens"`
212-
TotalTokens int `json:"total_tokens"`
213-
PromptTokenDetails *PromptTokenDetails `json:"prompt_token_details,omitempty"`
207+
Usage handlerstypes.Usage `json:"usage"`
214208
}
215209

216210
type PromptTokenDetails struct {

pkg/epp/handlers/response_test.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging"
2828
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
29+
handlerstypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers/types"
2930
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
3031
)
3132

@@ -119,13 +120,13 @@ func TestHandleResponseBody(t *testing.T) {
119120
name string
120121
body []byte
121122
reqCtx *RequestContext
122-
want Usage
123+
want handlerstypes.Usage
123124
wantErr bool
124125
}{
125126
{
126127
name: "success",
127128
body: []byte(body),
128-
want: Usage{
129+
want: handlerstypes.Usage{
129130
PromptTokens: 11,
130131
TotalTokens: 111,
131132
CompletionTokens: 100,
@@ -134,11 +135,11 @@ func TestHandleResponseBody(t *testing.T) {
134135
{
135136
name: "success with cached tokens",
136137
body: []byte(bodyWithCachedTokens),
137-
want: Usage{
138+
want: handlerstypes.Usage{
138139
PromptTokens: 11,
139140
TotalTokens: 111,
140141
CompletionTokens: 100,
141-
PromptTokenDetails: &PromptTokenDetails{
142+
PromptTokenDetails: &handlerstypes.PromptTokenDetails{
142143
CachedTokens: 10,
143144
},
144145
},
@@ -179,7 +180,7 @@ func TestHandleStreamedResponseBody(t *testing.T) {
179180
name string
180181
body string
181182
reqCtx *RequestContext
182-
want Usage
183+
want handlerstypes.Usage
183184
wantErr bool
184185
}{
185186
{
@@ -198,7 +199,7 @@ func TestHandleStreamedResponseBody(t *testing.T) {
198199
modelServerStreaming: true,
199200
},
200201
wantErr: false,
201-
want: Usage{
202+
want: handlerstypes.Usage{
202203
PromptTokens: 7,
203204
TotalTokens: 17,
204205
CompletionTokens: 10,
@@ -211,11 +212,11 @@ func TestHandleStreamedResponseBody(t *testing.T) {
211212
modelServerStreaming: true,
212213
},
213214
wantErr: false,
214-
want: Usage{
215+
want: handlerstypes.Usage{
215216
PromptTokens: 7,
216217
TotalTokens: 17,
217218
CompletionTokens: 10,
218-
PromptTokenDetails: &PromptTokenDetails{
219+
PromptTokenDetails: &handlerstypes.PromptTokenDetails{
219220
CachedTokens: 5,
220221
},
221222
},
@@ -245,14 +246,14 @@ func TestHandleResponseBodyModelStreaming_TokenAccumulation(t *testing.T) {
245246
tests := []struct {
246247
name string
247248
chunks []string
248-
wantUsage Usage
249+
wantUsage handlerstypes.Usage
249250
}{
250251
{
251252
name: "Standard: Usage and DONE in same chunk",
252253
chunks: []string{
253254
`data: {"usage":{"prompt_tokens":5,"completion_tokens":10,"total_tokens":15}}` + "\n" + `data: [DONE]`,
254255
},
255-
wantUsage: Usage{PromptTokens: 5, CompletionTokens: 10, TotalTokens: 15},
256+
wantUsage: handlerstypes.Usage{PromptTokens: 5, CompletionTokens: 10, TotalTokens: 15},
256257
},
257258
{
258259
name: "Split: Usage in Chunk 1, DONE in Chunk 2",
@@ -262,7 +263,7 @@ func TestHandleResponseBodyModelStreaming_TokenAccumulation(t *testing.T) {
262263
// Chunk 2: Stream termination. Should NOT overwrite the usage from Chunk 1.
263264
`data: [DONE]`,
264265
},
265-
wantUsage: Usage{PromptTokens: 5, CompletionTokens: 10, TotalTokens: 15},
266+
wantUsage: handlerstypes.Usage{PromptTokens: 5, CompletionTokens: 10, TotalTokens: 15},
266267
},
267268
{
268269
name: "Fragmented: Content -> Usage -> DONE",
@@ -271,15 +272,15 @@ func TestHandleResponseBodyModelStreaming_TokenAccumulation(t *testing.T) {
271272
`data: {"usage":{"prompt_tokens":5,"completion_tokens":10,"total_tokens":15}}` + "\n",
272273
`data: [DONE]`,
273274
},
274-
wantUsage: Usage{PromptTokens: 5, CompletionTokens: 10, TotalTokens: 15},
275+
wantUsage: handlerstypes.Usage{PromptTokens: 5, CompletionTokens: 10, TotalTokens: 15},
275276
},
276277
{
277278
name: "No Usage Data",
278279
chunks: []string{
279280
`data: {"choices":[{"text":"Hello"}]}` + "\n",
280281
`data: [DONE]`,
281282
},
282-
wantUsage: Usage{}, // Zero values
283+
wantUsage: handlerstypes.Usage{}, // Zero values
283284
},
284285
}
285286

pkg/epp/handlers/server.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ import (
3131
"go.opentelemetry.io/otel/trace"
3232
"google.golang.org/grpc/codes"
3333
"google.golang.org/grpc/status"
34+
"google.golang.org/protobuf/types/known/structpb"
3435

3536
"sigs.k8s.io/controller-runtime/pkg/log"
3637
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging"
3738
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
39+
handlerstypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers/types"
3840
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3941
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
4042
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
@@ -82,7 +84,7 @@ type RequestContext struct {
8284
RequestReceivedTimestamp time.Time
8385
ResponseCompleteTimestamp time.Time
8486
RequestSize int
85-
Usage Usage
87+
Usage handlerstypes.Usage
8688
ResponseSize int
8789
ResponseComplete bool
8890
ResponseStatusCode string
@@ -111,7 +113,8 @@ type Request struct {
111113
Metadata map[string]any
112114
}
113115
type Response struct {
114-
Headers map[string]string
116+
Headers map[string]string
117+
DynamicMetadata *structpb.Struct
115118
}
116119
type StreamRequestState int
117120

pkg/epp/handlers/types/types.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package types
18+
19+
type Usage struct {
20+
PromptTokens int `json:"prompt_tokens"`
21+
CompletionTokens int `json:"completion_tokens"`
22+
TotalTokens int `json:"total_tokens"`
23+
PromptTokenDetails *PromptTokenDetails `json:"prompt_token_details,omitempty"`
24+
}
25+
26+
type PromptTokenDetails struct {
27+
CachedTokens int `json:"cached_tokens"`
28+
}

pkg/epp/requestcontrol/director.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,10 @@ func (d *Director) HandleResponseBodyComplete(ctx context.Context, reqCtx *handl
314314
logger := log.FromContext(ctx).WithValues("stage", "bodyChunk")
315315
logger.V(logutil.DEBUG).Info("Entering HandleResponseBodyComplete")
316316
response := &Response{
317-
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
318-
Headers: reqCtx.Response.Headers,
317+
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
318+
Headers: reqCtx.Response.Headers,
319+
DynamicMetadata: reqCtx.Response.DynamicMetadata,
320+
Usage: reqCtx.Usage,
319321
}
320322

321323
d.runResponseCompletePlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)

pkg/epp/requestcontrol/types.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ limitations under the License.
1616

1717
package requestcontrol
1818

19+
import (
20+
"google.golang.org/protobuf/types/known/structpb"
21+
handlerstypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers/types"
22+
)
23+
1924
// Response contains information from the response received to be passed to the Response requestcontrol plugins
2025
type Response struct {
2126
// RequestId is the Envoy generated Id for the request being processed
@@ -32,4 +37,9 @@ type Response struct {
3237
// It is populated with Envoy's dynamic metadata when ext_proc is processing ProcessingRequest_ResponseHeaders.
3338
// Currently, this is only used by conformance test.
3439
ReqMetadata map[string]any
40+
// Token usage counts parsed from the response body.
41+
Usage handlerstypes.Usage
42+
// DynamicMetadata is a map of metadata that can be passed to the Envoy. It is populated into the dynamic
43+
// metadata when processing ProcessingResponse_RequestHeaders.
44+
DynamicMetadata *structpb.Struct
3545
}

0 commit comments

Comments
 (0)