Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbopack/crates/turbo-persistence/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ quick_cache = { version = "0.6.9" }
rayon = { workspace = true }
rustc-hash = { workspace = true }
serde = { workspace = true }
smallvec = { workspace = true}
thread_local = { workspace = true }
twox-hash = { version = "2.0.1", features = ["xxhash64"] }
zstd = { version = "0.13.2", features = ["zdict_builder"] }
Expand Down
11 changes: 8 additions & 3 deletions turbopack/crates/turbo-persistence/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
DATA_THRESHOLD_PER_INITIAL_FILE, MAX_ENTRIES_PER_INITIAL_FILE, MAX_SMALL_VALUE_SIZE,
},
key::{hash_key, StoreKey},
ValueBuffer,
};

/// A collector accumulates entries that should be eventually written to a file. It keeps track of
Expand Down Expand Up @@ -36,15 +37,19 @@ impl<K: StoreKey> Collector<K> {
}

/// Adds a normal key-value pair to the collector.
pub fn put(&mut self, key: K, value: Vec<u8>) {
pub fn put(&mut self, key: K, value: ValueBuffer) {
let key = EntryKey {
hash: hash_key(&key),
data: key,
};
let value = if value.len() > MAX_SMALL_VALUE_SIZE {
CollectorEntryValue::Medium { value }
CollectorEntryValue::Medium {
value: value.into_vec(),
}
} else {
CollectorEntryValue::Small { value }
CollectorEntryValue::Small {
value: value.into_small_vec(),
}
};
self.total_key_size += key.len();
self.total_value_size += value.len();
Expand Down
4 changes: 3 additions & 1 deletion turbopack/crates/turbo-persistence/src/collector_entry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::cmp::Ordering;

use smallvec::SmallVec;

use crate::{
key::StoreKey,
static_sorted_file_builder::{Entry, EntryValue},
Expand All @@ -11,7 +13,7 @@ pub struct CollectorEntry<K: StoreKey> {
}

pub enum CollectorEntryValue {
Small { value: Vec<u8> },
Small { value: SmallVec<[u8; 16]> },
Medium { value: Vec<u8> },
Large { blob: u32 },
Deleted,
Expand Down
30 changes: 29 additions & 1 deletion turbopack/crates/turbo-persistence/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use std::{cmp::min, hash::Hasher};
pub trait KeyBase {
/// Returns the length of the key in bytes.
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
/// Hashes the key. It should not include the structure of the key, only the data. E.g. `([1,
/// 2], [3, 4])` should hash the same as `[1, 2, 3, 4]`.
fn hash<H: Hasher>(&self, state: &mut H);
Expand All @@ -14,6 +17,10 @@ impl KeyBase for &'_ [u8] {
<[u8]>::len(self)
}

fn is_empty(&self) -> bool {
<[u8]>::is_empty(self)
}

fn hash<H: Hasher>(&self, state: &mut H) {
for item in *self {
state.write_u8(*item);
Expand All @@ -23,7 +30,11 @@ impl KeyBase for &'_ [u8] {

impl<const N: usize> KeyBase for [u8; N] {
fn len(&self) -> usize {
self[..].len()
N
}

fn is_empty(&self) -> bool {
N > 0
}

fn hash<H: Hasher>(&self, state: &mut H) {
Expand All @@ -38,6 +49,10 @@ impl KeyBase for Vec<u8> {
self.len()
}

fn is_empty(&self) -> bool {
self.is_empty()
}

fn hash<H: Hasher>(&self, state: &mut H) {
for item in self {
state.write_u8(*item);
Expand All @@ -50,6 +65,10 @@ impl KeyBase for u8 {
1
}

fn is_empty(&self) -> bool {
false
}

fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u8(*self);
}
Expand All @@ -61,6 +80,11 @@ impl<A: KeyBase, B: KeyBase> KeyBase for (A, B) {
a.len() + b.len()
}

fn is_empty(&self) -> bool {
let (a, b) = self;
a.is_empty() && b.is_empty()
}

fn hash<H: Hasher>(&self, state: &mut H) {
let (a, b) = self;
KeyBase::hash(a, state);
Expand All @@ -73,6 +97,10 @@ impl<T: KeyBase> KeyBase for &'_ T {
(*self).len()
}

fn is_empty(&self) -> bool {
(*self).is_empty()
}

fn hash<H: Hasher>(&self, state: &mut H) {
(*self).hash(state)
}
Expand Down
4 changes: 3 additions & 1 deletion turbopack/crates/turbo-persistence/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ mod write_batch;

#[cfg(test)]
mod tests;
mod value_buf;

pub use arc_slice::ArcSlice;
pub use db::TurboPersistence;
pub use key::{QueryKey, StoreKey};
pub use key::{KeyBase, QueryKey, StoreKey};
pub use value_buf::ValueBuffer;
pub use write_batch::WriteBatch;
66 changes: 66 additions & 0 deletions turbopack/crates/turbo-persistence/src/value_buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::{borrow::Cow, ops::Deref};

use smallvec::SmallVec;

pub enum ValueBuffer<'l> {
Borrowed(&'l [u8]),
Vec(Vec<u8>),
SmallVec(SmallVec<[u8; 16]>),
}

impl ValueBuffer<'_> {
pub fn into_vec(self) -> Vec<u8> {
match self {
ValueBuffer::Borrowed(b) => b.to_vec(),
ValueBuffer::Vec(v) => v,
ValueBuffer::SmallVec(sv) => sv.into_vec(),
}
}

pub fn into_small_vec(self) -> SmallVec<[u8; 16]> {
match self {
ValueBuffer::Borrowed(b) => SmallVec::from_slice(b),
ValueBuffer::Vec(v) => SmallVec::from_vec(v),
ValueBuffer::SmallVec(sv) => sv,
}
}
}

impl<'l> From<&'l [u8]> for ValueBuffer<'l> {
fn from(b: &'l [u8]) -> Self {
ValueBuffer::Borrowed(b)
}
}

impl From<Vec<u8>> for ValueBuffer<'_> {
fn from(v: Vec<u8>) -> Self {
ValueBuffer::Vec(v)
}
}

impl From<SmallVec<[u8; 16]>> for ValueBuffer<'_> {
fn from(sv: SmallVec<[u8; 16]>) -> Self {
ValueBuffer::SmallVec(sv)
}
}

impl<'l> From<Cow<'l, [u8]>> for ValueBuffer<'l> {
fn from(c: Cow<'l, [u8]>) -> Self {
match c {
Cow::Borrowed(b) => ValueBuffer::Borrowed(b),
Cow::Owned(o) => ValueBuffer::Vec(o),
}
}
}

impl Deref for ValueBuffer<'_> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
match self {
ValueBuffer::Borrowed(b) => b,
ValueBuffer::Vec(v) => v,
ValueBuffer::SmallVec(sv) => sv,
}
}
}
7 changes: 3 additions & 4 deletions turbopack/crates/turbo-persistence/src/write_batch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
borrow::Cow,
cell::UnsafeCell,
fs::File,
io::Write,
Expand All @@ -20,7 +19,7 @@ use thread_local::ThreadLocal;

use crate::{
collector::Collector, collector_entry::CollectorEntry, constants::MAX_MEDIUM_VALUE_SIZE,
key::StoreKey, static_sorted_file_builder::StaticSortedFileBuilder,
key::StoreKey, static_sorted_file_builder::StaticSortedFileBuilder, ValueBuffer,
};

/// The thread local state of a `WriteBatch`.
Expand Down Expand Up @@ -107,11 +106,11 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
}

/// Puts a key-value pair into the write batch.
pub fn put(&self, family: usize, key: K, value: Cow<'_, [u8]>) -> Result<()> {
pub fn put(&self, family: usize, key: K, value: ValueBuffer<'_>) -> Result<()> {
let state = self.thread_local_state();
let collector = self.collector_mut(state, family)?;
if value.len() <= MAX_MEDIUM_VALUE_SIZE {
collector.put(key, value.into_owned());
collector.put(key, value);
} else {
let (blob, file) = self.create_blob(&value)?;
collector.put_blob(key, blob);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
borrow::Cow,
fs,
path::Path,
sync::atomic::{AtomicBool, Ordering},
Expand All @@ -9,7 +8,9 @@ use anyhow::Result;

use crate::database::{
key_value_database::{KeySpace, KeyValueDatabase},
write_batch::{BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch},
write_batch::{
BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBuffer,
},
};

pub fn is_fresh(path: &Path) -> bool {
Expand Down Expand Up @@ -124,23 +125,28 @@ impl<'a, B: BaseWriteBatch<'a>> BaseWriteBatch<'a> for FreshDbOptimizationWriteB
}

impl<'a, B: SerialWriteBatch<'a>> SerialWriteBatch<'a> for FreshDbOptimizationWriteBatch<'a, B> {
fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> {
fn put(
&mut self,
key_space: KeySpace,
key: WriteBuffer<'_>,
value: WriteBuffer<'_>,
) -> Result<()> {
self.write_batch.put(key_space, key, value)
}

fn delete(&mut self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> {
fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> {
self.write_batch.delete(key_space, key)
}
}

impl<'a, B: ConcurrentWriteBatch<'a>> ConcurrentWriteBatch<'a>
for FreshDbOptimizationWriteBatch<'a, B>
{
fn put(&self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> {
fn put(&self, key_space: KeySpace, key: WriteBuffer<'_>, value: WriteBuffer<'_>) -> Result<()> {
self.write_batch.put(key_space, key, value)
}

fn delete(&self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> {
fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> {
self.write_batch.delete(key_space, key)
}
}
13 changes: 9 additions & 4 deletions turbopack/crates/turbo-tasks-backend/src/database/lmdb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{borrow::Cow, fs::create_dir_all, path::Path, thread::available_parallelism};
use std::{fs::create_dir_all, path::Path, thread::available_parallelism};

use anyhow::{Context, Result};
use lmdb::{
Expand All @@ -8,7 +8,7 @@ use lmdb::{

use crate::database::{
key_value_database::{KeySpace, KeyValueDatabase},
write_batch::{BaseWriteBatch, SerialWriteBatch, WriteBatch},
write_batch::{BaseWriteBatch, SerialWriteBatch, WriteBatch, WriteBuffer},
};

mod extended_key;
Expand Down Expand Up @@ -164,7 +164,12 @@ impl<'a> BaseWriteBatch<'a> for LmbdWriteBatch<'a> {
}

impl<'a> SerialWriteBatch<'a> for LmbdWriteBatch<'a> {
fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> {
fn put(
&mut self,
key_space: KeySpace,
key: WriteBuffer<'_>,
value: WriteBuffer<'_>,
) -> Result<()> {
extended_key::put(
&mut self.tx,
self.this.db(key_space),
Expand All @@ -175,7 +180,7 @@ impl<'a> SerialWriteBatch<'a> for LmbdWriteBatch<'a> {
Ok(())
}

fn delete(&mut self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> {
fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> {
extended_key::delete(
&mut self.tx,
self.this.db(key_space),
Expand Down
24 changes: 17 additions & 7 deletions turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::borrow::Cow;

use anyhow::Result;

use crate::database::{
key_value_database::{KeySpace, KeyValueDatabase},
write_batch::{BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch},
write_batch::{
BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBuffer,
},
};

pub struct NoopKvDb;
Expand Down Expand Up @@ -78,21 +78,31 @@ impl<'a> BaseWriteBatch<'a> for NoopWriteBatch {
}

impl SerialWriteBatch<'_> for NoopWriteBatch {
fn put(&mut self, _key_space: KeySpace, _key: Cow<[u8]>, _value: Cow<[u8]>) -> Result<()> {
fn put(
&mut self,
_key_space: KeySpace,
_key: WriteBuffer<'_>,
_value: WriteBuffer<'_>,
) -> Result<()> {
Ok(())
}

fn delete(&mut self, _key_space: KeySpace, _key: Cow<[u8]>) -> Result<()> {
fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
Ok(())
}
}

impl ConcurrentWriteBatch<'_> for NoopWriteBatch {
fn put(&self, _key_space: KeySpace, _key: Cow<[u8]>, _value: Cow<[u8]>) -> Result<()> {
fn put(
&self,
_key_space: KeySpace,
_key: WriteBuffer<'_>,
_value: WriteBuffer<'_>,
) -> Result<()> {
Ok(())
}

fn delete(&self, _key_space: KeySpace, _key: Cow<[u8]>) -> Result<()> {
fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> {
Ok(())
}
}
Loading
Loading