[Flow Control] Garbage Collection for Priority Bands #2097
k8s-ci-robot merged 9 commits into kubernetes-sigs:main
Conversation
@evacchi, this is an excellent draft! I agree with you that the

**Problem: "Zombie" Band**

Currently, we have two competing authorities for the existence of a band:

Because the JIT path does not (and currently cannot) acquire the

**Why We Can't Just Add a Lock**

If we try to fix this by making the JIT path acquire

**Proposal: Split Admission from Cleanup**

To solve this safely, I believe we need to decouple admission from memory management (GC). This likely makes Issue #2011 (controller-driven lifecycle) a prerequisite, but with a twist:

This breaks the lock hierarchy cycle because the JIT path never fights the GC. The JIT only cares if it's in the

**Next Steps?**

I think we have two options:

What are your thoughts? I am happy to hop on a call to whiteboard the locking hierarchy if it helps!

This is resolved with #2127.
Reworking on top of #2127.
Top post updated; reworked this on top of #2127 to follow the same pattern. @LukeAVanDrie let me know what you think 🙏
Sorry for the conflict here. It actually removes a source of fallibility from the registration path, so it hopefully was not too difficult to address. I am taking a look over this PR this morning. Thanks again!
no it was flawless, just a plain rebase, thx for asking 🙏 |
LukeAVanDrie
left a comment
Thanks for picking this up! This PR looks really good, especially the added test coverage. From a correctness point of view, I think this is ready to merge. I have high confidence that this solves the issue without risking any race conditions/leaks.
I added a few comments on ideas for future simplification / deduplication, but I don't think we should tackle them in this PR.
```diff
-	// TODO:(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1982) revert to 5m once this GC
-	// race condition is properly resolved.
-	defaultFlowGCTimeout time.Duration = 1 * time.Hour
+	defaultFlowGCTimeout time.Duration = 5 * time.Minute
```
Good catch -- this fix was just merged today, so `5 * time.Minute` is safe again.
```go
	// Release the band lease if we created the flow.
	// If JIT provisioning fails for a new flow, we must release that lease to prevent leaking band leases.
	if isNewFlow {
```
nit: This is correct and necessary; however, this highlights a slight leak in our abstraction layers. We are manually unwinding distinct state layers (`flowStates` delete, then `priorityBand` release). We may want to consider some `resourceLease` abstraction that bundles these.

e.g., `lease := fr.acquireLease(key)` where `lease.Release()` handles both flow and band decrements.
That being said, I wouldn't make this change in this PR. The current explicitness is safe given the complexity.
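For illustration only, the bundled lease could look roughly like this. Everything here is hypothetical (no `resourceLease` or `acquireLease` exists today); it only reuses the `FlowRegistry`, `flowStates`, `priorityBandState`, and `releasePriorityBand` names discussed in this PR:

```go
// resourceLease is a hypothetical handle that bundles the two state layers,
// so callers release both with a single call instead of unwinding them manually.
type resourceLease struct {
	registry *FlowRegistry
	key      types.FlowKey
	band     *priorityBandState
}

// Release unwinds both layers: drop the flow entry, then decrement the band lease.
func (l *resourceLease) Release() {
	l.registry.flowStates.Delete(l.key)
	l.registry.releasePriorityBand(l.band)
}
```

A JIT failure path could then just `defer lease.Release()` rather than unwinding each layer by hand.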
As a general note: `priorityBandState` is almost structurally identical to `flowState`, and `pinActivePriorityBand` duplicates the `pinActiveFlow` CAS-loop logic.

I'm not sure how much of an improvement this would be, but we can consider a generic `LeasedResource[K comparable]` struct or a helper for the Pin --> LoadOrStore --> Lock pattern to reduce the line count and surface area for bugs. I would try to get this PR merged first though, as it is already correct and in a good state.
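A very rough shape of that helper, purely illustrative. All names, fields, and the exact CAS semantics (a negative count as the "being collected" sentinel) are assumptions, not the current code:

```go
// LeasedResource is an illustrative generic wrapper for the shared
// "pin -> LoadOrStore -> lock" lifecycle used by both flowState and priorityBandState.
type LeasedResource[K comparable] struct {
	Key          K
	mu           sync.Mutex
	leaseCount   atomic.Int64
	becameIdleAt time.Time
}

// Pin takes a lease via a CAS loop; it fails if GC has already begun tearing
// the resource down (signalled here, by assumption, as a negative count).
func (r *LeasedResource[K]) Pin() bool {
	for {
		current := r.leaseCount.Load()
		if current < 0 {
			return false
		}
		if r.leaseCount.CompareAndSwap(current, current+1) {
			return true
		}
	}
}
```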
```go
	band := val.(*priorityBand)
	// ...
	// Check queue count under lock
	shard.mu.RLock()
```
nit: This acquires a read lock on every shard for every band during the GC cycle. With many bands, this scan mechanism could become heavy.
We can add an `atomic.Int64` `activeQueues` field to the `priorityBand` struct in `shard.go`, updating it when adding/removing queues. This would make `isBandActive` lock-free and remove the shard lock dependency from the registry GC loop.
Now, since priorities are tied to InferenceObjective CRDs and we run with a single shard by default, this seems like overkill at the moment. Just something to keep in mind if we hit scaling limits down the road.
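Even so, a sketch of what that could look like. Only `activeQueues` comes from the suggestion above; the surrounding struct and the method names are assumptions about the existing shard code:

```go
// Sketch: an atomic count of active queues on the per-shard band, so the
// registry GC loop can test for activity without taking shard.mu.
type priorityBand struct {
	// ... existing fields ...
	activeQueues atomic.Int64
}

// These would be called from the existing queue add/remove paths.
func (b *priorityBand) queueAdded()   { b.activeQueues.Add(1) }
func (b *priorityBand) queueRemoved() { b.activeQueues.Add(-1) }

// The band-activity check then becomes lock-free.
func (b *priorityBand) hasActiveQueues() bool { return b.activeQueues.Load() > 0 }
```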
/lgtm
```go
		s.logger.V(logging.DEBUG).Info("Removed priority band from shard", "priority", priority)
	}
	// ...

// sortPriorityLevels sorts the orderedPriorityLevels slice in descending order (highest priority first).
```
```go
	// Normally we may assume that only one GC loop is running globally: the following check is defensive.
	// Concurrent GC might happen in test cases if a GC cycle is triggered concurrently with a background GC loop.
	// In the case of concurrent GC execution, both GC cycles might see the same flow in their Range() snapshots.
	// Only the first one to delete it should release the band lease. This prevents double-release bugs.
	if _, existed := fr.flowStates.LoadAndDelete(key); existed {
		flowsToClean = append(flowsToClean, key.(types.FlowKey))
		fr.logger.V(logging.VERBOSE).Info("Garbage collecting flow", "flowKey", key, "becameIdleAt", idleTime)

	// ...

	// 5. Release the band lease.
	// Every flow in the map holds exactly one band lease. This flow is being destroyed,
	// so decrement the band's flow count.
	if bandVal, ok := fr.priorityBandStates.Load(priority); ok {
		bandState := bandVal.(*priorityBandState)
		fr.releasePriorityBand(bandState)
	}
```
@LukeAVanDrie rerunning the tests with `-race` I noticed the band lease count might go to -1, because the tests are both invoking `executeGCCycle()` explicitly and spinning up a background GC loop in `newRegistryTestHarness()`. I am not sure this is 100% intentional in the flow tests; it is incorrect for the bands tests. So:

- I am adding a `manualGC` flag to `harnessOptions` and setting it to true in the band tests, to ensure they run deterministically
- I am adding a defensive check here to make sure that `leaseCount` is not decremented twice if `executeGCCycle()` runs concurrently -- this is actually redundant if concurrent GC is not allowed

However, if `executeGCCycle()` is not meant to run concurrently, maybe we should add an atomic boolean to `FlowRegistry` and assert it's false when we enter `executeGCCycle()` (this will break the Flow GC tests!); then we can revert to a plain `Delete()` here.
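For reference, the "assert it's false" option could look roughly like this; the `gcRunning` field is hypothetical, and the rest is a sketch of the suggestion rather than the actual code:

```go
// Sketch: gcRunning would be a new atomic.Bool on FlowRegistry, asserting that
// GC cycles never overlap; with that guarantee, the LoadAndDelete above could
// go back to a plain Delete.
func (fr *FlowRegistry) executeGCCycle() {
	if !fr.gcRunning.CompareAndSwap(false, true) {
		panic("executeGCCycle must not run concurrently")
	}
	defer fr.gcRunning.Store(false)

	// ... existing sweep logic ...
}
```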
> I am adding a `manualGC` flag to `harnessOptions` and set it to true in the band tests, to ensure they run deterministically

The way I avoid this in the flow tests is by setting the idle timeout in the config to be very large (e.g., 1hr), effectively disabling the GC loop. Then you use the injected clock to step to the relevant times needed for your test logic.

> however, if `executeGCCycle()` is not meant to run concurrently, maybe we should add an atomic boolean to `FlowRegistry` and assert it's false when we enter `executeGCCycle()` (this will break the Flow GC tests!); then we can revert to a plain `Delete()` here

It is not meant to run concurrently, so this seems reasonable to me.
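A sketch of that test pattern, assuming a fake clock such as the one in `k8s.io/utils/clock/testing`; the harness and config field names below are assumptions about the test code, not its actual API:

```go
// Sketch only: a huge GC timeout keeps the background loop from ever firing,
// and the injected fake clock drives time explicitly, so each GC cycle is
// triggered deterministically by the test itself.
func TestBandGC_Deterministic(t *testing.T) {
	fakeClock := clocktesting.NewFakeClock(time.Now())
	h := newRegistryTestHarness(t, harnessOptions{
		flowGCTimeout: time.Hour, // effectively disables the background GC loop (assumed field)
		clock:         fakeClock, // assumed field
	})

	// ... register a flow and let it become idle ...

	fakeClock.Step(2 * time.Hour) // step past the timeout
	h.registry.executeGCCycle()   // exactly one GC cycle, no background race
}
```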
I would much prefer the simpler route here over another defensive check.
/retest-required
/ok-to-test

Only holding because I don't want it to autosubmit off my comment; will let @LukeAVanDrie control when this is ready. Thanks all!
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: evacchi, kfswain, LukeAVanDrie.
/lgtm
…/gateway-api-inference-extension#2097)

* [Flow Control] Garbage Collection for Priority Bands
* test cases
* Rebuilt on top of main
* redundant tests
* naming conventions
* fix comments
* remove unused code
* fix config tests
* defensive LoadAndDelete() on bands, ensure tests won't GC concurrently

Signed-off-by: Edoardo Vacchi <[email protected]>
What type of PR is this?
/kind feature
What this PR does / why we need it:
Which issue(s) this PR fixes:
Fixes #2012.
Does this PR introduce a user-facing change?:
The PR tries to stick as closely as possible to the same pattern used for `flowState` GC'ing.

- Config: we introduce `PriorityBandGCTimeout` with a `defaultPriorityBandGCTimeout`; this defaults to `2 * defaultFlowGCTimeout` to give a "grace period" where bands are retained even if the owning flows have already been collected, so that if a new flow with the same priority is re-created shortly after, they won't have to be reinstantiated from scratch.
- `FlowRegistry`: we introduce a `priorityBandStates sync.Map`, used similarly to `flowStates sync.Map`, mapping `int` priorities to `priorityBandState` structs; this struct includes a `becameIdleAt time.Time` field.
- `pinActivePriorityBand(priority)` is an almost verbatim copy of `pinActiveFlow(key types.FlowKey)`. `pinActiveFlow()` now also returns a boolean; when true, we call `pinActivePriorityBand(priority)`.
- `executeGCCycle()` now also invokes `gcPriorityBands()`: it updates the idle timestamp for priority bands, marks candidates, and deletes them.
- `isBandActive(priority)` checks `len(band.queues)` and uses `band.len, band.byteSize` to account for in-flight, buffered requests. There is no equivalent to the `leaseCount`. Does this make sense?
- `gcPriorityBands()` follows `verifyAndSweepFlows()`.
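For orientation, a condensed sketch of the shape described above. It is simplified: the exact field layout, the priority key type, the config access, and the `removePriorityBandFromAllShards` helper are assumptions; the real definitions are in the diff:

```go
// Simplified sketch of the new registry-side state and the band GC pass.
type priorityBandState struct {
	leaseCount   int64     // one lease per registered flow at this priority
	becameIdleAt time.Time // zero while the band is considered active (assumed convention)
}

// gcPriorityBands mirrors the flow GC: refresh idle timestamps, mark idle
// bands as candidates, and sweep those idle longer than PriorityBandGCTimeout.
func (fr *FlowRegistry) gcPriorityBands(now time.Time) {
	fr.priorityBandStates.Range(func(key, val any) bool {
		priority := key.(int)
		state := val.(*priorityBandState)
		if fr.isBandActive(priority) {
			state.becameIdleAt = time.Time{} // still in use; reset the idle timestamp
			return true
		}
		if state.becameIdleAt.IsZero() {
			state.becameIdleAt = now // first cycle that observed the band as idle
			return true
		}
		if now.Sub(state.becameIdleAt) < fr.config.PriorityBandGCTimeout {
			return true // idle, but still inside the grace period
		}
		// LoadAndDelete keeps a concurrent (test-only) GC cycle from releasing twice.
		if _, existed := fr.priorityBandStates.LoadAndDelete(key); existed {
			fr.removePriorityBandFromAllShards(priority)
		}
		return true
	})
}
```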