diff --git a/Cargo.lock b/Cargo.lock index 5eca723de290c9..feb80afde4e894 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9204,6 +9204,7 @@ dependencies = [ "rayon", "rustc-hash 2.1.0", "serde", + "smallvec", "tempfile", "thread_local", "twox-hash 2.1.0", diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml index cc4e01d0ab70d4..2efd376c8e14f2 100644 --- a/turbopack/crates/turbo-persistence/Cargo.toml +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -22,6 +22,7 @@ quick_cache = { version = "0.6.9" } rayon = { workspace = true } rustc-hash = { workspace = true } serde = { workspace = true } +smallvec = { workspace = true} thread_local = { workspace = true } twox-hash = { version = "2.0.1", features = ["xxhash64"] } zstd = { version = "0.13.2", features = ["zdict_builder"] } diff --git a/turbopack/crates/turbo-persistence/src/collector.rs b/turbopack/crates/turbo-persistence/src/collector.rs index bfd507be294e02..82e82b9cf92013 100644 --- a/turbopack/crates/turbo-persistence/src/collector.rs +++ b/turbopack/crates/turbo-persistence/src/collector.rs @@ -4,6 +4,7 @@ use crate::{ DATA_THRESHOLD_PER_INITIAL_FILE, MAX_ENTRIES_PER_INITIAL_FILE, MAX_SMALL_VALUE_SIZE, }, key::{hash_key, StoreKey}, + ValueBuffer, }; /// A collector accumulates entries that should be eventually written to a file. It keeps track of @@ -36,15 +37,19 @@ impl Collector { } /// Adds a normal key-value pair to the collector. 
- pub fn put(&mut self, key: K, value: Vec) { + pub fn put(&mut self, key: K, value: ValueBuffer) { let key = EntryKey { hash: hash_key(&key), data: key, }; let value = if value.len() > MAX_SMALL_VALUE_SIZE { - CollectorEntryValue::Medium { value } + CollectorEntryValue::Medium { + value: value.into_vec(), + } } else { - CollectorEntryValue::Small { value } + CollectorEntryValue::Small { + value: value.into_small_vec(), + } }; self.total_key_size += key.len(); self.total_value_size += value.len(); diff --git a/turbopack/crates/turbo-persistence/src/collector_entry.rs b/turbopack/crates/turbo-persistence/src/collector_entry.rs index a27db6d1119dc4..88f49821d3c388 100644 --- a/turbopack/crates/turbo-persistence/src/collector_entry.rs +++ b/turbopack/crates/turbo-persistence/src/collector_entry.rs @@ -1,5 +1,7 @@ use std::cmp::Ordering; +use smallvec::SmallVec; + use crate::{ key::StoreKey, static_sorted_file_builder::{Entry, EntryValue}, @@ -11,7 +13,7 @@ pub struct CollectorEntry { } pub enum CollectorEntryValue { - Small { value: Vec }, + Small { value: SmallVec<[u8; 16]> }, Medium { value: Vec }, Large { blob: u32 }, Deleted, diff --git a/turbopack/crates/turbo-persistence/src/key.rs b/turbopack/crates/turbo-persistence/src/key.rs index 99c06ba820982c..f10a6ff8ac5c27 100644 --- a/turbopack/crates/turbo-persistence/src/key.rs +++ b/turbopack/crates/turbo-persistence/src/key.rs @@ -4,6 +4,9 @@ use std::{cmp::min, hash::Hasher}; pub trait KeyBase { /// Returns the length of the key in bytes. fn len(&self) -> usize; + fn is_empty(&self) -> bool { + self.len() == 0 + } /// Hashes the key. It should not include the structure of the key, only the data. E.g. `([1, /// 2], [3, 4])` should hash the same as `[1, 2, 3, 4]`. 
fn hash(&self, state: &mut H); @@ -14,6 +17,10 @@ impl KeyBase for &'_ [u8] { <[u8]>::len(self) } + fn is_empty(&self) -> bool { + <[u8]>::is_empty(self) + } + fn hash(&self, state: &mut H) { for item in *self { state.write_u8(*item); @@ -23,7 +30,11 @@ impl KeyBase for [u8; N] { fn len(&self) -> usize { - self[..].len() + N + } + + fn is_empty(&self) -> bool { + N == 0 } fn hash(&self, state: &mut H) { @@ -38,6 +49,10 @@ impl KeyBase for Vec { self.len() } + fn is_empty(&self) -> bool { + self.is_empty() + } + fn hash(&self, state: &mut H) { for item in self { state.write_u8(*item); @@ -50,6 +65,10 @@ impl KeyBase for u8 { 1 } + fn is_empty(&self) -> bool { + false + } + fn hash(&self, state: &mut H) { state.write_u8(*self); } @@ -61,6 +80,11 @@ impl KeyBase for (A, B) { a.len() + b.len() } + fn is_empty(&self) -> bool { + let (a, b) = self; + a.is_empty() && b.is_empty() + } + fn hash(&self, state: &mut H) { let (a, b) = self; KeyBase::hash(a, state); @@ -73,6 +97,10 @@ impl KeyBase for &'_ T { (*self).len() } + fn is_empty(&self) -> bool { + (*self).is_empty() + } + fn hash(&self, state: &mut H) { (*self).hash(state) } diff --git a/turbopack/crates/turbo-persistence/src/lib.rs b/turbopack/crates/turbo-persistence/src/lib.rs index 3069de7069418f..fd4473e6102f8e 100644 --- a/turbopack/crates/turbo-persistence/src/lib.rs +++ b/turbopack/crates/turbo-persistence/src/lib.rs @@ -17,8 +17,10 @@ mod write_batch; #[cfg(test)] mod tests; +mod value_buf; pub use arc_slice::ArcSlice; pub use db::TurboPersistence; -pub use key::{QueryKey, StoreKey}; +pub use key::{KeyBase, QueryKey, StoreKey}; +pub use value_buf::ValueBuffer; pub use write_batch::WriteBatch; diff --git a/turbopack/crates/turbo-persistence/src/value_buf.rs b/turbopack/crates/turbo-persistence/src/value_buf.rs new file mode 100644 index 00000000000000..8e961a60c596d7 --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/value_buf.rs @@ -0,0 +1,66 @@ +use std::{borrow::Cow,
ops::Deref}; + +use smallvec::SmallVec; + +pub enum ValueBuffer<'l> { + Borrowed(&'l [u8]), + Vec(Vec), + SmallVec(SmallVec<[u8; 16]>), +} + +impl ValueBuffer<'_> { + pub fn into_vec(self) -> Vec { + match self { + ValueBuffer::Borrowed(b) => b.to_vec(), + ValueBuffer::Vec(v) => v, + ValueBuffer::SmallVec(sv) => sv.into_vec(), + } + } + + pub fn into_small_vec(self) -> SmallVec<[u8; 16]> { + match self { + ValueBuffer::Borrowed(b) => SmallVec::from_slice(b), + ValueBuffer::Vec(v) => SmallVec::from_vec(v), + ValueBuffer::SmallVec(sv) => sv, + } + } +} + +impl<'l> From<&'l [u8]> for ValueBuffer<'l> { + fn from(b: &'l [u8]) -> Self { + ValueBuffer::Borrowed(b) + } +} + +impl From> for ValueBuffer<'_> { + fn from(v: Vec) -> Self { + ValueBuffer::Vec(v) + } +} + +impl From> for ValueBuffer<'_> { + fn from(sv: SmallVec<[u8; 16]>) -> Self { + ValueBuffer::SmallVec(sv) + } +} + +impl<'l> From> for ValueBuffer<'l> { + fn from(c: Cow<'l, [u8]>) -> Self { + match c { + Cow::Borrowed(b) => ValueBuffer::Borrowed(b), + Cow::Owned(o) => ValueBuffer::Vec(o), + } + } +} + +impl Deref for ValueBuffer<'_> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + match self { + ValueBuffer::Borrowed(b) => b, + ValueBuffer::Vec(v) => v, + ValueBuffer::SmallVec(sv) => sv, + } + } +} diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 19bb575ea05284..6eb97c3742b7e4 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -1,5 +1,4 @@ use std::{ - borrow::Cow, cell::UnsafeCell, fs::File, io::Write, @@ -20,7 +19,7 @@ use thread_local::ThreadLocal; use crate::{ collector::Collector, collector_entry::CollectorEntry, constants::MAX_MEDIUM_VALUE_SIZE, - key::StoreKey, static_sorted_file_builder::StaticSortedFileBuilder, + key::StoreKey, static_sorted_file_builder::StaticSortedFileBuilder, ValueBuffer, }; /// The thread local state of a 
`WriteBatch`. @@ -107,11 +106,11 @@ impl WriteBatch { } /// Puts a key-value pair into the write batch. - pub fn put(&self, family: usize, key: K, value: Cow<'_, [u8]>) -> Result<()> { + pub fn put(&self, family: usize, key: K, value: ValueBuffer<'_>) -> Result<()> { let state = self.thread_local_state(); let collector = self.collector_mut(state, family)?; if value.len() <= MAX_MEDIUM_VALUE_SIZE { - collector.put(key, value.into_owned()); + collector.put(key, value); } else { let (blob, file) = self.create_blob(&value)?; collector.put_blob(key, blob); diff --git a/turbopack/crates/turbo-tasks-backend/src/database/fresh_db_optimization.rs b/turbopack/crates/turbo-tasks-backend/src/database/fresh_db_optimization.rs index 9ffd276201dab2..3013091f1a480d 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/fresh_db_optimization.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/fresh_db_optimization.rs @@ -1,5 +1,4 @@ use std::{ - borrow::Cow, fs, path::Path, sync::atomic::{AtomicBool, Ordering}, @@ -9,7 +8,9 @@ use anyhow::Result; use crate::database::{ key_value_database::{KeySpace, KeyValueDatabase}, - write_batch::{BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch}, + write_batch::{ + BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBuffer, + }, }; pub fn is_fresh(path: &Path) -> bool { @@ -124,11 +125,16 @@ impl<'a, B: BaseWriteBatch<'a>> BaseWriteBatch<'a> for FreshDbOptimizationWriteB } impl<'a, B: SerialWriteBatch<'a>> SerialWriteBatch<'a> for FreshDbOptimizationWriteBatch<'a, B> { - fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { + fn put( + &mut self, + key_space: KeySpace, + key: WriteBuffer<'_>, + value: WriteBuffer<'_>, + ) -> Result<()> { self.write_batch.put(key_space, key, value) } - fn delete(&mut self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> { + fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> { 
self.write_batch.delete(key_space, key) } } @@ -136,11 +142,11 @@ impl<'a, B: SerialWriteBatch<'a>> SerialWriteBatch<'a> for FreshDbOptimizationWr impl<'a, B: ConcurrentWriteBatch<'a>> ConcurrentWriteBatch<'a> for FreshDbOptimizationWriteBatch<'a, B> { - fn put(&self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { + fn put(&self, key_space: KeySpace, key: WriteBuffer<'_>, value: WriteBuffer<'_>) -> Result<()> { self.write_batch.put(key_space, key, value) } - fn delete(&self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> { + fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> { self.write_batch.delete(key_space, key) } } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/lmdb/mod.rs b/turbopack/crates/turbo-tasks-backend/src/database/lmdb/mod.rs index cad09f0408e678..2a986ce121985a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/lmdb/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/lmdb/mod.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, fs::create_dir_all, path::Path, thread::available_parallelism}; +use std::{fs::create_dir_all, path::Path, thread::available_parallelism}; use anyhow::{Context, Result}; use lmdb::{ @@ -8,7 +8,7 @@ use lmdb::{ use crate::database::{ key_value_database::{KeySpace, KeyValueDatabase}, - write_batch::{BaseWriteBatch, SerialWriteBatch, WriteBatch}, + write_batch::{BaseWriteBatch, SerialWriteBatch, WriteBatch, WriteBuffer}, }; mod extended_key; @@ -164,7 +164,12 @@ impl<'a> BaseWriteBatch<'a> for LmbdWriteBatch<'a> { } impl<'a> SerialWriteBatch<'a> for LmbdWriteBatch<'a> { - fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { + fn put( + &mut self, + key_space: KeySpace, + key: WriteBuffer<'_>, + value: WriteBuffer<'_>, + ) -> Result<()> { extended_key::put( &mut self.tx, self.this.db(key_space), @@ -175,7 +180,7 @@ impl<'a> SerialWriteBatch<'a> for LmbdWriteBatch<'a> { Ok(()) } - fn delete(&mut self, key_space: 
KeySpace, key: Cow<[u8]>) -> Result<()> { + fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> { extended_key::delete( &mut self.tx, self.this.db(key_space), diff --git a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs index ddc824afcb4b3b..050140198e59cd 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs @@ -1,10 +1,10 @@ -use std::borrow::Cow; - use anyhow::Result; use crate::database::{ key_value_database::{KeySpace, KeyValueDatabase}, - write_batch::{BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch}, + write_batch::{ + BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBuffer, + }, }; pub struct NoopKvDb; @@ -78,21 +78,31 @@ impl<'a> BaseWriteBatch<'a> for NoopWriteBatch { } impl SerialWriteBatch<'_> for NoopWriteBatch { - fn put(&mut self, _key_space: KeySpace, _key: Cow<[u8]>, _value: Cow<[u8]>) -> Result<()> { + fn put( + &mut self, + _key_space: KeySpace, + _key: WriteBuffer<'_>, + _value: WriteBuffer<'_>, + ) -> Result<()> { Ok(()) } - fn delete(&mut self, _key_space: KeySpace, _key: Cow<[u8]>) -> Result<()> { + fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> { Ok(()) } } impl ConcurrentWriteBatch<'_> for NoopWriteBatch { - fn put(&self, _key_space: KeySpace, _key: Cow<[u8]>, _value: Cow<[u8]>) -> Result<()> { + fn put( + &self, + _key_space: KeySpace, + _key: WriteBuffer<'_>, + _value: WriteBuffer<'_>, + ) -> Result<()> { Ok(()) } - fn delete(&self, _key_space: KeySpace, _key: Cow<[u8]>) -> Result<()> { + fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> { Ok(()) } } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/read_transaction_cache.rs b/turbopack/crates/turbo-tasks-backend/src/database/read_transaction_cache.rs index c4ef64113de163..2296b7ca265e59 
100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/read_transaction_cache.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/read_transaction_cache.rs @@ -7,7 +7,9 @@ use thread_local::ThreadLocal; use crate::database::{ key_value_database::KeyValueDatabase, - write_batch::{BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch}, + write_batch::{ + BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBuffer, + }, }; struct ThreadLocalReadTransactionsContainer( @@ -184,15 +186,15 @@ impl<'a, T: KeyValueDatabase, B: SerialWriteBatch<'a>> SerialWriteBatch<'a> fn put( &mut self, key_space: super::key_value_database::KeySpace, - key: std::borrow::Cow<[u8]>, - value: std::borrow::Cow<[u8]>, + key: WriteBuffer<'_>, + value: WriteBuffer<'_>, ) -> Result<()> { self.write_batch.put(key_space, key, value) } fn delete( &mut self, key_space: super::key_value_database::KeySpace, - key: std::borrow::Cow<[u8]>, + key: WriteBuffer<'_>, ) -> Result<()> { self.write_batch.delete(key_space, key) } @@ -204,15 +206,15 @@ impl<'a, T: KeyValueDatabase, B: ConcurrentWriteBatch<'a>> ConcurrentWriteBatch< fn put( &self, key_space: super::key_value_database::KeySpace, - key: std::borrow::Cow<[u8]>, - value: std::borrow::Cow<[u8]>, + key: WriteBuffer<'_>, + value: WriteBuffer<'_>, ) -> Result<()> { self.write_batch.put(key_space, key, value) } fn delete( &self, key_space: super::key_value_database::KeySpace, - key: std::borrow::Cow<[u8]>, + key: WriteBuffer<'_>, ) -> Result<()> { self.write_batch.delete(key_space, key) } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/startup_cache.rs b/turbopack/crates/turbo-tasks-backend/src/database/startup_cache.rs index 58be14389fa454..583733760490c9 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/startup_cache.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/startup_cache.rs @@ -1,5 +1,5 @@ use std::{ - borrow::{Borrow, Cow}, + borrow::Borrow, fs::{self, File}, 
hash::BuildHasherDefault, io::{BufWriter, Read, Write}, @@ -16,7 +16,9 @@ use turbo_tasks::FxDashMap; use crate::database::{ by_key_space::ByKeySpace, key_value_database::{KeySpace, KeyValueDatabase}, - write_batch::{BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch}, + write_batch::{ + BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBuffer, + }, }; const CACHE_SIZE_LIMIT: usize = 100 * 1024 * 1024; @@ -268,7 +270,12 @@ impl<'a, B: BaseWriteBatch<'a>> BaseWriteBatch<'a> for StartupCacheWriteBatch<'a } impl<'a, B: SerialWriteBatch<'a>> SerialWriteBatch<'a> for StartupCacheWriteBatch<'a, B> { - fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { + fn put( + &mut self, + key_space: KeySpace, + key: WriteBuffer<'_>, + value: WriteBuffer<'_>, + ) -> Result<()> { if !self.fresh_db { let cache = self.cache.get(key_space); cache.insert(key.to_vec(), Some(value.to_vec())); @@ -276,7 +283,7 @@ impl<'a, B: SerialWriteBatch<'a>> SerialWriteBatch<'a> for StartupCacheWriteBatc self.batch.put(key_space, key, value) } - fn delete(&mut self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> { + fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> { if !self.fresh_db { let cache = self.cache.get(key_space); cache.insert(key.to_vec(), None); @@ -286,7 +293,7 @@ impl<'a, B: SerialWriteBatch<'a>> SerialWriteBatch<'a> for StartupCacheWriteBatc } impl<'a, B: ConcurrentWriteBatch<'a>> ConcurrentWriteBatch<'a> for StartupCacheWriteBatch<'a, B> { - fn put(&self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { + fn put(&self, key_space: KeySpace, key: WriteBuffer<'_>, value: WriteBuffer<'_>) -> Result<()> { if !self.fresh_db { let cache = self.cache.get(key_space); cache.insert(key.to_vec(), Some(value.to_vec())); @@ -294,7 +301,7 @@ impl<'a, B: ConcurrentWriteBatch<'a>> ConcurrentWriteBatch<'a> for StartupCacheW self.batch.put(key_space, key, value) } - fn 
delete(&self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> { + fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> { if !self.fresh_db { let cache = self.cache.get(key_space); cache.insert(key.to_vec(), None); diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs index 3d1274689eb128..9a4e418bd81ef2 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs @@ -1,5 +1,4 @@ use std::{ - borrow::Cow, path::PathBuf, sync::Arc, thread::{spawn, JoinHandle}, @@ -7,11 +6,11 @@ use std::{ use anyhow::Result; use parking_lot::Mutex; -use turbo_persistence::{ArcSlice, TurboPersistence}; +use turbo_persistence::{ArcSlice, KeyBase, StoreKey, TurboPersistence, ValueBuffer}; use crate::database::{ key_value_database::{KeySpace, KeyValueDatabase}, - write_batch::{BaseWriteBatch, ConcurrentWriteBatch, WriteBatch}, + write_batch::{BaseWriteBatch, ConcurrentWriteBatch, WriteBatch, WriteBuffer}, }; const COMPACT_MAX_COVERAGE: f32 = 20.0; @@ -104,7 +103,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase { } pub struct TurboWriteBatch<'a> { - batch: turbo_persistence::WriteBatch, 5>, + batch: turbo_persistence::WriteBatch, 5>, db: &'a Arc, compact_join_handle: &'a Mutex>>>, } @@ -137,11 +136,60 @@ impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> { } impl<'a> ConcurrentWriteBatch<'a> for TurboWriteBatch<'a> { - fn put(&self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { - self.batch.put(key_space as usize, key.into_owned(), value) + fn put(&self, key_space: KeySpace, key: WriteBuffer<'_>, value: WriteBuffer<'_>) -> Result<()> { + self.batch + .put(key_space as usize, key.into_static(), value.into()) } - fn delete(&self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> { - self.batch.delete(key_space as usize, key.into_owned()) + fn delete(&self, key_space: KeySpace, key: 
WriteBuffer<'_>) -> Result<()> { + self.batch.delete(key_space as usize, key.into_static()) + } +} + +impl KeyBase for WriteBuffer<'_> { + fn len(&self) -> usize { + (**self).len() + } + + fn hash(&self, state: &mut H) { + for item in &**self { + state.write_u8(*item); + } + } +} + +impl StoreKey for WriteBuffer<'_> { + fn write_to(&self, buf: &mut Vec) { + buf.extend_from_slice(self); + } +} + +impl PartialEq for WriteBuffer<'_> { + fn eq(&self, other: &Self) -> bool { + **self == **other + } +} + +impl Eq for WriteBuffer<'_> {} + +impl Ord for WriteBuffer<'_> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (**self).cmp(&**other) + } +} + +impl PartialOrd for WriteBuffer<'_> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl<'l> From> for ValueBuffer<'l> { + fn from(val: WriteBuffer<'l>) -> Self { + match val { + WriteBuffer::Borrowed(b) => ValueBuffer::Borrowed(b), + WriteBuffer::Vec(v) => ValueBuffer::Vec(v), + WriteBuffer::SmallVec(sv) => ValueBuffer::SmallVec(sv), + } } } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs b/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs index c699b0d274fc1f..6754a4ebb69a35 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs @@ -1,9 +1,11 @@ use std::{ borrow::{Borrow, Cow}, marker::PhantomData, + ops::Deref, }; use anyhow::Result; +use smallvec::SmallVec; use crate::database::key_value_database::KeySpace; @@ -19,14 +21,56 @@ pub trait BaseWriteBatch<'a> { fn commit(self) -> Result<()>; } +pub enum WriteBuffer<'a> { + Borrowed(&'a [u8]), + Vec(Vec), + SmallVec(smallvec::SmallVec<[u8; 16]>), +} + +impl WriteBuffer<'_> { + pub fn into_static(self) -> WriteBuffer<'static> { + match self { + WriteBuffer::Borrowed(b) => WriteBuffer::SmallVec(SmallVec::from_slice(b)), + WriteBuffer::Vec(v) => WriteBuffer::Vec(v), + WriteBuffer::SmallVec(sv) => 
WriteBuffer::Vec(sv.into_vec()), + } + } +} + +impl Deref for WriteBuffer<'_> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + match self { + WriteBuffer::Borrowed(b) => b, + WriteBuffer::Vec(v) => v, + WriteBuffer::SmallVec(sv) => sv, + } + } +} + +impl<'l> From> for WriteBuffer<'l> { + fn from(c: Cow<'l, [u8]>) -> Self { + match c { + Cow::Borrowed(b) => WriteBuffer::Borrowed(b), + Cow::Owned(o) => WriteBuffer::Vec(o), + } + } +} + pub trait SerialWriteBatch<'a>: BaseWriteBatch<'a> { - fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()>; - fn delete(&mut self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()>; + fn put( + &mut self, + key_space: KeySpace, + key: WriteBuffer<'_>, + value: WriteBuffer<'_>, + ) -> Result<()>; + fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()>; } pub trait ConcurrentWriteBatch<'a>: BaseWriteBatch<'a> + Sync + Send { - fn put(&self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()>; - fn delete(&self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()>; + fn put(&self, key_space: KeySpace, key: WriteBuffer<'_>, value: WriteBuffer<'_>) -> Result<()>; + fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()>; } pub enum WriteBatch<'a, S, C> @@ -102,14 +146,19 @@ where S: SerialWriteBatch<'a>, C: ConcurrentWriteBatch<'a>, { - fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { + fn put( + &mut self, + key_space: KeySpace, + key: WriteBuffer<'_>, + value: WriteBuffer<'_>, + ) -> Result<()> { match self { WriteBatch::Serial(s) => s.put(key_space, key, value), WriteBatch::Concurrent(c, _) => c.put(key_space, key, value), } } - fn delete(&mut self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> { + fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> { match self { WriteBatch::Serial(s) => s.delete(key_space, key), WriteBatch::Concurrent(c, _) => 
c.delete(key_space, key), @@ -174,14 +223,19 @@ where S: SerialWriteBatch<'a>, C: ConcurrentWriteBatch<'a>, { - fn put(&mut self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { + fn put( + &mut self, + key_space: KeySpace, + key: WriteBuffer<'_>, + value: WriteBuffer<'_>, + ) -> Result<()> { match self { WriteBatchRef::Serial(s) => s.put(key_space, key, value), WriteBatchRef::Concurrent(c, _) => c.put(key_space, key, value), } } - fn delete(&mut self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> { + fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> { match self { WriteBatchRef::Serial(s) => s.delete(key_space, key), WriteBatchRef::Concurrent(c, _) => c.delete(key_space, key), @@ -210,19 +264,29 @@ impl<'a> BaseWriteBatch<'a> for UnimplementedWriteBatch { } impl SerialWriteBatch<'_> for UnimplementedWriteBatch { - fn put(&mut self, _key_space: KeySpace, _key: Cow<[u8]>, _value: Cow<[u8]>) -> Result<()> { + fn put( + &mut self, + _key_space: KeySpace, + _key: WriteBuffer<'_>, + _value: WriteBuffer<'_>, + ) -> Result<()> { todo!() } - fn delete(&mut self, _key_space: KeySpace, _key: Cow<[u8]>) -> Result<()> { + fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> { todo!() } } impl ConcurrentWriteBatch<'_> for UnimplementedWriteBatch { - fn put(&self, _key_space: KeySpace, _key: Cow<[u8]>, _value: Cow<[u8]>) -> Result<()> { + fn put( + &self, + _key_space: KeySpace, + _key: WriteBuffer<'_>, + _value: WriteBuffer<'_>, + ) -> Result<()> { todo!() } - fn delete(&self, _key_space: KeySpace, _key: Cow<[u8]>) -> Result<()> { + fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> { todo!() } } diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 283c353d90ad1b..3d0465220e075a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ 
b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -1,14 +1,10 @@ -use std::{ - borrow::{Borrow, Cow}, - cmp::max, - collections::hash_map::Entry, - sync::Arc, -}; +use std::{borrow::Borrow, cmp::max, collections::hash_map::Entry, sync::Arc}; use anyhow::{anyhow, Context, Result}; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use rustc_hash::FxHashMap; use serde::{ser::SerializeSeq, Serialize}; +use smallvec::SmallVec; use tracing::Span; use turbo_tasks::{backend::CachedTaskType, turbo_tasks_scope, KeyValuePair, SessionId, TaskId}; @@ -20,6 +16,7 @@ use crate::{ key_value_database::{KeySpace, KeyValueDatabase}, write_batch::{ BaseWriteBatch, ConcurrentWriteBatch, SerialWriteBatch, WriteBatch, WriteBatchRef, + WriteBuffer, }, }, utils::chunked_vec::ChunkedVec, @@ -27,6 +24,32 @@ use crate::{ const POT_CONFIG: pot::Config = pot::Config::new().compatibility(pot::Compatibility::V4); +fn pot_serialize_small_vec(value: &T) -> pot::Result> { + struct SmallVecWrite<'l>(&'l mut SmallVec<[u8; 16]>); + impl std::io::Write for SmallVecWrite<'_> { + #[inline] + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.extend_from_slice(buf); + Ok(buf.len()) + } + + #[inline] + fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { + self.0.extend_from_slice(buf); + Ok(()) + } + + #[inline] + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } + } + + let mut output = SmallVec::new(); + POT_CONFIG.serialize_into(value, SmallVecWrite(&mut output))?; + Ok(output) +} + fn pot_ser_symbol_map() -> pot::ser::SymbolMap { pot::ser::SymbolMap::new().with_compatibility(pot::Compatibility::V4) } @@ -191,8 +214,8 @@ impl BackingStorage batch .put( KeySpace::ForwardTaskCache, - Cow::Borrowed(&task_type_bytes), - Cow::Borrowed(&task_id.to_le_bytes()), + WriteBuffer::Borrowed(&task_type_bytes), + WriteBuffer::Borrowed(&task_id.to_le_bytes()), ) .with_context(|| { anyhow!( @@ -203,8 +226,8 @@ impl BackingStorage batch .put( 
KeySpace::ReverseTaskCache, - Cow::Borrowed(IntKey::new(task_id).as_ref()), - Cow::Borrowed(&task_type_bytes), + WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()), + WriteBuffer::Borrowed(&task_type_bytes), ) .with_context(|| { anyhow!( @@ -279,8 +302,8 @@ impl BackingStorage batch .put( KeySpace::ForwardTaskCache, - Cow::Borrowed(&task_type_bytes), - Cow::Borrowed(&task_id.to_le_bytes()), + WriteBuffer::Borrowed(&task_type_bytes), + WriteBuffer::Borrowed(&task_id.to_le_bytes()), ) .with_context(|| { anyhow!("Unable to write task cache {task_type:?} => {task_id}") @@ -288,8 +311,8 @@ impl BackingStorage batch .put( KeySpace::ReverseTaskCache, - Cow::Borrowed(IntKey::new(task_id).as_ref()), - Cow::Borrowed(&task_type_bytes), + WriteBuffer::Borrowed(IntKey::new(task_id).as_ref()), + WriteBuffer::Borrowed(&task_type_bytes), ) .with_context(|| { anyhow!("Unable to write task cache {task_id} => {task_type:?}") @@ -325,8 +348,8 @@ impl BackingStorage batch .put( key_space, - Cow::Borrowed(IntKey::new(*task_id).as_ref()), - value.into(), + WriteBuffer::Borrowed(IntKey::new(*task_id).as_ref()), + value, ) .with_context(|| anyhow!("Unable to write data items for {task_id}"))?; } @@ -473,8 +496,8 @@ where batch .put( KeySpace::Infra, - Cow::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()), - Cow::Borrowed(&next_task_id.to_le_bytes()), + WriteBuffer::Borrowed(IntKey::new(META_KEY_NEXT_FREE_TASK_ID).as_ref()), + WriteBuffer::Borrowed(&next_task_id.to_le_bytes()), ) .with_context(|| anyhow!("Unable to write next free task id"))?; } @@ -483,22 +506,21 @@ where batch .put( KeySpace::Infra, - Cow::Borrowed(IntKey::new(META_KEY_SESSION_ID).as_ref()), - Cow::Borrowed(&session_id.to_le_bytes()), + WriteBuffer::Borrowed(IntKey::new(META_KEY_SESSION_ID).as_ref()), + WriteBuffer::Borrowed(&session_id.to_le_bytes()), ) .with_context(|| anyhow!("Unable to write next session id"))?; } { let _span = tracing::trace_span!("update operations", operations = 
operations.len()).entered(); - let operations = POT_CONFIG - .serialize(&operations) + let operations = pot_serialize_small_vec(&operations) .with_context(|| anyhow!("Unable to serialize operations"))?; batch .put( KeySpace::Infra, - Cow::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()), - operations.into(), + WriteBuffer::Borrowed(IntKey::new(META_KEY_OPERATIONS).as_ref()), + WriteBuffer::SmallVec(operations), ) .with_context(|| anyhow!("Unable to write operations"))?; } @@ -527,7 +549,7 @@ fn serialize_task_type( Ok(()) } -type SerializedTasks = Vec)>>; +type SerializedTasks = Vec)>>; type TaskUpdates = FxHashMap, Option)>; @@ -712,12 +734,12 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>( if let Some(batch) = batch { batch.put( key_space, - Cow::Borrowed(IntKey::new(*task).as_ref()), - Cow::Owned(value), + WriteBuffer::Borrowed(IntKey::new(*task).as_ref()), + WriteBuffer::SmallVec(value), )?; } else { // Store the new task data - tasks.push((task, value)); + tasks.push((task, WriteBuffer::SmallVec(value))); } } @@ -728,9 +750,9 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync>( .collect::>>() } -fn serialize(task: TaskId, data: &mut TaskUpdates) -> Result> { +fn serialize(task: TaskId, data: &mut TaskUpdates) -> Result> { Ok( - match POT_CONFIG.serialize(&SerializeLikeVecOfCachedDataItem(data)) { + match pot_serialize_small_vec(&SerializeLikeVecOfCachedDataItem(data)) { #[cfg(not(feature = "verify_serialization"))] Ok(value) => value, _ => { @@ -782,11 +804,9 @@ fn serialize(task: TaskId, data: &mut TaskUpdates) -> Result> { }); error?; - POT_CONFIG - .serialize(&SerializeLikeVecOfCachedDataItem(data)) - .with_context(|| { - anyhow!("Unable to serialize data items for {task}: {data:#?}") - })? + pot_serialize_small_vec(&SerializeLikeVecOfCachedDataItem(data)).with_context( + || anyhow!("Unable to serialize data items for {task}: {data:#?}"), + )? } }, )