Skip to content

Commit c86059d

Browse files
authored
[pluggable bbr] add CycleState for per-request shared plugin stateDuplicate CycleState, StateKey, StateData and ErrNotFound from EPPinto pkg/bbr/framework/cycle_state.go with no EPP dependency.Add CycleState field to RequestContext and initialize it per request. (#2612)
Signed-off-by: noalimoy <nlimoy@redhat.com>
1 parent 700da9c commit c86059d

2 files changed

Lines changed: 100 additions & 2 deletions

File tree

pkg/bbr/framework/cycle_state.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package framework
18+
19+
import (
20+
"errors"
21+
"fmt"
22+
"sync"
23+
)
24+
25+
var (
26+
// ErrNotFound is the not found error message.
27+
ErrNotFound = errors.New("not found")
28+
)
29+
30+
// StateKey is the type of keys stored in CycleState.
31+
type StateKey string
32+
33+
// StateData is a generic type for arbitrary data stored in CycleState.
34+
type StateData interface {
35+
// Clone is an interface to make a copy of StateData.
36+
Clone() StateData
37+
}
38+
39+
// NewCycleState initializes a new CycleState and returns its pointer.
40+
func NewCycleState() *CycleState {
41+
return &CycleState{}
42+
}
43+
44+
// CycleState provides a mechanism for plugins to store and retrieve arbitrary data.
45+
// StateData stored by one plugin can be read, altered, or deleted by another plugin.
46+
// CycleState does not provide any data protection, as all plugins are assumed to be
47+
// trusted.
48+
// Note: CycleState uses a sync.Map to back the storage, because it is thread safe.
49+
// It's aimed to optimize for the "write once and read many times" scenarios.
50+
type CycleState struct {
51+
// key: StateKey, value: StateData
52+
storage sync.Map
53+
}
54+
55+
// Read retrieves data with the given "key" from CycleState. If the key is not
56+
// present, ErrNotFound is returned.
57+
//
58+
// See CycleState for notes on concurrency.
59+
func (c *CycleState) Read(key StateKey) (StateData, error) {
60+
if v, ok := c.storage.Load(key); ok {
61+
return v.(StateData), nil
62+
}
63+
return nil, ErrNotFound
64+
}
65+
66+
// Write stores the given "val" in CycleState with the given "key".
67+
//
68+
// See CycleState for notes on concurrency.
69+
func (c *CycleState) Write(key StateKey, val StateData) {
70+
c.storage.Store(key, val)
71+
}
72+
73+
// Delete deletes data with the given key from CycleState.
74+
//
75+
// See CycleState for notes on concurrency.
76+
func (c *CycleState) Delete(key StateKey) {
77+
c.storage.Delete(key)
78+
}
79+
80+
// ReadCycleStateKey retrieves data with the given key from CycleState and asserts it to type T.
81+
// Returns an error if the key is not found or the type assertion fails.
82+
func ReadCycleStateKey[T StateData](c *CycleState, key StateKey) (T, error) {
83+
var zero T
84+
85+
raw, err := c.Read(key)
86+
if err != nil {
87+
return zero, err
88+
}
89+
90+
val, ok := raw.(T)
91+
if !ok {
92+
return zero, fmt.Errorf("unexpected type for key %q: got %T", key, raw)
93+
}
94+
95+
return val, nil
96+
}

pkg/bbr/handlers/server.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type Server struct {
6868
type RequestContext struct {
6969
RequestReceivedTimestamp time.Time
7070
ResponseCompleteTimestamp time.Time
71+
CycleState *framework.CycleState
7172
Request *framework.InferenceRequest
7273
Response *framework.InferenceResponse
7374
}
@@ -91,8 +92,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
9192
loggerVerbose.Info("Processing")
9293

9394
reqCtx := &RequestContext{
94-
Request: framework.NewInferenceRequest(),
95-
Response: framework.NewInferenceResponse(),
95+
CycleState: framework.NewCycleState(),
96+
Request: framework.NewInferenceRequest(),
97+
Response: framework.NewInferenceResponse(),
9698
}
9799
var body []byte
98100
respStreamedBody := &streamedBody{}

0 commit comments

Comments
 (0)