@@ -227,54 +227,74 @@ func (fc *FlowController) EnqueueAndWait(
 	reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
 	defer cancel()

-	// 2. Enter the distribution loop to find a home for the request.
-	// This loop is responsible for retrying on ErrShardDraining.
-	for {
-		select { // Non-blocking check on controller lifecycle.
-		case <-fc.parentCtx.Done():
-			return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)
-		default:
-		}
+	var finalOutcome types.QueueOutcome
+
+	// 2. Acquire a lease for the Flow.
+	// We hold this lease for the entire duration of the request (Distribution + Queueing).
+	err := fc.registry.WithConnection(flowKey, func(conn contracts.ActiveFlowConnection) error {
+		// 3. Enter the distribution loop to find a home for the request.
+		// This loop is responsible for retrying on ErrShardDraining.
+		// We can safely retry within this loop using the same `conn` object because conn.ActiveShards() provides a live
+		// view of the topology.
+		for {
+			select { // Non-blocking check on controller lifecycle.
+			case <-fc.parentCtx.Done():
+				finalOutcome = types.QueueOutcomeRejectedOther
+				return fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)
+			default:
+			}

-		// Attempt to distribute the request once.
-		item, err := fc.tryDistribution(reqCtx, req, enqueueTime)
-		if err != nil {
-			// Distribution failed terminally (e.g., no shards, context cancelled during blocking submit).
-			// The item has already been finalized by tryDistribution.
-			finalState := item.FinalState()
-			return finalState.Outcome, finalState.Err
-		}
+			// Attempt to distribute the request once, passing the active connection.
+			item, err := fc.tryDistribution(reqCtx, req, enqueueTime, conn)
+			if err != nil {
+				// Distribution failed terminally (e.g., no shards, context cancelled during blocking submit).
+				// The item has already been finalized by tryDistribution.
+				finalState := item.FinalState()
+				finalOutcome = finalState.Outcome
+				return finalState.Err
+			}

-		// Distribution was successful; ownership of the item has been transferred to a processor.
-		// Now, we block here in awaitFinalization until the request is finalized by either the processor (e.g., dispatched,
-		// rejected) or the controller itself (e.g., caller's context cancelled/TTL expired).
-		outcome, err := fc.awaitFinalization(reqCtx, item)
-		if errors.Is(err, contracts.ErrShardDraining) {
-			// This is a benign race condition where the chosen shard started draining after acceptance.
-			fc.logger.V(logutil.DEBUG).Info("Selected shard is Draining, retrying request distribution",
-				"flowKey", req.FlowKey(), "requestID", req.ID())
-			// Introduce a small, randomized delay (1-10ms) to prevent tight spinning loops and thundering herds during retry
-			// scenarios (e.g., shard draining)
-			// TODO: Replace this with a more sophisticated backoff strategy when our data parallelism story matures.
-			// For now, this is more than sufficient.
-			jitterMs := k8srand.Intn(10) + 1
-			fc.clock.Sleep(time.Duration(jitterMs) * time.Millisecond)
-			continue
+			// Distribution was successful; ownership of the item has been transferred to a processor.
+			// Now, we block here in awaitFinalization until the request is finalized by either the processor (e.g., dispatched,
+			// rejected) or the controller itself (e.g., caller's context cancelled/TTL expired).
+			outcome, err := fc.awaitFinalization(reqCtx, item)
+			if errors.Is(err, contracts.ErrShardDraining) {
+				// This is a benign race condition where the chosen shard started draining after acceptance.
+				fc.logger.V(logutil.DEBUG).Info("Selected shard is Draining, retrying request distribution",
+					"flowKey", req.FlowKey(), "requestID", req.ID())
+				// Introduce a small, randomized delay (1-10ms) to prevent tight spinning loops and thundering herds during retry
+				// scenarios (e.g., shard draining)
+				jitterMs := k8srand.Intn(10) + 1
+				fc.clock.Sleep(time.Duration(jitterMs) * time.Millisecond)
+				continue
+			}
+
+			// The outcome is terminal (Dispatched, Evicted, or a non-retriable rejection).
+			finalOutcome = outcome
+			return err
 		}
+	})

-	// The outcome is terminal (Dispatched, Evicted, or a non-retriable rejection).
-	return outcome, err
+	// If WithConnection returned an error (e.g. connection failure, context cancelled before lease), we must ensure we
+	// return a valid rejection outcome.
+	// In the success case (where the closure ran), finalOutcome is set inside the closure.
+	if err != nil && finalOutcome == types.QueueOutcomeNotYetFinalized {
+		return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, err)
 	}
+
+	return finalOutcome, err
 }

 var errNoShards = errors.New("no viable active shards available")

 // tryDistribution handles a single attempt to select a shard and submit a request.
+// It uses the provided `conn` to identify candidate shards.
 // If this function returns an error, it guarantees that the provided `item` has been finalized.
 func (fc *FlowController) tryDistribution(
 	reqCtx context.Context,
 	req types.FlowControlRequest,
 	enqueueTime time.Time,
+	conn contracts.ActiveFlowConnection,
 ) (*internal.FlowItem, error) {
 	// Calculate effective TTL for item initialization (reqCtx is the enforcement mechanism).
 	effectiveTTL := fc.config.DefaultRequestTTL
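
For orientation, here is a minimal sketch of the registry contract this hunk leans on. Only `WithConnection`, `ActiveFlowConnection`, `ActiveShards`, `FlowKey`, `ManagedQueue`, `FlowQueueAccessor`, `ByteSize`, and `ID` are visible in the diff; the package name, the `FlowKey` shape, and the exact signatures below are assumptions, not the project's real API:

```go
// Package contractsketch: a hedged reconstruction of the lease contract implied by the diff.
package contractsketch

// FlowKey identifies a flow; the real types.FlowKey is likely richer than this stand-in.
type FlowKey struct {
	ID       string
	Priority int
}

// QueueAccessor exposes the one statistic distribution needs: the queue's current byte size.
type QueueAccessor interface {
	ByteSize() uint64
}

// ManagedQueue is reduced here to what selectDistributionCandidates reads.
type ManagedQueue interface {
	FlowQueueAccessor() QueueAccessor
}

// RegistryShard is the slice of shard behavior this hunk relies on.
type RegistryShard interface {
	ID() string
	ManagedQueue(key FlowKey) (ManagedQueue, error)
}

// ActiveFlowConnection is a live, leased view of the flow's Active shard topology;
// its liveness is what makes in-loop retries on ErrShardDraining safe.
type ActiveFlowConnection interface {
	FlowKey() FlowKey
	ActiveShards() []RegistryShard
}

// FlowRegistry grants leases. Holding the lease for the whole request
// (distribution + queueing) is the central change in this hunk.
type FlowRegistry interface {
	WithConnection(key FlowKey, fn func(conn ActiveFlowConnection) error) error
}
```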
@@ -287,7 +307,7 @@ func (fc *FlowController) tryDistribution(
 	// We must create a fresh FlowItem on each attempt as finalization is per-lifecycle.
 	item := internal.NewItem(req, effectiveTTL, enqueueTime)

-	candidates, err := fc.selectDistributionCandidates(item.OriginalRequest().FlowKey())
+	candidates, err := fc.selectDistributionCandidates(conn)
 	if err != nil {
 		outcome := types.QueueOutcomeRejectedOther
 		if errors.Is(err, errNoShards) {
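
A side note on the error handling in these hunks: the rejection paths wrap two sentinels at once using Go 1.20's multi-`%w` support (`fmt.Errorf("%w: %w", types.ErrRejected, ...)`), so callers can match either the generic rejection or the specific cause with `errors.Is`. A runnable illustration with stand-in sentinels (not the real `types` package):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for types.ErrRejected and errNoShards from the diff.
var (
	errRejected = errors.New("rejected")
	errNoShards = errors.New("no viable active shards available")
)

func main() {
	// Double-wrap, as in: fmt.Errorf("%w: %w", types.ErrRejected, err)
	err := fmt.Errorf("%w: %w", errRejected, errNoShards)

	fmt.Println(errors.Is(err, errRejected)) // true
	fmt.Println(errors.Is(err, errNoShards)) // true
}
```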
@@ -367,35 +387,29 @@ type candidate struct {
 	byteSize uint64
 }

-// selectDistributionCandidates identifies all Active shards for the item's flow and ranks them by the current byte size
+// selectDistributionCandidates identifies all Active shards for the leased flow and ranks them by the current byte size
 // of that flow's queue, from least to most loaded.
-func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) {
-	var candidates []candidate
-
-	// Acquire a connection to the registry for the flow key. This ensures a consistent view of the ActiveShards for the
-	// duration of the shard selection process, preventing races with concurrent shard topology changes.
-	err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
-		shards := conn.ActiveShards()
-		candidates = make([]candidate, 0, len(shards))
-		for _, shard := range shards {
-			worker := fc.getOrStartWorker(shard)
-			mq, err := shard.ManagedQueue(key)
-			if err != nil {
-				fc.logger.Error(err,
-					"Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.",
-					"flowKey", key, "shardID", shard.ID())
-				continue
-			}
-			candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()})
+func (fc *FlowController) selectDistributionCandidates(conn contracts.ActiveFlowConnection) ([]candidate, error) {
+	shards := conn.ActiveShards()
+	if len(shards) == 0 {
+		return nil, fmt.Errorf("%w for flow %s", errNoShards, conn.FlowKey())
+	}
+
+	candidates := make([]candidate, 0, len(shards))
+	for _, shard := range shards {
+		worker := fc.getOrStartWorker(shard)
+		mq, err := shard.ManagedQueue(conn.FlowKey())
+		if err != nil {
+			fc.logger.Error(err,
+				"Invariant violation. Failed to get ManagedQueue for a leased flow on an Active shard. Skipping shard.",
+				"flowKey", conn.FlowKey(), "shardID", shard.ID())
+			continue
 		}
-		return nil
-	})
-	if err != nil {
-		return nil, fmt.Errorf("failed to acquire lease for flow %s: %w", key, err)
+		candidates = append(candidates, candidate{worker.processor, shard.ID(), mq.FlowQueueAccessor().ByteSize()})
 	}

 	if len(candidates) == 0 {
-		return nil, fmt.Errorf("%w for flow %s", errNoShards, key)
+		return nil, fmt.Errorf("%w for flow %s", errNoShards, conn.FlowKey())
 	}

 	slices.SortFunc(candidates, func(a, b candidate) int {
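
The hunk cuts off before the comparator body, so here is a hedged sketch of the least-to-most-loaded ranking that `slices.SortFunc` presumably performs. Only the `byteSize` field name appears in the diff; the `shardID` field name and the `cmp.Compare` comparator are assumptions for illustration:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// candidate is trimmed to the fields needed for ranking; the real struct also
// carries a processor handle.
type candidate struct {
	shardID  string
	byteSize uint64
}

func main() {
	candidates := []candidate{
		{shardID: "shard-b", byteSize: 4096},
		{shardID: "shard-a", byteSize: 512},
		{shardID: "shard-c", byteSize: 2048},
	}

	// Rank shards from least to most loaded, per the doc comment on
	// selectDistributionCandidates.
	slices.SortFunc(candidates, func(a, b candidate) int {
		return cmp.Compare(a.byteSize, b.byteSize)
	})

	fmt.Println(candidates) // [{shard-a 512} {shard-c 2048} {shard-b 4096}]
}
```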