24 changes: 21 additions & 3 deletions pkg/epp/handlers/response.go
@@ -49,6 +49,14 @@ func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *Reques
CompletionTokens: int(usg["completion_tokens"].(float64)),
TotalTokens: int(usg["total_tokens"].(float64)),
}
if usg["prompt_token_details"] != nil {
detailsMap := usg["prompt_token_details"].(map[string]any)
if cachedTokens, ok := detailsMap["cached_tokens"]; ok {
usage.PromptTokenDetails = &PromptTokenDetails{
CachedTokens: int(cachedTokens.(float64)),
}
}
}
reqCtx.Usage = usage
logger.V(logutil.VERBOSE).Info("Response generated", "usage", reqCtx.Usage)
}
@@ -78,6 +86,11 @@ func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context,
reqCtx.Usage = resp.Usage
metrics.RecordInputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.CompletionTokens)
cachedToken := 0
if resp.Usage.PromptTokenDetails != nil {
cachedToken = resp.Usage.PromptTokenDetails.CachedTokens
}
metrics.RecordPromptCachedTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, cachedToken)
_, err := s.director.HandleResponseBodyComplete(ctx, reqCtx)
if err != nil {
logger.Error(err, "error in HandleResponseBodyComplete")
@@ -193,7 +206,12 @@ type ResponseBody struct {
}

type Usage struct {
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
TotalTokens int `json:"total_tokens"`
PromptTokenDetails *PromptTokenDetails `json:"prompt_token_details,omitempty"`
}

type PromptTokenDetails struct {
CachedTokens int `json:"cached_tokens"`
}
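For reference, a minimal standalone sketch (not part of this change, and independent of how the handlers above populate the struct): the new prompt_token_details field deserializes into Usage.PromptTokenDetails, and the pointer stays nil when a backend omits it, which is what the nil checks in the handlers rely on.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the types added above, for illustration only.
type PromptTokenDetails struct {
	CachedTokens int `json:"cached_tokens"`
}

type Usage struct {
	PromptTokens       int                 `json:"prompt_tokens"`
	CompletionTokens   int                 `json:"completion_tokens"`
	TotalTokens        int                 `json:"total_tokens"`
	PromptTokenDetails *PromptTokenDetails `json:"prompt_token_details,omitempty"`
}

func main() {
	payload := []byte(`{"prompt_tokens":11,"completion_tokens":100,"total_tokens":111,"prompt_token_details":{"cached_tokens":10}}`)
	var u Usage
	if err := json.Unmarshal(payload, &u); err != nil {
		panic(err)
	}
	fmt.Println(u.PromptTokenDetails.CachedTokens) // 10; stays nil when the field is absent
}
```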
57 changes: 57 additions & 0 deletions pkg/epp/handlers/response_test.go
@@ -51,11 +51,40 @@ const (
}
}
`
bodyWithCachedTokens = `
{
"id": "cmpl-573498d260f2423f9e42817bbba3743a",
"object": "text_completion",
"created": 1732563765,
"model": "meta-llama/Llama-3.1-8B-Instruct",
"choices": [
{
"index": 0,
"text": " Chronicle\nThe San Francisco Chronicle has a new book review section, and it's a good one. The reviews are short, but they're well-written and well-informed. The Chronicle's book review section is a good place to start if you're looking for a good book review.\nThe Chronicle's book review section is a good place to start if you're looking for a good book review. The Chronicle's book review section",
"logprobs": null,
"finish_reason": "length",
"stop_reason": null,
"prompt_logprobs": null
}
],
"usage": {
"prompt_tokens": 11,
"total_tokens": 111,
"completion_tokens": 100,
"prompt_token_details": {
"cached_tokens": 10
}
}
}
`

streamingBodyWithoutUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"food-review-0","choices":[],"usage":null}
`

streamingBodyWithUsage = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"food-review-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]
`
streamingBodyWithUsageAndCachedTokens = `data: {"id":"cmpl-41764c93-f9d2-4f31-be08-3ba04fa25394","object":"text_completion","created":1740002445,"model":"food-review-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10,"prompt_token_details":{"cached_tokens":5}}}
data: [DONE]
`
)
@@ -100,6 +129,18 @@ func TestHandleResponseBody(t *testing.T) {
CompletionTokens: 100,
},
},
{
name: "success with cached tokens",
body: []byte(bodyWithCachedTokens),
want: Usage{
PromptTokens: 11,
TotalTokens: 111,
CompletionTokens: 100,
PromptTokenDetails: &PromptTokenDetails{
CachedTokens: 10,
},
},
},
}

for _, test := range tests {
@@ -161,6 +202,22 @@ func TestHandleStreamedResponseBody(t *testing.T) {
CompletionTokens: 10,
},
},
{
name: "streaming request with usage and cached tokens",
body: streamingBodyWithUsageAndCachedTokens,
reqCtx: &RequestContext{
modelServerStreaming: true,
},
wantErr: false,
want: Usage{
PromptTokens: 7,
TotalTokens: 17,
CompletionTokens: 10,
PromptTokenDetails: &PromptTokenDetails{
CachedTokens: 5,
},
},
},
}

for _, test := range tests {
5 changes: 5 additions & 0 deletions pkg/epp/handlers/server.go
@@ -316,6 +316,11 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
metrics.RecordResponseSizes(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.ResponseSize)
metrics.RecordInputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.Usage.CompletionTokens)
cachedToken := 0
if reqCtx.Usage.PromptTokenDetails != nil {
cachedToken = reqCtx.Usage.PromptTokenDetails.CachedTokens
}
metrics.RecordPromptCachedTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, cachedToken)
}
}
}
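The same nil-guard now appears in both the streaming path in response.go and the non-streaming path above. A hypothetical helper (not part of this change) capturing that pattern:

```go
// cachedTokenCount is a hypothetical helper illustrating the guard used above:
// it returns the cached prompt token count reported by the backend, or 0 when
// prompt_token_details was omitted.
func cachedTokenCount(u Usage) int {
	if u.PromptTokenDetails != nil {
		return u.PromptTokenDetails.CachedTokens
	}
	return 0
}
```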
18 changes: 18 additions & 0 deletions pkg/epp/metrics/metrics.go
@@ -125,6 +125,17 @@ var (
[]string{"model_name", "target_model_name"},
)

promptCachedTokens = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: InferenceObjectiveComponent,
Name: "prompt_cached_tokens",
Help: metricsutil.HelpMsgWithStability("Inference objective prompt cached token count distribution for requests in each model.", compbasemetrics.ALPHA),
// Most models have an input context window of less than 1 million tokens.
Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144, 524288, 1048576},
},
[]string{"model_name", "target_model_name"},
)

runningRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: InferenceObjectiveComponent,
@@ -278,6 +289,7 @@ func Register(customCollectors ...prometheus.Collector) {
metrics.Registry.MustRegister(responseSizes)
metrics.Registry.MustRegister(inputTokens)
metrics.Registry.MustRegister(outputTokens)
metrics.Registry.MustRegister(promptCachedTokens)
metrics.Registry.MustRegister(runningRequests)
metrics.Registry.MustRegister(NormalizedTimePerOutputToken)
metrics.Registry.MustRegister(inferencePoolAvgKVCache)
@@ -306,6 +318,7 @@ func Reset() {
responseSizes.Reset()
inputTokens.Reset()
outputTokens.Reset()
promptCachedTokens.Reset()
runningRequests.Reset()
NormalizedTimePerOutputToken.Reset()
inferencePoolAvgKVCache.Reset()
@@ -369,6 +382,11 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
}
}

// RecordPromptCachedTokens records prompt cached tokens count.
func RecordPromptCachedTokens(modelName, targetModelName string, size int) {
promptCachedTokens.WithLabelValues(modelName, targetModelName).Observe(float64(size))
}

// RecordNormalizedTimePerOutputToken (NTPOT) records the normalized time per output token.
func RecordNormalizedTimePerOutputToken(ctx context.Context, modelName, targetModelName string, received time.Time, complete time.Time, outputTokenCount int) bool {
if !complete.After(received) {
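As a minimal sketch of how such a histogram accumulates a cached-token observation, assuming a plain client_golang histogram with a trimmed bucket list rather than the component-base registration used above:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Same option shape as promptCachedTokens above, trimmed to a few buckets
	// and without the model_name/target_model_name labels.
	h := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prompt_cached_tokens",
		Help:    "Prompt cached token count distribution.",
		Buckets: []float64{1, 8, 16, 32, 64},
	})

	// One response reporting 10 cached prompt tokens.
	h.Observe(10)

	// The observation is counted cumulatively in the le=16 bucket and every
	// larger one; the sample count is 1 and the sample sum is 10.
	m := &dto.Metric{}
	if err := h.Write(m); err != nil {
		panic(err)
	}
	for _, b := range m.GetHistogram().GetBucket() {
		fmt.Printf("le=%v count=%d\n", b.GetUpperBound(), b.GetCumulativeCount())
	}
	fmt.Println(m.GetHistogram().GetSampleCount(), m.GetHistogram().GetSampleSum())
}
```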
7 changes: 7 additions & 0 deletions pkg/epp/metrics/metrics_test.go
@@ -42,6 +42,7 @@ const (
OutputTokensMetric = InferenceObjectiveComponent + "_output_tokens"
NormalizedTimePerOutputTokenMetric = InferenceObjectiveComponent + "_normalized_time_per_output_token_seconds"
RunningRequestsMetric = InferenceObjectiveComponent + "_running_requests"
PromptCachedTokensMetric = InferenceObjectiveComponent + "_prompt_cached_tokens"
KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
PerPodQueueSizeMetrics = InferencePoolComponent + "_per_pod_queue_size"
@@ -373,6 +374,7 @@ func TestRecordResponseMetrics(t *testing.T) {
inputToken int
outputToken int
respSize int
cachedToken int
}
scenarios := []struct {
name string
@@ -386,27 +388,31 @@
respSize: 1200,
inputToken: 10,
outputToken: 100,
cachedToken: 5,
},
{
modelName: "m10",
targetModelName: "t10",
respSize: 500,
inputToken: 20,
outputToken: 200,
cachedToken: 10,
},
{
modelName: "m10",
targetModelName: "t11",
respSize: 2480,
inputToken: 30,
outputToken: 300,
cachedToken: 15,
},
{
modelName: "m20",
targetModelName: "t20",
respSize: 80,
inputToken: 40,
outputToken: 400,
cachedToken: 20,
},
},
}}
@@ -416,6 +422,7 @@
RecordInputTokens(resp.modelName, resp.targetModelName, resp.inputToken)
RecordOutputTokens(resp.modelName, resp.targetModelName, resp.outputToken)
RecordResponseSizes(resp.modelName, resp.targetModelName, resp.respSize)
RecordPromptCachedTokens(resp.modelName, resp.targetModelName, resp.cachedToken)
}
wantResponseSize, err := os.Open("testdata/response_sizes_metric")
defer func() {