Skip to content

Commit fbf85be

Browse files
authored
fix: ensure ResponseComplete hook always executes (kubernetes-sigs/gateway-api-inference-extension#2064)
This guarantees request/response symmetry to prevent capacity leaks in stateful plugins (e.g., Concurrency Detector). Previously, errors during JSON marshaling, client disconnects, or split streaming chunks could cause the `ResponseComplete` hook to be skipped. Changes: - Add `defer` safety block to trigger completion on errors/disconnects. - Move streaming completion trigger to the authoritative `EndOfStream` signal rather than relying on body content parsing.
1 parent baa1c5a commit fbf85be

3 files changed

Lines changed: 21 additions & 5 deletions

File tree

pkg/epp/handlers/response.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,6 @@ func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context,
9797
cachedToken = reqCtx.Usage.PromptTokenDetails.CachedTokens
9898
}
9999
metrics.RecordPromptCachedTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, cachedToken)
100-
_, err := s.director.HandleResponseBodyComplete(ctx, reqCtx)
101-
if err != nil {
102-
logger.Error(err, "error in HandleResponseBodyComplete")
103-
}
104100
}
105101
}
106102

pkg/epp/handlers/server.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,17 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
160160
if reqCtx.RequestRunning {
161161
metrics.DecRunningRequests(reqCtx.IncomingModelName)
162162
}
163+
164+
// If we scheduled a pod (TargetPod != nil) but never marked the response as complete (e.g. error, disconnect,
165+
// panic), force the completion hooks to run.
166+
if reqCtx.TargetPod != nil && !reqCtx.ResponseComplete {
167+
// Use a fresh context as the request context might be canceled (Client Disconnect).
168+
// We only need logging from the original context.
169+
cleanupCtx := log.IntoContext(context.Background(), logger)
170+
if _, err := s.director.HandleResponseBodyComplete(cleanupCtx, reqCtx); err != nil {
171+
logger.Error(err, "error in HandleResponseBodyComplete")
172+
}
173+
}
163174
}(err, reqCtx)
164175

165176
for {
@@ -270,6 +281,10 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
270281
s.HandleResponseBodyModelStreaming(ctx, reqCtx, responseText)
271282
if v.ResponseBody.EndOfStream {
272283
loggerTrace.Info("stream completed")
284+
reqCtx.ResponseComplete = true
285+
if _, err := s.director.HandleResponseBodyComplete(ctx, reqCtx); err != nil {
286+
logger.Error(err, "error in HandleResponseBodyComplete")
287+
}
273288

274289
reqCtx.ResponseCompleteTimestamp = time.Now()
275290
metrics.RecordRequestLatencies(ctx, reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)

pkg/epp/requestcontrol/plugins.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,12 @@ type ResponseStreaming interface {
5252
ResponseStreaming(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod)
5353
}
5454

55-
// ResponseComplete is called by the director after the complete response is sent.
55+
// ResponseComplete is called by the director when the request lifecycle terminates.
56+
// This occurs after a response is fully sent, OR if the request fails/disconnects after a pod was scheduled.
57+
//
58+
// Plugins should assume this is the final cleanup hook for a request.
59+
//
60+
// TODO: Consider passing an error or success bool; however, this is a breaking change and is deffered for now.
5661
type ResponseComplete interface {
5762
plugins.Plugin
5863
ResponseComplete(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod)

0 commit comments

Comments
 (0)