Describe the problem
To prevent overloading the underlying store and to reduce the impact of rangefeeds on the foreground workload, the number of catchup iterators (Pebble iterators used for catchup scans) is limited on a per-store basis:
cockroach/pkg/kv/kvserver/replica_rangefeed.go
Lines 280 to 303 in d50ca11
```go
// If we will be using a catch-up iterator, wait for the limiter here before
// locking raftMu.
usingCatchUpIter := false
iterSemRelease := func() {}
if !args.Timestamp.IsEmpty() {
	usingCatchUpIter = true
	alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(ctx)
	if err != nil {
		return err
	}
	// Finish the iterator limit if we exit before the iterator finishes.
	// The release function will be hooked into the Close method on the
	// iterator below. The sync.Once prevents any races between exiting early
	// from this call and finishing the catch-up scan underneath the
	// rangefeed.Processor. We need to release here in case we fail to
	// register the processor, or, more perniciously, in the case where the
	// processor gets registered by shut down before starting the catch-up
	// scan.
	var iterSemReleaseOnce sync.Once
	iterSemRelease = func() {
		iterSemReleaseOnce.Do(alloc.Release)
	}
}
```
In effect, this is a hard limit on the number of catchup scans that can be in flight. If the limit is exhausted, any new rangefeed registrations starting from a timestamp (nearly all rangefeeds) need to wait for the quota.
This limit has been relatively effective, but the current implementation allows a single client to starve all other rangefeed users.
For a large table, it's possible that a single MuxRangefeed call consumes the entire catchup scan budget. If the consumer of that call is slow, it's possible for new MuxRangefeed requests from other clients to be completely blocked for an indefinite amount of time.
Further note that the individual rangefeed registrations started by a MuxRangefeed call all write to the same gRPC stream, which is protected by a mutex, and are all likely consumed by the same consumer on the other end of that gRPC connection. This may compound the starvation, since any individual catchup scan takes longer to complete as it contends with the other catchup scans sharing that stream.
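To make the starvation mechanics concrete, here is a minimal, self-contained sketch (not CockroachDB code; `storeCatchupSem` is a hypothetical stand-in for the per-store limiter) of how one caller that acquires every slot of a fixed-size semaphore blocks every later caller until it releases:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A buffered channel acting as a counting semaphore with 8 slots,
	// standing in for the per-store catchup iterator limiter.
	storeCatchupSem := make(chan struct{}, 8)

	// One MuxRangefeed call with many ranges acquires all 8 slots. Its
	// consumer is slow, so each scan holds its slot for a long time.
	for i := 0; i < cap(storeCatchupSem); i++ {
		storeCatchupSem <- struct{}{} // acquire a slot
		go func() {
			defer func() { <-storeCatchupSem }() // release when the scan finishes
			time.Sleep(10 * time.Second)         // slow consumer drains this scan slowly
		}()
	}

	// A second, unrelated client now needs a single catchup scan slot.
	// It blocks here until one of the slow scans above releases.
	start := time.Now()
	storeCatchupSem <- struct{}{}
	<-storeCatchupSem
	fmt.Printf("second client waited %s for a catchup slot\n", time.Since(start))
}
```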
Potential Approaches
- Arbitrarily increase the semaphore limit to make this less likely.
- Limit any one MuxRangefeed call's ability to consume more than some percentage of the total catchup scan semaphore (a sketch follows this list).
- Limit the ability of any one logical consumer of rangefeeds (LDR, CHANGEFEED, PCR) to consume more than some percentage of the total catchup scan semaphore.
- Replace this simple semaphore with something like a priority queue.
- Remove the destination (kv) side catch-up scan semaphore and rely only on the client side rate limiter.
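One way to implement the per-call cap mentioned above is to layer a smaller per-call semaphore in front of the shared one, so a single MuxRangefeed call queues on its own budget rather than exhausting the store-wide budget. This is a hypothetical sketch under that assumption, not the actual quotapool-based limiter; `cappedAcquirer`, `storeSem`, and `perCallLimit` are made-up names:

```go
package main

// cappedAcquirer charges both a shared, store-wide semaphore and a smaller
// per-call semaphore, so no single MuxRangefeed call can hold more than
// perCallLimit of the shared budget at once.
type cappedAcquirer struct {
	storeSem chan struct{} // shared across all rangefeed calls on the store
	callSem  chan struct{} // private to one MuxRangefeed call
}

func newCappedAcquirer(storeSem chan struct{}, perCallLimit int) *cappedAcquirer {
	return &cappedAcquirer{
		storeSem: storeSem,
		callSem:  make(chan struct{}, perCallLimit),
	}
}

// acquire blocks until both a per-call and a shared slot are available.
// Taking the per-call slot first means a greedy call queues on its own
// budget instead of tying up the shared one.
func (c *cappedAcquirer) acquire() {
	c.callSem <- struct{}{}
	c.storeSem <- struct{}{}
}

func (c *cappedAcquirer) release() {
	<-c.storeSem
	<-c.callSem
}

func main() {
	storeSem := make(chan struct{}, 64)     // store-wide budget (64 slots as an example)
	call := newCappedAcquirer(storeSem, 16) // this call may hold at most 16 of those slots
	call.acquire()
	// ... run one catchup scan here, then release ...
	call.release()
}
```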
Additional Notes
- There is also a client-side catchupScanQuota that is structured as a rate limit, with the original intention of limiting the creation of new goroutines:
cockroach/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Lines 254 to 257 in d50ca11
```go
// Before starting single rangefeed, acquire catchup scan quota.
if err := s.acquireCatchupScanQuota(ctx, m.catchupSem, m.metrics); err != nil {
	return err
}
```
This is backed by a quotapool.RateLimiter:
cockroach/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Lines 736 to 754 in d50ca11
```go
func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
	const slowAcquisitionThreshold = 5 * time.Second
	lim := getCatchupRateLimit(sv)
	return &catchupScanRateLimiter{
		sv:    sv,
		limit: lim,
		pacer: quotapool.NewRateLimiter(
			"distSenderCatchupLimit", lim, 0, /* smooth rate limit without burst */
			quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowCatchupScanAcquisition(slowAcquisitionThreshold))),
	}
}

func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
	if r := catchupStartupRate.Get(sv); r > 0 {
		return quotapool.Limit(r)
	}
	return quotapool.Inf()
}
```
Jira issue: CRDB-43066