Skip to content

Commit 7ec73fd

Browse files
authored
import: sst_importer support download SST and rewrite into keyspace data. (tikv#14046)
ref tikv#12999 import: sst_importer support download SST and rewrite into keyspace data. Signed-off-by: iosmanthus <[email protected]>
1 parent 0ce3485 commit 7ec73fd

7 files changed

Lines changed: 95 additions & 24 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/keys/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ kvproto = { workspace = true }
1010
log_wrappers = { workspace = true }
1111
thiserror = "1.0"
1212
tikv_alloc = { workspace = true }
13+
tikv_util = { workspace = true }
1314

1415
[dev-dependencies]
1516
panic_hook = { workspace = true }

components/keys/src/rewrite.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,21 @@
66
77
use std::ops::Bound::{self, *};
88

9+
use tikv_util::codec::bytes::encode_bytes;
10+
911
/// An error indicating the key cannot be rewritten because it does not start
1012
/// with the given prefix.
1113
#[derive(PartialEq, Debug, Clone)]
1214
pub struct WrongPrefix;
1315

16+
/// Encodes the key held by a range bound with `encode_bytes`.
///
/// `Included` and `Excluded` keep their variant and carry the encoded key;
/// `Unbounded` carries no key and is returned unchanged.
pub fn encode_bound(bound: Bound<Vec<u8>>) -> Bound<Vec<u8>> {
17+
match bound {
18+
Included(k) => Included(encode_bytes(&k)),
19+
Excluded(k) => Excluded(encode_bytes(&k)),
20+
Unbounded => Unbounded,
21+
}
22+
}
23+
1424
/// Rewrites the prefix of a byte array.
1525
pub fn rewrite_prefix(
1626
old_prefix: &[u8],

components/sst_importer/src/import_file.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use engine_traits::{
1515
iter_option, EncryptionKeyManager, Iterator, KvEngine, RefIterable, SstMetaInfo, SstReader,
1616
};
1717
use file_system::{get_io_rate_limiter, sync_dir, File, OpenOptions};
18+
use keys::data_key;
1819
use kvproto::{import_sstpb::*, kvrpcpb::ApiVersion};
1920
use tikv_util::time::Instant;
2021
use uuid::{Builder as UuidBuilder, Uuid};
@@ -336,7 +337,7 @@ impl ImportDir {
336337
let sst_reader = RocksSstReader::open_with_env(path_str, Some(env))?;
337338

338339
for &(start, end) in TIDB_RANGES_COMPLEMENT {
339-
let opt = iter_option(start, end, false);
340+
let opt = iter_option(&data_key(start), &data_key(end), false);
340341
let mut iter = sst_reader.iter(opt)?;
341342
if iter.seek(start)? {
342343
error!(

components/sst_importer/src/sst_importer.rs

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ use kvproto::{
3232
kvrpcpb::ApiVersion,
3333
};
3434
use tikv_util::{
35-
codec::stream_event::{EventEncoder, EventIterator, Iterator as EIterator},
35+
codec::{
36+
bytes::{decode_bytes_in_place, encode_bytes},
37+
stream_event::{EventEncoder, EventIterator, Iterator as EIterator},
38+
},
3639
config::ReadableSize,
3740
stream::block_on_external_io,
3841
sys::SysQuota,
@@ -53,13 +56,18 @@ use crate::{
5356
/// Extra options attached to a download, filled in through the chained
/// setters on `impl DownloadExt` starting from `DownloadExt::default()`.
#[derive(Default, Debug, Clone)]
5457
pub struct DownloadExt<'a> {
5558
// Borrowed cache key; `None` (the derived default) until `cache_key()` is called.
cache_key: Option<&'a str>,
59+
// Request type; `DownloadRequestType::default()` until `req_type()` is called.
req_type: DownloadRequestType,
5660
}
5761

5862
impl<'a> DownloadExt<'a> {
59-
pub fn cache_key(self, key: &'a str) -> Self {
60-
Self {
61-
cache_key: Some(key),
62-
}
63+
pub fn cache_key(mut self, key: &'a str) -> Self {
64+
self.cache_key = Some(key);
65+
self
66+
}
67+
68+
pub fn req_type(mut self, req_type: DownloadRequestType) -> Self {
69+
self.req_type = req_type;
70+
self
6371
}
6472
}
6573

@@ -896,16 +904,20 @@ impl SstImporter {
896904
let sst_reader = RocksSstReader::open_with_env(dst_file_name, Some(env))?;
897905
sst_reader.verify_checksum()?;
898906

907+
// undo key rewrite so we could compare with the keys inside SST
908+
let old_prefix = rewrite_rule.get_old_key_prefix();
909+
let new_prefix = rewrite_rule.get_new_key_prefix();
910+
let req_type = ext.req_type;
911+
899912
debug!("downloaded file and verified";
900913
"meta" => ?meta,
901914
"name" => name,
902915
"path" => dst_file_name,
916+
"old_prefix" => log_wrappers::Value::key(old_prefix),
917+
"new_prefix" => log_wrappers::Value::key(new_prefix),
918+
"req_type" => ?req_type,
903919
);
904920

905-
// undo key rewrite so we could compare with the keys inside SST
906-
let old_prefix = rewrite_rule.get_old_key_prefix();
907-
let new_prefix = rewrite_rule.get_new_key_prefix();
908-
909921
let range_start = meta.get_range().get_start();
910922
let range_end = meta.get_range().get_end();
911923
let range_start_bound = key_to_bound(range_start);
@@ -915,21 +927,26 @@ impl SstImporter {
915927
key_to_bound(range_end)
916928
};
917929

918-
let range_start =
930+
let mut range_start =
919931
keys::rewrite::rewrite_prefix_of_start_bound(new_prefix, old_prefix, range_start_bound)
920932
.map_err(|_| Error::WrongKeyPrefix {
921933
what: "SST start range",
922934
key: range_start.to_vec(),
923935
prefix: new_prefix.to_vec(),
924936
})?;
925-
let range_end =
937+
let mut range_end =
926938
keys::rewrite::rewrite_prefix_of_end_bound(new_prefix, old_prefix, range_end_bound)
927939
.map_err(|_| Error::WrongKeyPrefix {
928940
what: "SST end range",
929941
key: range_end.to_vec(),
930942
prefix: new_prefix.to_vec(),
931943
})?;
932944

945+
if req_type == DownloadRequestType::Keyspace {
946+
range_start = keys::rewrite::encode_bound(range_start);
947+
range_end = keys::rewrite::encode_bound(range_end);
948+
}
949+
933950
let start_rename_rewrite = Instant::now();
934951
// read the first and last keys from the SST, determine if we could
935952
// simply move the entire SST instead of iterating and generate a new one.
@@ -942,9 +959,15 @@ impl SstImporter {
942959
return Ok(None);
943960
}
944961
if !iter.seek_to_first()? {
962+
let mut range = meta.get_range().clone();
963+
if req_type == DownloadRequestType::Keyspace {
964+
*range.mut_start() = encode_bytes(&range.take_start());
965+
*range.mut_end() = encode_bytes(&range.take_end());
966+
}
945967
// the SST is empty, so no need to iterate at all (should be impossible?)
946-
return Ok(Some(meta.get_range().clone()));
968+
return Ok(Some(range));
947969
}
970+
948971
let start_key = keys::origin_key(iter.key());
949972
if is_before_start_bound(start_key, &range_start) {
950973
// SST's start is before the range to consume, so needs to iterate to skip over
@@ -995,8 +1018,10 @@ impl SstImporter {
9951018
}
9961019

9971020
// perform iteration and key rewrite.
998-
let mut key = keys::data_key(new_prefix);
999-
let new_prefix_data_key_len = key.len();
1021+
let mut data_key = keys::DATA_PREFIX_KEY.to_vec();
1022+
let data_key_prefix_len = keys::DATA_PREFIX_KEY.len();
1023+
let mut user_key = new_prefix.to_vec();
1024+
let user_key_prefix_len = new_prefix.len();
10001025
let mut first_key = None;
10011026

10021027
match range_start {
@@ -1016,23 +1041,44 @@ impl SstImporter {
10161041
.unwrap();
10171042

10181043
while iter.valid()? {
1019-
let old_key = keys::origin_key(iter.key());
1020-
if is_after_end_bound(old_key, &range_end) {
1044+
let mut old_key = Cow::Borrowed(keys::origin_key(iter.key()));
1045+
let mut ts = None;
1046+
1047+
if is_after_end_bound(old_key.as_ref(), &range_end) {
10211048
break;
10221049
}
1050+
1051+
if req_type == DownloadRequestType::Keyspace {
1052+
ts = Some(Key::decode_ts_bytes_from(old_key.as_ref())?.to_owned());
1053+
old_key = {
1054+
let mut key = old_key.to_vec();
1055+
decode_bytes_in_place(&mut key, false)?;
1056+
Cow::Owned(key)
1057+
};
1058+
}
1059+
10231060
if !old_key.starts_with(old_prefix) {
10241061
return Err(Error::WrongKeyPrefix {
10251062
what: "Key in SST",
10261063
key: keys::origin_key(iter.key()).to_vec(),
10271064
prefix: old_prefix.to_vec(),
10281065
});
10291066
}
1030-
key.truncate(new_prefix_data_key_len);
1031-
key.extend_from_slice(&old_key[old_prefix.len()..]);
1067+
1068+
data_key.truncate(data_key_prefix_len);
1069+
user_key.truncate(user_key_prefix_len);
1070+
user_key.extend_from_slice(&old_key[old_prefix.len()..]);
1071+
if req_type == DownloadRequestType::Keyspace {
1072+
data_key.extend(encode_bytes(&user_key));
1073+
data_key.extend(ts.unwrap());
1074+
} else {
1075+
data_key.extend_from_slice(&user_key);
1076+
}
1077+
10321078
let mut value = Cow::Borrowed(iter.value());
10331079

10341080
if rewrite_rule.new_timestamp != 0 {
1035-
key = Key::from_encoded(key)
1081+
data_key = Key::from_encoded(data_key)
10361082
.truncate_ts()
10371083
.map_err(|e| {
10381084
Error::BadFormat(format!(
@@ -1056,10 +1102,10 @@ impl SstImporter {
10561102
}
10571103
}
10581104

1059-
sst_writer.put(&key, &value)?;
1105+
sst_writer.put(&data_key, &value)?;
10601106
iter.next()?;
10611107
if first_key.is_none() {
1062-
first_key = Some(keys::origin_key(&key).to_vec());
1108+
first_key = Some(keys::origin_key(&data_key).to_vec());
10631109
}
10641110
}
10651111

@@ -1078,7 +1124,7 @@ impl SstImporter {
10781124

10791125
let mut final_range = Range::default();
10801126
final_range.set_start(start_key);
1081-
final_range.set_end(keys::origin_key(&key).to_vec());
1127+
final_range.set_end(keys::origin_key(&data_key).to_vec());
10821128
Ok(Some(final_range))
10831129
} else {
10841130
// nothing is written: prevents finishing the SST at all.

components/txn_types/src/types.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,16 @@ impl Key {
192192
Ok(number::decode_u64_desc(&mut ts)?.into())
193193
}
194194

195+
/// Returns the timestamp suffix of a ts encoded key as raw bytes, without
/// decoding it.
///
/// The returned slice borrows the last `number::U64_SIZE` bytes of `key`.
///
/// # Errors
///
/// Returns `codec::Error::KeyLength` if `key` is shorter than
/// `number::U64_SIZE` bytes.
196+
#[inline]
197+
pub fn decode_ts_bytes_from(key: &[u8]) -> Result<&[u8], codec::Error> {
198+
let len = key.len();
199+
if len < number::U64_SIZE {
200+
return Err(codec::Error::KeyLength);
201+
}
202+
// The length check above guarantees this subtraction cannot underflow.
Ok(&key[key.len() - number::U64_SIZE..])
203+
}
204+
195205
/// Whether the user key part of a ts encoded key `ts_encoded_key` equals to
196206
/// the encoded user key `user_key`.
197207
///

src/import/sst_service.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,9 @@ where
783783
cipher,
784784
limiter,
785785
engine,
786-
DownloadExt::default().cache_key(req.get_storage_cache_id()),
786+
DownloadExt::default()
787+
.cache_key(req.get_storage_cache_id())
788+
.req_type(req.get_request_type()),
787789
);
788790
let mut resp = DownloadResponse::default();
789791
match res.await {

0 commit comments

Comments
 (0)