@@ -23,6 +23,7 @@ import (
2323 "time"
2424
2525 "go.uber.org/zap"
26+ "k8s.io/client-go/util/workqueue"
2627 "knative.dev/pkg/hash"
2728 "knative.dev/pkg/logging"
2829 "knative.dev/pkg/logging/logkey"
@@ -38,8 +39,18 @@ const (
3839 retryInterval = 100 * time .Millisecond
3940
4041 // Retry at most 15 seconds to process a stat.
41- maxProcessingRetry = 30
42+ maxProcessingRetry = 30
43+ // Deprecated: retryProcessingInterval is no longer used; retry backoff
44+ // is now governed by rateLimiter (fastRetryDelay / slowRetryDelay).
4245 retryProcessingInterval = 500 * time.Millisecond
46+
47+ // Rate limiting configuration for retries.
48+ // fastRetryDelay is used for the first maxFastRetryAttempts retries.
49+ fastRetryDelay = 100 * time.Millisecond
50+ // slowRetryDelay is used after maxFastRetryAttempts retries.
51+ slowRetryDelay = 500 * time.Millisecond
52+ // maxFastRetryAttempts is the number of fast retries before switching to slow retries.
53+ maxFastRetryAttempts = 5
4354)
4455
4556var svcURLSuffix = fmt .Sprintf ("svc.%s:%d" , network .GetClusterDomainName (), autoscalerPort )
@@ -75,6 +86,9 @@ type Forwarder struct {
7586
7687 statCh chan stat
7788 stopCh chan struct {}
89+
90+ // rateLimiter controls the retry backoff strategy using fast/slow retry delays.
91+ rateLimiter workqueue.TypedRateLimiter[string]
7892}
7993
8094// New creates a new Forwarder.
@@ -84,11 +98,12 @@ type Forwarder struct {
8498func New (ctx context.Context , bs * hash.BucketSet ) * Forwarder {
8599 bkts := bs .Buckets ()
86100 f := & Forwarder {
87- logger : logging .FromContext (ctx ),
88- bs : bs ,
89- processors : make (map [string ]bucketProcessor , len (bkts )),
90- statCh : make (chan stat , 1000 ),
91- stopCh : make (chan struct {}),
101+ logger : logging .FromContext (ctx ),
102+ bs : bs ,
103+ processors : make (map [string ]bucketProcessor , len (bkts )),
104+ statCh : make (chan stat , 1000 ),
105+ stopCh : make (chan struct {}),
106+ rateLimiter: workqueue.NewTypedItemFastSlowRateLimiter[string](fastRetryDelay, slowRetryDelay, maxFastRetryAttempts),
92107 }
93108
94109 f .processingWg .Add (1 )
@@ -142,23 +157,37 @@ func (f *Forwarder) process() {
142157 if err := p .process (s .sm ); err != nil {
143158 l .Errorw ("Error while processing stat" , zap .Error (err ))
144159 f .maybeRetry (l , s )
160+ } else {
161+ // Successfully processed, forget the retry state for this stat.
162+ f.rateLimiter.Forget(rev)
145163 }
146164 }
147165 }
148166}
149167
150168func (f * Forwarder ) maybeRetry (logger * zap.SugaredLogger , s stat ) {
151- if s .retry > maxProcessingRetry {
152- logger .Warn ("Exceeding max retries. Dropping the stat." )
169+ rev := s .sm .Key .String ()
170+
171+ // Check current retry count before calling When (which increments the count).
172+ numRequeues := f.rateLimiter.NumRequeues(rev)
173+ if numRequeues >= maxProcessingRetry {
174+ logger.Warnf("Exceeding max retries (%d). Dropping the stat.", maxProcessingRetry)
175+ // Clean up the rate limiter state for this stat.
176+ f .rateLimiter .Forget (rev )
177+ return
153178 }
154179
155- s .retry ++
180+ // Get the retry delay from the rate limiter (fast for initial retries, slow after).
181+ // This will increment the internal retry count.
182+ retryDelay := f.rateLimiter.When(rev)
183+
156184 f .retryWg .Add (1 )
157185 go func () {
158186 defer f .retryWg .Done ()
159- // TODO(yanweiguo): Use RateLimitingInterface and NewItemFastSlowRateLimiter.
160- time .Sleep (retryProcessingInterval )
161- logger .Debug ("Enqueuing stat for retry." )
187+ time .Sleep (retryDelay )
188+ logger.Debugf("Enqueuing stat for retry (attempt %d, delay %v).", numRequeues+1, retryDelay)
189+ // s.retry is redundant now that the rate limiter tracks requeues;
190+ s.retry++
162191 f .statCh <- s
163192 }()
164193}
0 commit comments