
Commit 51ba893

kvserver,changefeeds,crosscluster: set per-consumer catchup scan limit
Currently, it is easy for a single slow rangefeed consumer to acquire the entire catchup scan quota for a given store, preventing any other consumers from advancing.

Here, we introduce the concept of a ConsumerID in the rangefeed request. A ConsumerID represents a logical rangefeed consumer, such as a changefeed or an LDR stream. Such consumers may make multiple MuxRangeFeed requests to a given node despite sharing the same downstream consumer. When per-consumer catchup scan limiting is enabled, no single consumer is allowed to consume more than 75% of a given store's capacity. If no ConsumerID is specified, a random consumer ID is assigned to all rangefeeds originating from a given MuxRangeFeed call.

In the long run, we need a more sophisticated approach. This change aims to be a small improvement that addresses the most egregious case: a single slow consumer consuming the entire quota. The goal is an easily backportable fix; however, it comes at the cost of adding yet another mechanism to the existing systems that attempt to limit catchup scans:

1. the client-side rate limiter,
2. the store-level CatchupIter semaphore,
3. elastic CPU rate limiting in the main CatchUpScan loop, and
4. any limiting imposed, now or in the future, by including an admission header in the request.

Informs #132438

Epic: none

Release note: None
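To make the mechanism concrete, here is a minimal, self-contained Go sketch of a per-consumer cap layered on top of a store-wide catchup scan budget. It is not the CockroachDB implementation: the limiter type, its method names, and the way the 75% share is computed are illustrative assumptions; the real store-wide semaphore and the other limiting mechanisms listed above are separate.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// perConsumerLimiter caps how many of a store's catchup scan slots any one
// logical consumer (identified by its ConsumerID) may hold at once. Sketch
// only; it does not model the store-wide semaphore itself.
type perConsumerLimiter struct {
	mu             sync.Mutex
	perConsumerMax int           // e.g. 75% of the store-wide limit
	inUse          map[int64]int // slots currently held, keyed by ConsumerID
}

func newPerConsumerLimiter(storeLimit int) *perConsumerLimiter {
	return &perConsumerLimiter{
		perConsumerMax: storeLimit * 3 / 4, // no consumer may take more than 75%
		inUse:          make(map[int64]int),
	}
}

var errConsumerAtLimit = errors.New("per-consumer catchup scan limit reached")

// tryAcquire grants a catchup scan slot to consumerID unless that consumer
// already holds its full per-consumer share.
func (l *perConsumerLimiter) tryAcquire(consumerID int64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.inUse[consumerID] >= l.perConsumerMax {
		return errConsumerAtLimit
	}
	l.inUse[consumerID]++
	return nil
}

// release returns a slot previously granted to consumerID.
func (l *perConsumerLimiter) release(consumerID int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.inUse[consumerID] > 0 {
		l.inUse[consumerID]--
	}
}

func main() {
	l := newPerConsumerLimiter(8) // store-wide limit of 8 concurrent catchup scans

	// A slow consumer (ID 1) grabs slots until it hits its 75% share (6 of 8).
	for i := 0; ; i++ {
		if err := l.tryAcquire(1); err != nil {
			fmt.Printf("consumer 1 stopped after %d slots: %v\n", i, err)
			break
		}
	}

	// A different consumer (ID 2) can still start catchup scans because the
	// slow consumer could not take the whole store-wide budget.
	fmt.Println("consumer 2 acquires a slot:", l.tryAcquire(2) == nil)
	l.release(2)
}

The point of keying the map by ConsumerID is that every MuxRangeFeed stream opened by the same changefeed job or LDR stream draws from one shared allowance, so a slow consumer can exhaust only its own share.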
1 parent a60d739 commit 51ba893

18 files changed, +273 −16 lines changed


pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 1 addition & 0 deletions
@@ -514,6 +514,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
 Knobs: ca.knobs.FeedKnobs,
 ScopedTimers: ca.sliMetrics.Timers,
 MonitoringCfg: monitoringCfg,
+ConsumerID: int64(ca.spec.JobID),
 }, nil
 }

pkg/ccl/changefeedccl/kvfeed/kv_feed.go

Lines changed: 7 additions & 0 deletions
@@ -93,6 +93,8 @@ type Config struct {
 Knobs TestingKnobs

 ScopedTimers *timers.ScopedTimers
+
+ConsumerID int64
 }

 // Run will run the kvfeed. The feed runs synchronously and returns an
@@ -123,6 +125,7 @@ func Run(ctx context.Context, cfg Config) error {
 cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
 cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
 cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
+cfg.ConsumerID,
 cfg.InitialHighWater, cfg.EndTime,
 cfg.Codec,
 cfg.SchemaFeed,
@@ -248,6 +251,7 @@ type kvFeed struct {
 withDiff bool
 withFiltering bool
 withInitialBackfill bool
+consumerID int64
 initialHighWater hlc.Timestamp
 endTime hlc.Timestamp
 writer kvevent.Writer
@@ -278,6 +282,7 @@ func newKVFeed(
 schemaChangeEvents changefeedbase.SchemaChangeEventClass,
 schemaChangePolicy changefeedbase.SchemaChangePolicy,
 withInitialBackfill, withDiff, withFiltering bool,
+consumerID int64,
 initialHighWater hlc.Timestamp,
 endTime hlc.Timestamp,
 codec keys.SQLCodec,
@@ -297,6 +302,7 @@
 withInitialBackfill: withInitialBackfill,
 withDiff: withDiff,
 withFiltering: withFiltering,
+consumerID: consumerID,
 initialHighWater: initialHighWater,
 endTime: endTime,
 schemaChangeEvents: schemaChangeEvents,
@@ -585,6 +591,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
 Frontier: resumeFrontier.Frontier(),
 WithDiff: f.withDiff,
 WithFiltering: f.withFiltering,
+ConsumerID: f.consumerID,
 Knobs: f.knobs,
 Timers: f.timers,
 RangeObserver: f.rangeObserver,

pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go

Lines changed: 1 addition & 0 deletions
@@ -144,6 +144,7 @@ func TestKVFeed(t *testing.T) {
 f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{},
 tc.schemaChangeEvents, tc.schemaChangePolicy,
 tc.needsInitialScan, tc.withDiff, true, /* withFiltering */
+0, /* consumerID */
 tc.initialHighWater, tc.endTime,
 codec,
 tf, sf, rangefeedFactory(ref.run), bufferFactory,

pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ type rangeFeedConfig struct {
 Spans []kvcoord.SpanTimePair
 WithDiff bool
 WithFiltering bool
+ConsumerID int64
 RangeObserver kvcoord.RangeObserver
 Knobs TestingKnobs
 Timers *timers.ScopedTimers

pkg/ccl/crosscluster/producer/event_stream.go

Lines changed: 1 addition & 0 deletions
@@ -153,6 +153,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
 rangefeed.WithFrontierQuantized(quantize.Get(&s.execCfg.Settings.SV)),
 rangefeed.WithOnValues(s.onValues),
 rangefeed.WithDiff(s.spec.WithDiff),
+rangefeed.WithConsumerID(int64(s.streamID)),
 rangefeed.WithInvoker(func(fn func() error) error { return fn() }),
 rangefeed.WithFiltering(s.spec.WithFiltering),
 }

pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go

Lines changed: 1 addition & 1 deletion
@@ -283,7 +283,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error

 for !s.transport.IsExhausted() {
 args := makeRangeFeedRequest(
-s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs)
+s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs, m.cfg.consumerID)
 args.Replica = s.transport.NextReplica()
 args.StreamID = streamID
 s.ReplicaDescriptor = args.Replica

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go

Lines changed: 9 additions & 0 deletions
@@ -67,6 +67,7 @@ type rangeFeedConfig struct {
 withMetadata bool
 withMatchingOriginIDs []uint32
 rangeObserver RangeObserver
+consumerID int64

 knobs struct {
 // onRangefeedEvent invoked on each rangefeed event.
@@ -138,6 +139,12 @@ func WithMetadata() RangeFeedOption {
 })
 }

+func WithConsumerID(cid int64) RangeFeedOption {
+return optionFunc(func(c *rangeFeedConfig) {
+c.consumerID = cid
+})
+}
+
 // SpanTimePair is a pair of span along with its starting time. The starting
 // time is exclusive, i.e. the first possible emitted event (including catchup
 // scans) will be at startAfter.Next().
@@ -620,6 +627,7 @@ func makeRangeFeedRequest(
 withDiff bool,
 withFiltering bool,
 withMatchingOriginIDs []uint32,
+consumerID int64,
 ) kvpb.RangeFeedRequest {
 admissionPri := admissionpb.BulkNormalPri
 if isSystemRange {
@@ -631,6 +639,7 @@
 Timestamp: startAfter,
 RangeID: rangeID,
 },
+ConsumerID: consumerID,
 WithDiff: withDiff,
 WithFiltering: withFiltering,
 WithMatchingOriginIDs: withMatchingOriginIDs,

pkg/kv/kvclient/rangefeed/config.go

Lines changed: 7 additions & 0 deletions
@@ -39,6 +39,7 @@ type config struct {
 withDiff bool
 withFiltering bool
 withMatchingOriginIDs []uint32
+consumerID int64
 onUnrecoverableError OnUnrecoverableError
 onCheckpoint OnCheckpoint
 frontierQuantize time.Duration
@@ -159,6 +160,12 @@ func WithOriginIDsMatching(originIDs ...uint32) Option {
 })
 }

+func WithConsumerID(cid int64) Option {
+return optionFunc(func(c *config) {
+c.consumerID = cid
+})
+}
+
 // WithInvoker makes an option to invoke the rangefeed tasks such as running the
 // the client and processing events emitted by the client with a caller-supplied
 // function, which can make it easier to introspect into work done by a given
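For orientation, here is a hedged usage sketch, not part of this commit, of how a caller of the pkg/kv/kvclient/rangefeed package might attach a stable logical consumer ID (for example a job ID) so that all of its rangefeeds draw from one catchup scan budget. The openFeed helper, its parameters, and the feed name are illustrative; the package types and the WithConsumerID option are the ones touched by this diff.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// openFeed starts a rangefeed whose catchup scans are accounted against
// consumerID rather than an anonymous, per-call identity.
func openFeed(
	ctx context.Context,
	f *rangefeed.Factory,
	sp roachpb.Span,
	startAt hlc.Timestamp,
	consumerID int64,
) (*rangefeed.RangeFeed, error) {
	onValue := func(ctx context.Context, v *kvpb.RangeFeedValue) {
		// Consume the event; omitted in this sketch.
	}
	return f.RangeFeed(ctx, "example-feed", []roachpb.Span{sp}, startAt, onValue,
		rangefeed.WithConsumerID(consumerID),
	)
}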

pkg/kv/kvclient/rangefeed/rangefeed.go

Lines changed: 1 addition & 0 deletions
@@ -349,6 +349,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier span.Frontier, resumeWithF
 if f.onMetadata != nil {
 rangefeedOpts = append(rangefeedOpts, kvcoord.WithMetadata())
 }
+rangefeedOpts = append(rangefeedOpts, kvcoord.WithConsumerID(f.consumerID))

 for i := 0; r.Next(); i++ {
 ts := frontier.Frontier()

pkg/kv/kvclient/rangefeed/rangefeed_external_test.go

Lines changed: 97 additions & 1 deletion
@@ -1451,7 +1451,7 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) {
 }
 eventC := make(chan *kvpb.RangeFeedEvent)
 sink := newChannelSink(ctx, eventC)
-require.NoError(t, s3.RangeFeed(sink.ctx, &req, sink)) // check if we've errored yet
+require.NoError(t, s3.RangeFeed(sink.ctx, &req, sink, nil)) // check if we've errored yet
 require.NoError(t, sink.Error())
 t.Logf("started rangefeed on %s", repl3)
@@ -1876,3 +1876,99 @@ func TestRangeFeedMetadataAutoSplit(t *testing.T) {
 }
 })
 }
+
+// TestRangefeedCatchupStarvation tests that a single MuxRangefeed
+// call cannot starve other users. Note that starvation is still
+// possible if there are more than 2 consumers of a given range.
+func TestRangefeedCatchupStarvation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testutils.RunValues(t, "feed_type", feedTypes, func(t *testing.T, rt rangefeedTestType) {
+		ctx := context.Background()
+		settings := cluster.MakeTestingClusterSettings()
+		kvserver.RangefeedUseBufferedSender.Override(ctx, &settings.SV, rt.useBufferedSender)
+		kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true)
+		// Lower the limit to make it more likely to get starved.
+		kvserver.ConcurrentRangefeedItersLimit.Override(ctx, &settings.SV, 8)
+		kvserver.PerConsumerCatchupLimit.Override(ctx, &settings.SV, 6)
+		srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
+			Settings: settings,
+		})
+		defer srv.Stopper().Stop(ctx)
+		s := srv.ApplicationLayer()
+		ts := s.Clock().Now()
+		scratchKey := append(s.Codec().TenantPrefix(), keys.ScratchRangeMin...)
+		scratchKey = scratchKey[:len(scratchKey):len(scratchKey)]
+		mkKey := func(k string) roachpb.Key {
+			return encoding.EncodeStringAscending(scratchKey, k)
+		}
+		ranges := 32
+		keysPerRange := 128
+		totalKeys := ranges * keysPerRange
+		for i := range ranges {
+			for j := range keysPerRange {
+				k := mkKey(fmt.Sprintf("%d-%d", i, j))
+				require.NoError(t, db.Put(ctx, k, 1))
+			}
+			_, _, err := srv.SplitRange(mkKey(fmt.Sprintf("%d", i)))
+			require.NoError(t, err)
+		}
+
+		span := roachpb.Span{Key: scratchKey, EndKey: scratchKey.PrefixEnd()}
+		f, err := rangefeed.NewFactory(s.AppStopper(), db, s.ClusterSettings(), nil)
+		require.NoError(t, err)
+
+		blocked := make(chan struct{})
+		r1, err := f.RangeFeed(ctx, "consumer-1-rf-1", []roachpb.Span{span}, ts,
+			func(ctx context.Context, value *kvpb.RangeFeedValue) {
+				blocked <- struct{}{}
+				<-ctx.Done()
+			},
+			rangefeed.WithConsumerID(1),
+		)
+		require.NoError(t, err)
+		defer r1.Close()
+		<-blocked
+
+		// Multiple rangefeeds from the same ConsumerID should
+		// be treated as the same consumer and thus they
+		// shouldn't be able to overwhelm the overall store
+		// quota.
+		for i := range 8 {
+			r1, err := f.RangeFeed(ctx, fmt.Sprintf("consumer-1-rf-%d", i+2), []roachpb.Span{span}, ts,
+				func(ctx context.Context, value *kvpb.RangeFeedValue) { <-ctx.Done() },
+				rangefeed.WithConsumerID(1),
+			)
+			require.NoError(t, err)
+			defer r1.Close()
+		}
+
+		// Despite the 9 rangefeeds above each needing 32 catchup
+		// scans, the following rangefeed should always make
+		// progress because it has a different consumer ID.
+		r2ConsumedRow := make(chan roachpb.Key)
+		r2, err := f.RangeFeed(ctx, "rf2", []roachpb.Span{span}, ts,
+			func(ctx context.Context, value *kvpb.RangeFeedValue) {
+				r2ConsumedRow <- value.Key
+			},
+			rangefeed.WithConsumerID(2),
+		)
+		require.NoError(t, err)
+		defer r2.Close()
+
+		// Wait until we see every key we've written on rf2.
+		seen := make(map[string]struct{}, 0)
+		for {
+			select {
+			case r := <-r2ConsumedRow:
+				seen[r.String()] = struct{}{}
+				if len(seen) >= totalKeys {
+					return
+				}
+			case <-time.After(testutils.DefaultSucceedsSoonDuration):
+				t.Fatal("test timed out")
+			}
+		}
+	})
+}
