Skip to content

Commit 645c154

Browse files
committed
kvcoord: introduce batch truncation helper
This commit introduces a batch truncation helper that encompasses logic of truncating requests to the boundaries of a single range as well as returning the next key to seek the range iterator to. The helper is now used both in the DistSender as well as in the Streamer. No modification to the actual logic of `Truncate` nor `Next`/`prev` functions has been done other than incorporating the return of the next seek key into `Truncate` function itself. This is needed since the following commit will tightly couple the truncation process with the next seek key determination in order to optimize it. The helper can be configured with a knob indicating whether `Truncate` needs to return requests in the original order. This behavior is necessary by the BatchRequests that contain writes since in several spots we rely on the ordering assumptions (e.g. of increasing values of `Sequence`). The following adjustments were made to the tests: - `BenchmarkTruncate` has been renamed to `BenchmarkTruncateLegacy` - `TestTruncate` has been refactored to exercise the new and the old code-paths - `TestBatchPrevNext` has been refactored to run through the new code path, also a few test cases have been adjusted slightly. This commit also introduces some unit tests for the new code path when it runs in a loop over multiple ranges as well as a corresponding benchmark. Release note: None
1 parent 33dfff0 commit 645c154

File tree

5 files changed

+537
-159
lines changed

5 files changed

+537
-159
lines changed

pkg/kv/kvclient/kvcoord/batch.go

Lines changed: 138 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,121 @@ import (
1616
"github.com/cockroachdb/errors"
1717
)
1818

19-
var emptyHeader = roachpb.RequestHeader{}
19+
// BatchTruncationHelper is a utility struct that helps with truncating requests
20+
// to range boundaries as well as figuring out the next key to seek to for the
21+
// range iterator.
22+
//
23+
// The caller should not use the helper if all requests fit within a single
24+
// range since the helper has non-trivial setup cost.
25+
//
26+
// It is designed to be used roughly as follows:
27+
//
28+
// rs := keys.Range(requests)
29+
// ri.Seek(scanDir, rs.Key)
30+
// if !ri.NeedAnother(rs) {
31+
// // All requests fit within a single range, don't use the helper.
32+
// ...
33+
// }
34+
// helper := MakeBatchTruncationHelper(scanDir, requests)
35+
// for ri.Valid() {
36+
// curRangeRS := rs.Intersect(ri.Token().Desc())
37+
// curRangeReqs, positions, seekKey := helper.Truncate(curRangeRS)
38+
// // Process curRangeReqs that touch a single range and then use positions
39+
// // to reassemble the result.
40+
// ...
41+
// ri.Seek(scanDir, seekKey)
42+
// }
43+
//
44+
type BatchTruncationHelper struct {
45+
scanDir ScanDirection
46+
requests []roachpb.RequestUnion
47+
// mustPreserveOrder indicates whether the requests must be returned by
48+
// Truncate() in the original order.
49+
mustPreserveOrder bool
50+
}
51+
52+
// MakeBatchTruncationHelper returns a new BatchTruncationHelper for the given
53+
// requests.
54+
//
55+
// mustPreserveOrder, if true, indicates that the caller requires that requests
56+
// are returned by Truncate() in the original order (i.e. with strictly
57+
// increasing positions values).
58+
func MakeBatchTruncationHelper(
59+
scanDir ScanDirection, requests []roachpb.RequestUnion, mustPreserveOrder bool,
60+
) (BatchTruncationHelper, error) {
61+
var ret BatchTruncationHelper
62+
ret.scanDir = scanDir
63+
ret.requests = requests
64+
ret.mustPreserveOrder = mustPreserveOrder
65+
return ret, nil
66+
}
2067

