diff --git a/pkg/epp/handlers/response.go b/pkg/epp/handlers/response.go
index 1cbacbae3..b650886ea 100644
--- a/pkg/epp/handlers/response.go
+++ b/pkg/epp/handlers/response.go
@@ -49,6 +49,14 @@ func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *Reques
 			CompletionTokens: int(usg["completion_tokens"].(float64)),
 			TotalTokens:      int(usg["total_tokens"].(float64)),
 		}
+		if usg["prompt_token_details"] != nil {
+			detailsMap := usg["prompt_token_details"].(map[string]any)
+			if cachedTokens, ok := detailsMap["cached_tokens"]; ok {
+				usage.PromptTokenDetails = &PromptTokenDetails{
+					CachedTokens: int(cachedTokens.(float64)),
+				}
+			}
+		}
 		reqCtx.Usage = usage
 		logger.V(logutil.VERBOSE).Info("Response generated", "usage", reqCtx.Usage)
 	}
@@ -78,6 +86,11 @@ func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context,
 		reqCtx.Usage = resp.Usage
 		metrics.RecordInputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.PromptTokens)
 		metrics.RecordOutputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.CompletionTokens)
+		cachedToken := 0
+		if resp.Usage.PromptTokenDetails != nil {
+			cachedToken = resp.Usage.PromptTokenDetails.CachedTokens
+		}
+		metrics.RecordPromptCachedTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, cachedToken)
 		_, err := s.director.HandleResponseBodyComplete(ctx, reqCtx)
 		if err != nil {
 			logger.Error(err, "error in HandleResponseBodyComplete")
@@ -193,7 +206,12 @@ type ResponseBody struct {
 }
 
 type Usage struct {
-	PromptTokens     int `json:"prompt_tokens"`
-	CompletionTokens int `json:"completion_tokens"`
-	TotalTokens      int `json:"total_tokens"`
+	PromptTokens       int                 `json:"prompt_tokens"`
+	CompletionTokens   int                 `json:"completion_tokens"`
+	TotalTokens        int                 `json:"total_tokens"`
+	PromptTokenDetails *PromptTokenDetails `json:"prompt_token_details,omitempty"`
+}
+
+type PromptTokenDetails struct {
+	CachedTokens int `json:"cached_tokens"`
 }
diff --git a/pkg/epp/handlers/response_test.go b/pkg/epp/handlers/response_test.go
index 63b2de0da..188a4197e 100644
--- a/pkg/epp/handlers/response_test.go
+++ b/pkg/epp/handlers/response_test.go
@@ -51,11 +51,40 @@ const (
 		}
 	}
 	`
+	bodyWithCachedTokens = `
+	{
+		"id": "cmpl-573498d260f2423f9e42817bbba3743a",
+		"object": "text_completion",
+		"created": 1732563765,
+		"model": "meta-llama/Llama-3.1-8B-Instruct",
+		"choices": [
+			{
+				"index": 0,
+				"text": " Chronicle\nThe San Francisco Chronicle has a new book review section, and it's a good one. The reviews are short, but they're well-written and well-informed. The Chronicle's book review section is a good place to start if you're looking for a good book review.\nThe Chronicle's book review section is a good place to start if you're looking for a good book review. The Chronicle's book review section",
+				"logprobs": null,
+				"finish_reason": "length",
+				"stop_reason": null,
+				"prompt_logprobs": null
+			}
+		],
+		"usage": {
+			"prompt_tokens": 11,
+			"total_tokens": 111,
+			"completion_tokens": 100,
+			"prompt_token_details": {
+				"cached_tokens": 10
+			}
+		}
+	}
+	`
 	streamingBodyWithoutUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"food-review-0","choices":[],"usage":null}
 `
 	streamingBodyWithUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"food-review-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+data: [DONE]
+`
+	streamingBodyWithUsageAndCachedTokens = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"food-review-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10,"prompt_token_details":{"cached_tokens":5}}}
 data: [DONE]
 `
 )
@@ -100,6 +129,18 @@ func TestHandleResponseBody(t *testing.T) {
 				CompletionTokens: 100,
 			},
 		},
+		{
+			name: "success with cached tokens",
+			body: []byte(bodyWithCachedTokens),
+			want: Usage{
+				PromptTokens:     11,
+				TotalTokens:      111,
+				CompletionTokens: 100,
+				PromptTokenDetails: &PromptTokenDetails{
+					CachedTokens: 10,
+				},
+			},
+		},
 	}
 
 	for _, test := range tests {
@@ -161,6 +202,22 @@ func TestHandleStreamedResponseBody(t *testing.T) {
 				CompletionTokens: 10,
 			},
 		},
+		{
+			name: "streaming request with usage and cached tokens",
+			body: streamingBodyWithUsageAndCachedTokens,
+			reqCtx: &RequestContext{
+				modelServerStreaming: true,
+			},
+			wantErr: false,
+			want: Usage{
+				PromptTokens:     7,
+				TotalTokens:      17,
+				CompletionTokens: 10,
+				PromptTokenDetails: &PromptTokenDetails{
+					CachedTokens: 5,
+				},
+			},
+		},
 	}
 
 	for _, test := range tests {
diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go
index 0d5305574..5296d49a4 100644
--- a/pkg/epp/handlers/server.go
+++ b/pkg/epp/handlers/server.go
@@ -316,6 +316,11 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
 					metrics.RecordResponseSizes(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.ResponseSize)
 					metrics.RecordInputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.Usage.PromptTokens)
 					metrics.RecordOutputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.Usage.CompletionTokens)
