
Commit 6d68a68

kvstreamer: fix pathological behavior in InOrder mode
This commit fixes a case of pathological behavior by the streamer in the InOrder mode. When ordering needs to be maintained, the streamer must prioritize sub-requests that have higher "urgency" to be served (i.e. those that are closer to the head of the line). This "urgency" is represented by the values in the `singleRangeBatch.positions` slice, where a smaller value means higher urgency, and the value at the zeroth index is used as the priority for the whole single-range batch. The values in this slice are assumed to be increasing, but that assumption could previously be violated when multiple ranges were touched (when the original batch fits within a single range, we have a separate fast path that is unaffected by this bug). This happened because we used `mustPreserveOrder = false` when instantiating the batch truncation helper. As a result, all sub-requests within the single-range batch would get reordered according to the start key of each request, and the original order wouldn't be restored by the batch truncation helper. This, in turn, would result in the streamer evaluating the requests with effectively random urgency, which would then consume the working budget. In the extreme, we would use up all available budget on random requests, buffer them, and keep doing so until we got lucky and randomly picked up the next head-of-the-line request.

This is now fixed by having the truncation helper restore the order of `positions` when the streamer is in the InOrder mode. This commit also adds a test-only assertion that the ascending invariant is maintained.

Here is a concrete example of the behavior. Say we have two ranges [a - f) and [f - ...) and requests

  0: Get(c)
  1: Get(e)
  2: Get(d)
  3: Get(f)
  4: Get(a)
  5: Get(b)

The batch truncation helper will first order all requests by their start key, so it'll process them in the order 4 - 5 - 0 - 2 - 1 - 3. When truncating to the first range [a - f), it'll populate `positions` as `[4, 5, 0, 2, 1]` (request 3 is outside of the range, so it'll stop). This slice is what we would previously include in `singleRangeBatch.positions`, so we would first evaluate the 4th request, then the 5th, etc. Previously, we would also incorrectly compare `singleRangeBatch`es against each other for "in order" priority.

AFAICT this bug has been present since the introduction of the batch truncation helper in 645c154. The assumption of the InOrder mode was already there, in a comment, but wasn't enforced and was overlooked.

Release note (bug fix): Previously, when executing queries with index / lookup joins when the ordering needs to be maintained, CockroachDB in some cases could exhibit pathological behavior leading to increased query latency, possibly by several orders of magnitude. This bug was introduced in 22.2 and is now fixed.
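To make the ordering issue above easier to see, here is a minimal, self-contained Go sketch of the example (it is not the actual `BatchTruncationHelper`; the `req` type and the sort/truncate steps are simplified stand-ins). It shows how sorting by start key scrambles `positions` for the first range, and how restoring the original ascending order puts the head-of-the-line request back at index zero, which is what the InOrder priority relies on.

package main

import (
	"fmt"
	"sort"
)

// req is a simplified stand-in for a point Get with its original enqueue position.
type req struct {
	key      string
	position int
}

func main() {
	// Requests in their original order, as in the commit message example:
	// 0: Get(c), 1: Get(e), 2: Get(d), 3: Get(f), 4: Get(a), 5: Get(b).
	reqs := []req{
		{"c", 0}, {"e", 1}, {"d", 2}, {"f", 3}, {"a", 4}, {"b", 5},
	}

	// The truncation helper first sorts all requests by their start key so
	// that it can truncate them to range boundaries in a single pass.
	sort.Slice(reqs, func(i, j int) bool { return reqs[i].key < reqs[j].key })

	// Truncate to the first range [a, f): keep everything with key < "f".
	var positions []int
	for _, r := range reqs {
		if r.key >= "f" {
			break
		}
		positions = append(positions, r.position)
	}
	// Without restoring the order, positions[0] == 4, so the batch's priority
	// (taken from the zeroth element) no longer corresponds to the true
	// head-of-the-line request.
	fmt.Println("scrambled positions:", positions) // [4 5 0 2 1]

	// With mustPreserveOrder (the fix for the InOrder mode), the helper
	// restores the original, ascending order (and reorders the requests
	// accordingly), so positions[0] == 0 again.
	sort.Ints(positions)
	fmt.Println("restored positions: ", positions) // [0 1 2 4 5]
}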
1 parent: 19f8f61

File tree: 2 files changed (+80, −3)

pkg/kv/kvclient/kvstreamer/streamer.go

Lines changed: 22 additions & 3 deletions

@@ -562,9 +562,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
 		// ranges.
 		if s.truncationHelper == nil {
 			// The streamer can process the responses in an arbitrary order, so
-			// we don't require the helper to preserve the order of requests and
-			// allow it to reorder the reqs slice too.
-			const mustPreserveOrder = false
+			// we don't require the helper to preserve the order of requests,
+			// unless we're in the InOrder mode when we must maintain increasing
+			// positions. We unconditionally allow reordering of the reqs slice
+			// though.
+			var mustPreserveOrder = s.mode == InOrder
 			const canReorderRequestsSlice = true
 			s.truncationHelper, err = kvcoord.NewBatchTruncationHelper(
 				scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice,
@@ -1343,6 +1345,20 @@ func (w *workerCoordinator) performRequestAsync(
 		ba.AdmissionHeader.NoMemoryReservedAtSource = false
 		ba.Requests = req.reqs
 
+		if buildutil.CrdbTestBuild {
+			if w.s.mode == InOrder {
+				for i := range req.positions[:len(req.positions)-1] {
+					if req.positions[i] >= req.positions[i+1] {
+						w.s.results.setError(errors.AssertionFailedf(
+							"positions aren't ascending: %d before %d at index %d",
+							req.positions[i], req.positions[i+1], i,
+						))
+						return
+					}
+				}
+			}
+		}
+
 		// TODO(yuzefovich): in Enqueue we split all requests into
 		// single-range batches, so ideally ba touches a single range in
 		// which case we hit the fast path in the DistSender. However, if
@@ -1770,6 +1786,9 @@ func buildResumeSingleRangeBatch(
 	// We've already reconciled the budget with the actual reservation for the
 	// requests with the ResumeSpans.
 	resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage
+	// TODO(yuzefovich): add heuristic for making fresh allocation of slices
+	// whenever only a fraction of them will be used by the resume batch. This
+	// will allow us to return most of overheadAccountedFor to the budget.
 	resumeReq.overheadAccountedFor = req.overheadAccountedFor
 	// Note that due to limitations of the KV layer (#75452) we cannot reuse
 	// original requests because the KV doesn't allow mutability (and all

pkg/kv/kvclient/kvstreamer/streamer_test.go

Lines changed: 58 additions & 0 deletions

@@ -690,3 +690,61 @@ ALTER TABLE t SPLIT AT SELECT generate_series(1, 30000, 3000);
 		}
 	}
 }
+
+// TestStreamerRandomAccess verifies that the Streamer handles the requests that
+// have random access pattern within ranges reasonably well. It is a regression
+// test for #133043.
+func TestStreamerRandomAccess(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	skip.UnderStress(t)
+	skip.UnderRace(t)
+
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	defer s.Stopper().Stop(context.Background())
+
+	rng, _ := randutil.NewTestRand()
+	runner := sqlutils.MakeSQLRunner(db)
+	// Create a table with 3 ranges, with 2k rows in each. Each row is about
+	// 2.7KiB in size and has a random value in column 'v'.
+	runner.Exec(t, `
+CREATE TABLE t (
+	k INT PRIMARY KEY,
+	v INT,
+	blob STRING,
+	INDEX v_idx (v)
+);
+
+INSERT INTO t (k, v, blob) SELECT i, (random()*6000)::INT, repeat('a', 2700) FROM generate_series(1, 6000) AS g(i);
+
+ALTER TABLE t SPLIT AT SELECT i*2000 FROM generate_series(0, 2) AS g(i);
+`)
+
+	// The meat of the test - run the query that performs an index join to fetch
+	// all rows via the streamer, both in the OutOfOrder and InOrder modes, and
+	// with different workmem limits. Each time assert that the number of
+	// BatchRequests issued is relatively small (if not, then the streamer was
+	// extremely suboptimal).
+	kvGRPCCallsRegex := regexp.MustCompile(`KV gRPC calls: ([\d,]+)`)
+	for i := 0; i < 10; i++ {
+		// Pick random workmem limit in [2MiB; 16MiB] range.
+		workmem := 2<<20 + rng.Intn(14<<20)
+		runner.Exec(t, fmt.Sprintf("SET distsql_workmem = '%dB'", workmem))
+		for inOrder := range []bool{false, true} {
+			runner.Exec(t, `SET streamer_always_maintain_ordering = $1;`, inOrder)
+			gRPCCalls := -1
+			var err error
+			rows := runner.QueryStr(t, `EXPLAIN ANALYZE SELECT * FROM t@v_idx WHERE v > 0`)
+			for _, row := range rows {
+				if matches := kvGRPCCallsRegex.FindStringSubmatch(row[0]); len(matches) > 0 {
+					gRPCCalls, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", ""))
+					require.NoError(t, err)
+					break
+				}
+			}
+			require.Greater(t, gRPCCalls, 0, rows)
+			require.Greater(t, 150, gRPCCalls, rows)
+		}
+	}
+}