2168
// Truncate restricts all requests to the given key range and returns new,
2269
// truncated, requests. All returned requests are "truncated" to the given span,
2370
// and requests which are found to not overlap the given span at all are
71+
// removed. A mapping of response index to request index is returned. It also
72+
// returns the next seek key for the range iterator. With Ascending scan
73+
// direction, the next seek key is such that requests in [RKeyMin, seekKey)
74+
// range have been processed, with Descending scan direction, it is such that
75+
// requests in [seekKey, RKeyMax) range have been processed.
76+
//
77+
// For example, if
78+
//
79+
// reqs = Put[a], Put[c], Put[b],
80+
// rs = [a,bb],
81+
// BatchTruncationHelper.Init(Ascending, reqs)
82+
//
83+
// then BatchTruncationHelper.Truncate(rs) returns (Put[a], Put[b]), positions
84+
// [0,2] as well as seekKey 'c'.
85+
//
86+
// Truncate returns the requests in an arbitrary order (meaning that positions
87+
// return values might not be ascending), unless mustPreserveOrder was true in
88+
// Init().
89+
//
90+
// NOTE: it is assumed that
91+
// 1. Truncate has been called on the previous ranges that intersect with
92+
// keys.Range(reqs);
93+
// 2. rs is intersected with the current range boundaries.
94+
func (h *BatchTruncationHelper) Truncate(
95+
rs roachpb.RSpan,
96+
) ([]roachpb.RequestUnion, []int, roachpb.RKey, error) {
97+
truncReqs, positions, err := truncateLegacy(h.requests, rs)
98+
if err != nil {
99+
return nil, nil, nil, err
100+
}
101+
var seekKey roachpb.RKey
102+
if h.scanDir == Ascending {
103+
// In next iteration, query next range.
104+
// It's important that we use the EndKey of the current descriptor
105+
// as opposed to the StartKey of the next one: if the former is stale,
106+
// it's possible that the next range has since merged the subsequent
107+
// one, and unless both descriptors are stale, the next descriptor's
108+
// StartKey would move us to the beginning of the current range,
109+
// resulting in a duplicate scan.
110+
seekKey, err = nextLegacy(h.requests, rs.EndKey)
111+
} else {
112+
// In next iteration, query previous range.
113+
// We use the StartKey of the current descriptor as opposed to the
114+
// EndKey of the previous one since that doesn't have bugs when
115+
// stale descriptors come into play.
116+
seekKey, err = prevLegacy(h.requests, rs.Key)
117+
}
118+
return truncReqs, positions, seekKey, err
119+
}
120+
121+
var emptyHeader = roachpb.RequestHeader{}
122+
123+
// truncateLegacy restricts all requests to the given key range and returns new,
124+
// truncated, requests. All returned requests are "truncated" to the given span,
125+
// and requests which are found to not overlap the given span at all are
24126
// removed. A mapping of response index to request index is returned. For
25127
// example, if
26128
//
27129
// reqs = Put[a], Put[c], Put[b],
28130
// rs = [a,bb],
29131
//
30-
// then Truncate(reqs,rs) returns (Put[a], Put[b]) and positions [0,2].
31-
func Truncate(
132+
// then truncateLegacy(reqs,rs) returns (Put[a], Put[b]) and positions [0,2].
133+
func truncateLegacy(
32134
reqs []roachpb.RequestUnion, rs roachpb.RSpan,
33135
) ([]roachpb.RequestUnion, []int, error) {
34136
truncateOne := func(args roachpb.Request) (hasRequest bool, changed bool, _ roachpb.RequestHeader, _ error) {
@@ -64,26 +166,26 @@ func Truncate(
64166
local = true
65167
}
66168
if keyAddr.Less(rs.Key) {
67-
// rs.Key can't be local because it contains range split points, which
68-
// are never local.
169+
// rs.Key can't be local because it contains range split points,
170+
// which are never local.
69171
changed = true
70172
if !local {
71173
header.Key = rs.Key.AsRawKey()
72174
} else {
73-
// The local start key should be truncated to the boundary of local keys which
74-
// address to rs.Key.
175+
// The local start key should be truncated to the boundary of
176+
// local keys which address to rs.Key.
75177
header.Key = keys.MakeRangeKeyPrefix(rs.Key)
76178
}
77179
}
78180
if !endKeyAddr.Less(rs.EndKey) {
79-
// rs.EndKey can't be local because it contains range split points, which
80-
// are never local.
181+
// rs.EndKey can't be local because it contains range split points,
182+
// which are never local.
81183
changed = true
82184
if !local {
83185
header.EndKey = rs.EndKey.AsRawKey()
84186
} else {
85-
// The local end key should be truncated to the boundary of local keys which
86-
// address to rs.EndKey.
187+
// The local end key should be truncated to the boundary of
188+
// local keys which address to rs.EndKey.
87189
header.EndKey = keys.MakeRangeKeyPrefix(rs.EndKey)
88190
}
89191
}
@@ -122,18 +224,15 @@ func Truncate(
122224
return truncReqs, positions, nil
123225
}
124226

125-
// prev gives the right boundary of the union of all requests which don't
227+
// prevLegacy gives the right boundary of the union of all requests which don't
126228
// affect keys larger than the given key. Note that a right boundary is
127-
// exclusive, that is, the returned RKey is to be used as the exclusive
128-
// right endpoint in finding the next range to query.
229+
// exclusive, that is, the returned RKey is to be used as the exclusive right
230+
// endpoint in finding the next range to query.
129231
//
130-
// Informally, a call `prev(reqs, k)` means: we've already executed the parts
131-
// of `reqs` that intersect `[k, KeyMax)`; please tell me how far to the
232+
// Informally, a call `prevLegacy(reqs, k)` means: we've already executed the
233+
// parts of `reqs` that intersect `[k, KeyMax)`; please tell me how far to the
132234
// left the next relevant request begins.
133-
//
134-
// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull
135-
// 'keys' into 'roachpb'.
136-
func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
235+
func prevLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
137236
candidate := roachpb.RKeyMin
138237
for _, union := range reqs {
139238
inner := union.GetInner()
@@ -144,16 +243,18 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
144243
}
145244
endKey := h.EndKey
146245
if len(endKey) == 0 {
147-
// If we have a point request for `x < k` then that request has not been
148-
// satisfied (since the batch has only been executed for keys `>=k`). We
149-
// treat `x` as `[x, x.Next())` which does the right thing below. This
150-
// also works when `x > k` or `x=k` as the logic below will skip `x`.
246+
// If we have a point request for `x < k` then that request has not
247+
// been satisfied (since the batch has only been executed for keys
248+
// `>=k`). We treat `x` as `[x, x.Next())` which does the right
249+
// thing below. This also works when `x > k` or `x=k` as the logic
250+
// below will skip `x`.
151251
//
152252
// Note that if the key is /Local/x/something, then instead of using
153-
// /Local/x/something.Next() as the end key, we rely on AddrUpperBound to
154-
// handle local keys. In particular, AddrUpperBound will turn it into
155-
// `x\x00`, so we're looking at the key-range `[x, x.Next())`. This is
156-
// exactly what we want as the local key is contained in that range.
253+
// /Local/x/something.Next() as the end key, we rely on
254+
// AddrUpperBound to handle local keys. In particular,
255+
// AddrUpperBound will turn it into `x\x00`, so we're looking at the
256+
// key-range `[x, x.Next())`. This is exactly what we want as the
257+
// local key is contained in that range.
157258
//
158259
// See TestBatchPrevNext for test cases with commentary.
159260
endKey = h.Key.Next()
@@ -191,18 +292,15 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
191292
return candidate, nil
192293
}
193294

194-
// Next gives the left boundary of the union of all requests which don't affect
195-
// keys less than the given key. Note that the left boundary is inclusive, that
196-
// is, the returned RKey is the inclusive left endpoint of the keys the request
197-
// should operate on next.
198-
//
199-
// Informally, a call `Next(reqs, k)` means: we've already executed the parts of
200-
// `reqs` that intersect `[KeyMin, k)`; please tell me how far to the right the
201-
// next relevant request begins.
202-
//
203-
// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull
204-
// 'keys' into 'proto'.
205-
func Next(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
295+
// nextLegacy gives the left boundary of the union of all requests which don't
296+
// affect keys less than the given key. Note that the left boundary is
297+
// inclusive, that is, the returned RKey is the inclusive left endpoint of the
298+
// keys the request should operate on next.
299+
//
300+
// Informally, a call `nextLegacy(reqs, k)` means: we've already executed the
301+
// parts of `reqs` that intersect `[KeyMin, k)`; please tell me how far to the
302+
// right the next relevant request begins.
303+
func nextLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
206304
candidate := roachpb.RKeyMax
207305
for _, union := range reqs {
208306
inner := union.GetInner()

0 commit comments

Comments
 (0)