+					cachedToken := 0
+					if reqCtx.Usage.PromptTokenDetails != nil {
+						cachedToken = reqCtx.Usage.PromptTokenDetails.CachedTokens
+					}
+					metrics.RecordPromptCachedTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, cachedToken)
 				}
 			}
 		}
diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go
index df50794b1..59c8976cd 100644
--- a/pkg/epp/metrics/metrics.go
+++ b/pkg/epp/metrics/metrics.go
@@ -125,6 +125,17 @@ var (
 		[]string{"model_name", "target_model_name"},
 	)
 
+	promptCachedTokens = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Subsystem: InferenceObjectiveComponent,
+			Name:      "prompt_cached_tokens",
+			Help:      metricsutil.HelpMsgWithStability("Inference objective prompt cached token count distribution for requests in each model.", compbasemetrics.ALPHA),
+			// Most models have an input context window of less than 1 million tokens.
+			Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536, 131072, 262144, 524288, 1048576},
+		},
+		[]string{"model_name", "target_model_name"},
+	)
+
 	runningRequests = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Subsystem: InferenceObjectiveComponent,
@@ -278,6 +289,7 @@ func Register(customCollectors ...prometheus.Collector) {
 	metrics.Registry.MustRegister(responseSizes)
 	metrics.Registry.MustRegister(inputTokens)
 	metrics.Registry.MustRegister(outputTokens)
+	metrics.Registry.MustRegister(promptCachedTokens)
 	metrics.Registry.MustRegister(runningRequests)
 	metrics.Registry.MustRegister(NormalizedTimePerOutputToken)
 	metrics.Registry.MustRegister(inferencePoolAvgKVCache)
@@ -306,6 +318,7 @@ func Reset() {
 	responseSizes.Reset()
 	inputTokens.Reset()
 	outputTokens.Reset()
+	promptCachedTokens.Reset()
 	runningRequests.Reset()
 	NormalizedTimePerOutputToken.Reset()
 	inferencePoolAvgKVCache.Reset()
@@ -369,6 +382,11 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
 	}
 }
 
+// RecordPromptCachedTokens records the prompt cached token count.
+func RecordPromptCachedTokens(modelName, targetModelName string, size int) {
+	promptCachedTokens.WithLabelValues(modelName, targetModelName).Observe(float64(size))
+}
+
 // RecordNormalizedTimePerOutputToken (NTPOT) records the normalized time per output token.
 func RecordNormalizedTimePerOutputToken(ctx context.Context, modelName, targetModelName string, received time.Time, complete time.Time, outputTokenCount int) bool {
 	if !complete.After(received) {
diff --git a/pkg/epp/metrics/metrics_test.go b/pkg/epp/metrics/metrics_test.go
index 28f941647..7d4168183 100644
--- a/pkg/epp/metrics/metrics_test.go
+++ b/pkg/epp/metrics/metrics_test.go
@@ -42,6 +42,7 @@ const (
 	OutputTokensMetric                 = InferenceObjectiveComponent + "_output_tokens"
 	NormalizedTimePerOutputTokenMetric = InferenceObjectiveComponent + "_normalized_time_per_output_token_seconds"
 	RunningRequestsMetric              = InferenceObjectiveComponent + "_running_requests"
+	PromptCachedTokensMetric           = InferenceObjectiveComponent + "_prompt_cached_tokens"
 	KVCacheAvgUsageMetric              = InferencePoolComponent + "_average_kv_cache_utilization"
 	QueueAvgSizeMetric                 = InferencePoolComponent + "_average_queue_size"
 	PerPodQueueSizeMetrics             = InferencePoolComponent + "_per_pod_queue_size"
@@ -373,6 +374,7 @@ func TestRecordResponseMetrics(t *testing.T) {
 		inputToken  int
 		outputToken int
 		respSize    int
+		cachedToken int
 	}
 	scenarios := []struct {
 		name string
@@ -386,6 +388,7 @@ func TestRecordResponseMetrics(t *testing.T) {
 				respSize: 1200,
 				inputToken: 10,
 				outputToken: 100,
+				cachedToken: 5,
 			},
 			{
 				modelName: "m10",
@@ -393,6 +396,7 @@ func TestRecordResponseMetrics(t *testing.T) {
 				respSize: 500,
 				inputToken: 20,
 				outputToken: 200,
+				cachedToken: 10,
 			},
 			{
 				modelName: "m10",
@@ -400,6 +404,7 @@ func TestRecordResponseMetrics(t *testing.T) {
 				respSize: 2480,
 				inputToken: 30,
 				outputToken: 300,
+				cachedToken: 15,
 			},
 			{
 				modelName: "m20",
@@ -407,6 +412,7 @@ func TestRecordResponseMetrics(t *testing.T) {
 				respSize: 80,
 				inputToken: 40,
 				outputToken: 400,
+				cachedToken: 20,
 			},
 		},
 	}}
@@ -416,6 +422,7 @@ func TestRecordResponseMetrics(t *testing.T) {
 			RecordInputTokens(resp.modelName, resp.targetModelName, resp.inputToken)
 			RecordOutputTokens(resp.modelName, resp.targetModelName, resp.outputToken)
 			RecordResponseSizes(resp.modelName, resp.targetModelName, resp.respSize)
+			RecordPromptCachedTokens(resp.modelName, resp.targetModelName, resp.cachedToken)
 		}
 		wantResponseSize, err := os.Open("testdata/response_sizes_metric")
 		defer func() {