Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
// ranges.
if s.truncationHelper == nil {
// The streamer can process the responses in an arbitrary order, so
// we don't require the helper to preserve the order of requests and
// allow it to reorder the reqs slice too.
const mustPreserveOrder = false
// we don't require the helper to preserve the order of requests,
// unless we're in the InOrder mode when we must maintain increasing
// positions. We unconditionally allow reordering of the reqs slice
// though.
var mustPreserveOrder = s.mode == InOrder
const canReorderRequestsSlice = true
s.truncationHelper, err = kvcoord.NewBatchTruncationHelper(
scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice,
Expand Down Expand Up @@ -1343,6 +1345,20 @@ func (w *workerCoordinator) performRequestAsync(
ba.AdmissionHeader.NoMemoryReservedAtSource = false
ba.Requests = req.reqs

if buildutil.CrdbTestBuild {
if w.s.mode == InOrder {
for i := range req.positions[:len(req.positions)-1] {
if req.positions[i] >= req.positions[i+1] {
w.s.results.setError(errors.AssertionFailedf(
"positions aren't ascending: %d before %d at index %d",
req.positions[i], req.positions[i+1], i,
))
return
}
}
}
}

// TODO(yuzefovich): in Enqueue we split all requests into
// single-range batches, so ideally ba touches a single range in
// which case we hit the fast path in the DistSender. However, if
Expand Down Expand Up @@ -1770,6 +1786,9 @@ func buildResumeSingleRangeBatch(
// We've already reconciled the budget with the actual reservation for the
// requests with the ResumeSpans.
resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage
// TODO(yuzefovich): add heuristic for making fresh allocation of slices
// whenever only a fraction of them will be used by the resume batch. This
// will allow us to return most of overheadAccountedFor to the budget.
resumeReq.overheadAccountedFor = req.overheadAccountedFor
// Note that due to limitations of the KV layer (#75452) we cannot reuse
// original requests because the KV doesn't allow mutability (and all
Expand Down
63 changes: 61 additions & 2 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,11 +671,11 @@ ALTER TABLE t SPLIT AT SELECT generate_series(1, 30000, 3000);
// all rows via the streamer, both in the OutOfOrder and InOrder modes. Each
// time assert that the number of BatchRequests issued is in double digits
// (if not, then the streamer was extremely suboptimal).
kvGRPCCallsRegex := regexp.MustCompile(`KV gRPC calls: (\d+,)`)
kvGRPCCallsRegex := regexp.MustCompile(`KV gRPC calls: ([\d,]+)`)
for inOrder := range []bool{false, true} {
runner.Exec(t, `SET streamer_always_maintain_ordering = $1;`, inOrder)
for i := 0; i < 2; i++ {
var gRPCCalls int
gRPCCalls := -1
var err error
rows := runner.QueryStr(t, `EXPLAIN ANALYZE SELECT length(blob) FROM t@t_v_idx WHERE v = '1';`)
for _, row := range rows {
Expand All @@ -685,7 +685,66 @@ ALTER TABLE t SPLIT AT SELECT generate_series(1, 30000, 3000);
break
}
}
require.Greater(t, gRPCCalls, 0, rows)
require.Greater(t, 100, gRPCCalls, rows)
}
}
}

// TestStreamerRandomAccess verifies that the Streamer handles the requests that
// have random access pattern within ranges reasonably well. It is a regression
// test for #133043.
//
// The test populates a three-range table whose secondary index column holds
// random values, then runs an index join over that index under several
// randomly chosen distsql_workmem limits, with the
// streamer_always_maintain_ordering session variable both off and on. For
// every run it extracts the "KV gRPC calls" counter from the EXPLAIN ANALYZE
// output and asserts that it is positive and below 150.
func TestStreamerRandomAccess(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderStress(t)
	skip.UnderRace(t)

	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(context.Background())

	rng, _ := randutil.NewTestRand()
	runner := sqlutils.MakeSQLRunner(db)
	// Create a table with 3 ranges, with 2k rows in each. Each row is about
	// 2.7KiB in size and has a random value in column 'v'.
	runner.Exec(t, `
CREATE TABLE t (
k INT PRIMARY KEY,
v INT,
blob STRING,
INDEX v_idx (v)
);

INSERT INTO t (k, v, blob) SELECT i, (random()*6000)::INT, repeat('a', 2700) FROM generate_series(1, 6000) AS g(i);

ALTER TABLE t SPLIT AT SELECT i*2000 FROM generate_series(0, 2) AS g(i);
`)

	// The meat of the test - run the query that performs an index join to fetch
	// all rows via the streamer, both in the OutOfOrder and InOrder modes, and
	// with different workmem limits. Each time assert that the number of
	// BatchRequests issued is relatively small (if not, then the streamer was
	// extremely suboptimal).
	kvGRPCCallsRegex := regexp.MustCompile(`KV gRPC calls: ([\d,]+)`)
	for i := 0; i < 10; i++ {
		// Pick random workmem limit in the [2MiB, 16MiB) range (Intn excludes
		// its upper bound).
		workmem := 2<<20 + rng.Intn(14<<20)
		runner.Exec(t, fmt.Sprintf("SET distsql_workmem = '%dB'", workmem))
		// Iterate over the boolean values themselves. A single-variable range
		// would bind the slice index (0, 1) instead of the element.
		for _, inOrder := range []bool{false, true} {
			runner.Exec(t, `SET streamer_always_maintain_ordering = $1;`, inOrder)
			// Start at -1 so that a missing "KV gRPC calls" line in the
			// EXPLAIN ANALYZE output fails the positivity assertion below
			// rather than being mistaken for a valid count.
			gRPCCalls := -1
			var err error
			rows := runner.QueryStr(t, `EXPLAIN ANALYZE SELECT * FROM t@v_idx WHERE v > 0`)
			for _, row := range rows {
				if matches := kvGRPCCallsRegex.FindStringSubmatch(row[0]); len(matches) > 0 {
					// The counter may be rendered with thousands separators
					// (the regex admits commas), so strip them before parsing.
					gRPCCalls, err = strconv.Atoi(strings.ReplaceAll(matches[1], ",", ""))
					require.NoError(t, err)
					break
				}
			}
			require.Greater(t, gRPCCalls, 0, rows)
			require.Greater(t, 150, gRPCCalls, rows)
		}
	}
}