Skip to content

Commit b432abf

Browse files
Add integration tests for response plugin execution
Follow-up to kubernetes-sigs#2354 / PR kubernetes-sigs#2369. Exercises the full Process loop (request body -> response headers -> response body) to validate that response plugins can mutate the response body over a real gRPC ext_proc stream. Covers: - Unary mode: response plugin mutates the response body - Unary mode: no response plugins results in passthrough behavior Signed-off-by: Abdallah Samara <[email protected]> Made-with: Cursor
1 parent eca7172 commit b432abf

4 files changed

Lines changed: 245 additions & 7 deletions

File tree

test/integration/bbr/body_mutation_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestBodyMutation_Unary(t *testing.T) {
6060

6161
plugin := &bodyMutatingPlugin{fieldName: "injected", fieldValue: "test-value"}
6262
baseModelToHeaderPlugin := &basemodelextractor.BaseModelToHeaderPlugin{AdaptersStore: basemodelextractor.NewAdaptersStore()}
63-
h := NewBBRHarnessWithPlugins(t, ctx, false, []framework.RequestProcessor{plugin, baseModelToHeaderPlugin})
63+
h := NewBBRHarnessWithPlugins(t, ctx, false, []framework.RequestProcessor{plugin, baseModelToHeaderPlugin}, []framework.ResponseProcessor{})
6464

6565
body := map[string]any{"prompt": "hello"}
6666
bodyBytes, _ := json.Marshal(body)
@@ -127,7 +127,7 @@ func TestBodyMutation_Streaming(t *testing.T) {
127127

128128
plugin := &bodyMutatingPlugin{fieldName: "injected", fieldValue: "test-value"}
129129
baseModelToHeaderPlugin := &basemodelextractor.BaseModelToHeaderPlugin{AdaptersStore: basemodelextractor.NewAdaptersStore()}
130-
h := NewBBRHarnessWithPlugins(t, ctx, true, []framework.RequestProcessor{plugin, baseModelToHeaderPlugin})
130+
h := NewBBRHarnessWithPlugins(t, ctx, true, []framework.RequestProcessor{plugin, baseModelToHeaderPlugin}, []framework.ResponseProcessor{})
131131

132132
body := map[string]any{"prompt": "hello"}
133133
bodyBytes, _ := json.Marshal(body)

test/integration/bbr/harness.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type BBRHarness struct {
5252
}
5353

5454
// NewBBRHarness boots up an isolated BBR server on a random port with the default
55-
// BodyFieldToHeaderPlugin for model extraction.
55+
// BodyFieldToHeaderPlugin for model extraction and no response plugins.
5656
func NewBBRHarness(t *testing.T, ctx context.Context, streaming bool) *BBRHarness {
5757
t.Helper()
5858
modelToHeaderPlugin, err := bodyfieldtoheader.NewBodyFieldToHeaderPlugin(modelField, bodyfieldtoheader.ModelHeader)
@@ -94,11 +94,18 @@ func NewBBRHarness(t *testing.T, ctx context.Context, streaming bool) *BBRHarnes
9494

9595
baseModelToHeaderPlugin := &basemodelextractor.BaseModelToHeaderPlugin{AdaptersStore: store}
9696

97-
return NewBBRHarnessWithPlugins(t, ctx, streaming, []framework.RequestProcessor{modelToHeaderPlugin, baseModelToHeaderPlugin})
97+
return NewBBRHarnessWithPlugins(t, ctx, streaming, []framework.RequestProcessor{modelToHeaderPlugin, baseModelToHeaderPlugin}, []framework.ResponseProcessor{})
9898
}
9999

100-
// NewBBRHarnessWithPlugins boots up an isolated BBR server with custom request plugins.
101-
func NewBBRHarnessWithPlugins(t *testing.T, ctx context.Context, streaming bool, requestPlugins []framework.RequestProcessor) *BBRHarness {
100+
// NewBBRHarnessWithPlugins boots up an isolated BBR server on a random port
101+
// with the given request and response plugins.
102+
func NewBBRHarnessWithPlugins(
103+
t *testing.T,
104+
ctx context.Context,
105+
streaming bool,
106+
requestPlugins []framework.RequestProcessor,
107+
responsePlugins []framework.ResponseProcessor,
108+
) *BBRHarness {
102109
t.Helper()
103110

104111
// 1. Allocate Free Port
@@ -110,6 +117,7 @@ func NewBBRHarnessWithPlugins(t *testing.T, ctx context.Context, streaming bool,
110117
runner.SecureServing = false
111118
runner.Streaming = streaming
112119
runner.RequestPlugins = requestPlugins
120+
runner.ResponsePlugins = responsePlugins
113121

114122
// 3. Start Server in Background
115123
serverCtx, serverCancel := context.WithCancel(ctx)

test/integration/bbr/hermetic_test.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,22 @@ package bbr
1919

2020
import (
2121
"context"
22+
"encoding/json"
2223
"strings"
2324
"testing"
2425

26+
envoyCorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2527
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2628
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
2729
"github.com/google/go-cmp/cmp"
2830
"github.com/stretchr/testify/require"
2931
"google.golang.org/protobuf/testing/protocmp"
3032

33+
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework"
34+
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins/basemodelextractor"
35+
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins/bodyfieldtoheader"
3136
envoytest "sigs.k8s.io/gateway-api-inference-extension/pkg/common/envoy/test"
37+
epp "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
3238
"sigs.k8s.io/gateway-api-inference-extension/test/integration"
3339
)
3440

@@ -176,3 +182,175 @@ func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) {
176182
})
177183
}
178184
}
185+
186+
// testResponsePlugin implements framework.ResponseProcessor for integration tests.
187+
type testResponsePlugin struct {
188+
name string
189+
mutateFn func(ctx context.Context, response *framework.InferenceResponse) error
190+
}
191+
192+
func (p *testResponsePlugin) TypedName() epp.TypedName {
193+
return epp.TypedName{Type: "test", Name: p.name}
194+
}
195+
196+
func (p *testResponsePlugin) ProcessResponse(ctx context.Context, _ *framework.CycleState, response *framework.InferenceResponse) error {
197+
return p.mutateFn(ctx, response)
198+
}
199+
200+
var _ framework.ResponseProcessor = &testResponsePlugin{}
201+
202+
// TestResponsePlugins_Unary validates that response plugins can mutate the
203+
// response body in unary (non-streaming) mode over a real gRPC ext_proc stream.
204+
func TestResponsePlugins_Unary(t *testing.T) {
205+
t.Parallel()
206+
207+
guardrailPlugin := &testResponsePlugin{
208+
name: "guardrail",
209+
mutateFn: func(_ context.Context, response *framework.InferenceResponse) error {
210+
response.SetBodyField("guardrail", "applied")
211+
return nil
212+
},
213+
}
214+
215+
ctx := context.Background()
216+
217+
modelToHeaderPlugin, err := bodyfieldtoheader.NewBodyFieldToHeaderPlugin(modelField, bodyfieldtoheader.ModelHeader)
218+
require.NoError(t, err, "failed to create body-field-to-header plugin")
219+
baseModelPlugin := &basemodelextractor.BaseModelToHeaderPlugin{AdaptersStore: basemodelextractor.NewAdaptersStore()}
220+
221+
h := NewBBRHarnessWithPlugins(t, ctx, false, []framework.RequestProcessor{modelToHeaderPlugin, baseModelPlugin}, []framework.ResponseProcessor{guardrailPlugin})
222+
223+
// Phase 1: Send request body (unary — no separate headers message).
224+
requestBody := map[string]any{
225+
"prompt": "hello",
226+
"max_tokens": 100,
227+
"temperature": 0,
228+
"model": "test-model",
229+
}
230+
reqBodyBytes, err := json.Marshal(requestBody)
231+
require.NoError(t, err)
232+
233+
// Phase 2: Build the full message sequence: RequestBody → ResponseHeaders → ResponseBody.
234+
responseBody := map[string]any{
235+
"choices": []any{
236+
map[string]any{"text": "Hello!"},
237+
},
238+
}
239+
respBodyBytes, err := json.Marshal(responseBody)
240+
require.NoError(t, err)
241+
242+
reqs := []*extProcPb.ProcessingRequest{
243+
{
244+
Request: &extProcPb.ProcessingRequest_RequestBody{
245+
RequestBody: &extProcPb.HttpBody{Body: reqBodyBytes, EndOfStream: true},
246+
},
247+
},
248+
{
249+
Request: &extProcPb.ProcessingRequest_ResponseHeaders{
250+
ResponseHeaders: &extProcPb.HttpHeaders{
251+
Headers: &envoyCorev3.HeaderMap{
252+
Headers: []*envoyCorev3.HeaderValue{
253+
{Key: "content-type", Value: "application/json"},
254+
},
255+
},
256+
},
257+
},
258+
},
259+
{
260+
Request: &extProcPb.ProcessingRequest_ResponseBody{
261+
ResponseBody: &extProcPb.HttpBody{Body: respBodyBytes, EndOfStream: true},
262+
},
263+
},
264+
}
265+
266+
// Expect 3 responses: request body, response headers, response body.
267+
responses, err := integration.StreamedRequest(t, h.Client, reqs, 3)
268+
require.NoError(t, err, "unexpected error during streamed request")
269+
require.Len(t, responses, 3)
270+
271+
// The response body should contain the guardrail field injected by the plugin.
272+
expectedRespBody := map[string]any{
273+
"choices": []any{
274+
map[string]any{"text": "Hello!"},
275+
},
276+
"guardrail": "applied",
277+
}
278+
279+
wantResponses := []*extProcPb.ProcessingResponse{
280+
ExpectBBRUnaryResponse("test-model", "", "hello"),
281+
ExpectResponseHeadersPassThrough(),
282+
ExpectResponseBodyMutation(expectedRespBody),
283+
}
284+
285+
envoytest.SortSetHeadersInResponses(wantResponses)
286+
envoytest.SortSetHeadersInResponses(responses)
287+
if diff := cmp.Diff(wantResponses, responses, protocmp.Transform()); diff != "" {
288+
t.Errorf("Response mismatch (-want +got): %v", diff)
289+
}
290+
}
291+
292+
// TestResponsePlugins_NoPlugins_Unary validates that when no response plugins
293+
// are configured, the response body is passed through without mutation.
294+
func TestResponsePlugins_NoPlugins_Unary(t *testing.T) {
295+
t.Parallel()
296+
297+
ctx := context.Background()
298+
h := NewBBRHarness(t, ctx, false)
299+
300+
requestBody := map[string]any{
301+
"prompt": "hello",
302+
"max_tokens": 100,
303+
"temperature": 0,
304+
"model": "test-model",
305+
}
306+
reqBodyBytes, err := json.Marshal(requestBody)
307+
require.NoError(t, err)
308+
309+
responseBody := map[string]any{
310+
"choices": []any{
311+
map[string]any{"text": "Hi there!"},
312+
},
313+
}
314+
respBodyBytes, err := json.Marshal(responseBody)
315+
require.NoError(t, err)
316+
317+
reqs := []*extProcPb.ProcessingRequest{
318+
{
319+
Request: &extProcPb.ProcessingRequest_RequestBody{
320+
RequestBody: &extProcPb.HttpBody{Body: reqBodyBytes, EndOfStream: true},
321+
},
322+
},
323+
{
324+
Request: &extProcPb.ProcessingRequest_ResponseHeaders{
325+
ResponseHeaders: &extProcPb.HttpHeaders{
326+
Headers: &envoyCorev3.HeaderMap{
327+
Headers: []*envoyCorev3.HeaderValue{
328+
{Key: "content-type", Value: "application/json"},
329+
},
330+
},
331+
},
332+
},
333+
},
334+
{
335+
Request: &extProcPb.ProcessingRequest_ResponseBody{
336+
ResponseBody: &extProcPb.HttpBody{Body: respBodyBytes, EndOfStream: true},
337+
},
338+
},
339+
}
340+
341+
responses, err := integration.StreamedRequest(t, h.Client, reqs, 3)
342+
require.NoError(t, err, "unexpected error during streamed request")
343+
require.Len(t, responses, 3)
344+
345+
wantResponses := []*extProcPb.ProcessingResponse{
346+
ExpectBBRUnaryResponse("test-model", "", "hello"),
347+
ExpectResponseHeadersPassThrough(),
348+
ExpectResponseBodyPassThrough(),
349+
}
350+
351+
envoytest.SortSetHeadersInResponses(wantResponses)
352+
envoytest.SortSetHeadersInResponses(responses)
353+
if diff := cmp.Diff(wantResponses, responses, protocmp.Transform()); diff != "" {
354+
t.Errorf("Response mismatch (-want +got): %v", diff)
355+
}
356+
}

test/integration/bbr/util.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package bbr
1818

1919
import (
2020
"encoding/json"
21+
"strconv"
2122

2223
envoyCorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2324
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -99,7 +100,58 @@ func ExpectBBRNoOpHeader() *extProcPb.ProcessingResponse {
99100
}
100101
}
101102

102-
// --- Response Expectations (Unary) ---
103+
// --- Response Phase Expectations ---
104+
105+
// ExpectResponseHeadersPassThrough asserts that BBR passed response headers through with no mutations.
106+
func ExpectResponseHeadersPassThrough() *extProcPb.ProcessingResponse {
107+
return &extProcPb.ProcessingResponse{
108+
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
109+
ResponseHeaders: &extProcPb.HeadersResponse{},
110+
},
111+
}
112+
}
113+
114+
// ExpectResponseBodyPassThrough asserts that BBR passed the response body through with no mutations
115+
// (i.e., no response plugins configured).
116+
func ExpectResponseBodyPassThrough() *extProcPb.ProcessingResponse {
117+
return &extProcPb.ProcessingResponse{
118+
Response: &extProcPb.ProcessingResponse_ResponseBody{
119+
ResponseBody: &extProcPb.BodyResponse{},
120+
},
121+
}
122+
}
123+
124+
// ExpectResponseBodyMutation asserts that a response plugin mutated the response body (unary mode).
125+
// Includes the Content-Length header mutation.
126+
func ExpectResponseBodyMutation(body map[string]any) *extProcPb.ProcessingResponse {
127+
b, _ := json.Marshal(body)
128+
return &extProcPb.ProcessingResponse{
129+
Response: &extProcPb.ProcessingResponse_ResponseBody{
130+
ResponseBody: &extProcPb.BodyResponse{
131+
Response: &extProcPb.CommonResponse{
132+
ClearRouteCache: true,
133+
HeaderMutation: &extProcPb.HeaderMutation{
134+
SetHeaders: []*envoyCorev3.HeaderValueOption{
135+
{
136+
Header: &envoyCorev3.HeaderValue{
137+
Key: "Content-Length",
138+
RawValue: []byte(strconv.Itoa(len(b))),
139+
},
140+
},
141+
},
142+
},
143+
BodyMutation: &extProcPb.BodyMutation{
144+
Mutation: &extProcPb.BodyMutation_Body{
145+
Body: b,
146+
},
147+
},
148+
},
149+
},
150+
},
151+
}
152+
}
153+
154+
// --- Request Phase Expectations (Unary) ---
103155

104156
// ExpectBBRUnaryResponse creates expected response for unary tests where the body is mutated directly.
105157
// baseModelName is the expected base model name (e.g., "llama" for both "llama" and "sql-lora-sheddable")

0 commit comments

Comments
 (0)