Skip to content

Commit 4de2201

Browse files
authored
[turbopack] Share scratch buffer across shards using thread local (#90167)
Instead of allocating a new scratch buffer for every shard, reuse per-thread buffers stored in a thread local.
1 parent 6689814 commit 4de2201

File tree

6 files changed

+133
-55
lines changed

6 files changed

+133
-55
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

turbopack/crates/turbo-tasks-backend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ turbo-persistence = { workspace = true }
5757
turbo-rcstr = { workspace = true }
5858
turbo-tasks = { workspace = true }
5959
turbo-tasks-hash = { workspace = true }
60+
thread_local = { workspace = true }
6061

6162
[dev-dependencies]
6263
criterion = { workspace = true, features = ["async_tokio"] }

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1209,7 +1209,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
12091209
)
12101210
};
12111211

1212-
// take_snapshot already filters empty items and empty shards in parallel
1212+
// take_snapshot filters empty shards (no modified/snapshot entries) in parallel.
1213+
// Individual empty SnapshotItems are filtered by the iterator.
12131214
let task_snapshots = self.storage.take_snapshot(&process, &process_snapshot);
12141215

12151216
swap_retain(&mut persisted_task_cache_log, |shard| !shard.is_empty());

turbopack/crates/turbo-tasks-backend/src/backend/storage.rs

Lines changed: 125 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use std::{
2+
cell::Cell,
23
hash::Hash,
3-
iter::Peekable,
44
ops::{Deref, DerefMut},
55
sync::{Arc, atomic::AtomicBool},
66
};
77

88
use smallvec::SmallVec;
9+
use thread_local::ThreadLocal;
910
use turbo_bincode::TurboBincodeBuffer;
1011
use turbo_tasks::{FxDashMap, TaskId, parallel};
1112

@@ -113,7 +114,9 @@ impl Storage {
113114
/// receives an owned Box<TaskStorage> snapshot.
114115
/// Both callbacks receive a mutable scratch buffer that can be reused across iterations
115116
/// to avoid repeated allocations.
116-
/// The returned iterators are guaranteed to be non-empty and only yield non-empty items.
117+
/// The returned shards implement `IntoIterator`. Empty shards (no modified or snapshot
118+
/// entries) are filtered out, but shards may still yield no items if all entries produce
119+
/// empty `SnapshotItem`s (this is rare and only happens under error conditions).
117120
pub fn take_snapshot<
118121
'l,
119122
P: for<'a> Fn(TaskId, &'a TaskStorage, &mut TurboBincodeBuffer) -> SnapshotItem + Sync,
@@ -122,12 +125,15 @@ impl Storage {
122125
&'l self,
123126
process: &'l P,
124127
process_snapshot: &'l PS,
125-
) -> Vec<Peekable<SnapshotShard<'l, P, PS>>> {
128+
) -> Vec<SnapshotShard<'l, P, PS>> {
126129
if !self.snapshot_mode() {
127130
self.start_snapshot();
128131
}
129132

130-
let guard = Arc::new(SnapshotGuard { storage: self });
133+
let guard = Arc::new(SnapshotGuard {
134+
storage: self,
135+
scratch_buffers: ThreadLocal::new(),
136+
});
131137

132138
// The number of shards is much larger than the number of threads, so the effect of the
133139
// locks held is negligible.
@@ -163,22 +169,14 @@ impl Storage {
163169
return None;
164170
}
165171

166-
/// How big of a buffer to allocate initially. Based on metrics from a large
167-
/// application this should cover about 98% of values with no resizes
168-
const SCRATCH_BUFFER_SIZE: usize = 4096;
169-
let shard = SnapshotShard {
172+
Some(SnapshotShard {
170173
direct_snapshots,
171174
modified,
172175
storage: self,
173-
guard: Some(guard.clone()),
174176
process,
175177
process_snapshot,
176-
scratch_buffer: TurboBincodeBuffer::with_capacity(SCRATCH_BUFFER_SIZE),
177-
};
178-
179-
// Peek to filter out shards that only produce empty items
180-
let mut iter = shard.peekable();
181-
iter.peek().is_some().then_some(iter)
178+
_guard: guard.clone(),
179+
})
182180
})
183181
.into_iter()
184182
.flatten()
@@ -382,8 +380,69 @@ impl DerefMut for StorageWriteGuard<'_> {
382380
}
383381
}
384382

383+
/// How big of a buffer to allocate initially. Based on metrics from a large
384+
/// application this should cover about 98% of values with no resizes.
385+
const SCRATCH_BUFFER_INITIAL_SIZE: usize = 4096;
386+
387+
/// State machine for a per-thread scratch buffer slot.
388+
///
389+
/// Transitions:
390+
/// - `Uninit` → `Taken` (first take)
391+
/// - `Available` → `Taken` (subsequent takes)
392+
/// - `Taken` → `Available` (return)
393+
///
394+
/// Any other transition is a bug (e.g. double-take or double-return).
395+
#[derive(Default)]
396+
enum ScratchBufferSlot {
397+
/// No buffer has been allocated on this thread yet.
398+
#[default]
399+
Uninit,
400+
/// The buffer is currently checked out.
401+
Taken,
402+
/// The buffer is available for reuse.
403+
Available(TurboBincodeBuffer),
404+
}
405+
385406
pub struct SnapshotGuard<'l> {
386407
storage: &'l Storage,
408+
/// Per-thread scratch buffers for encoding task data. Buffers are taken
409+
/// by `SnapshotShardIter` on creation and returned on drop, allowing reuse
410+
/// across multiple shards processed by the same thread. When the guard is
411+
/// dropped (after all iterators are done), the `ThreadLocal` drops too,
412+
/// freeing all buffers.
413+
scratch_buffers: ThreadLocal<Cell<ScratchBufferSlot>>,
414+
}
415+
416+
impl SnapshotGuard<'_> {
417+
fn take_scratch_buffer(&self) -> TurboBincodeBuffer {
418+
let cell = self.scratch_buffers.get_or_default();
419+
match cell.take() {
420+
ScratchBufferSlot::Available(buf) => {
421+
cell.set(ScratchBufferSlot::Taken);
422+
buf
423+
}
424+
ScratchBufferSlot::Uninit => {
425+
cell.set(ScratchBufferSlot::Taken);
426+
TurboBincodeBuffer::with_capacity(SCRATCH_BUFFER_INITIAL_SIZE)
427+
}
428+
ScratchBufferSlot::Taken => {
429+
panic!("scratch buffer taken twice without being returned");
430+
}
431+
}
432+
}
433+
434+
fn return_scratch_buffer(&self, buffer: TurboBincodeBuffer) {
435+
let cell = self.scratch_buffers.get_or_default();
436+
match cell.take() {
437+
ScratchBufferSlot::Taken => cell.set(ScratchBufferSlot::Available(buffer)),
438+
ScratchBufferSlot::Available(_) => {
439+
panic!("scratch buffer returned without being taken (already available)");
440+
}
441+
ScratchBufferSlot::Uninit => {
442+
panic!("scratch buffer returned without being taken (uninit)");
443+
}
444+
}
445+
}
387446
}
388447

389448
impl Drop for SnapshotGuard<'_> {
@@ -396,66 +455,82 @@ pub struct SnapshotShard<'l, P, PS> {
396455
direct_snapshots: Vec<(TaskId, Box<TaskStorage>)>,
397456
modified: SmallVec<[TaskId; 4]>,
398457
storage: &'l Storage,
399-
guard: Option<Arc<SnapshotGuard<'l>>>,
400458
process: &'l P,
401459
process_snapshot: &'l PS,
402-
/// Scratch buffer for encoding task data, reused across iterations to avoid allocations
403-
scratch_buffer: TurboBincodeBuffer,
460+
/// Provides the per-thread scratch buffers during iteration; its `Drop` impl also ensures snapshot mode ends when all shards are done.
461+
_guard: Arc<SnapshotGuard<'l>>,
462+
}
463+
464+
impl<'l, P, PS> IntoIterator for SnapshotShard<'l, P, PS>
465+
where
466+
P: Fn(TaskId, &TaskStorage, &mut TurboBincodeBuffer) -> SnapshotItem + Sync,
467+
PS: Fn(TaskId, Box<TaskStorage>, &mut TurboBincodeBuffer) -> SnapshotItem + Sync,
468+
{
469+
type Item = SnapshotItem;
470+
type IntoIter = SnapshotShardIter<'l, P, PS>;
471+
472+
fn into_iter(self) -> Self::IntoIter {
473+
let buffer = self._guard.take_scratch_buffer();
474+
SnapshotShardIter {
475+
shard: self,
476+
buffer,
477+
}
478+
}
479+
}
480+
481+
/// Iterator over a single shard's snapshot items. Holds a thread-local scratch
482+
/// buffer for the duration of iteration and returns it on drop.
483+
pub struct SnapshotShardIter<'l, P, PS> {
484+
shard: SnapshotShard<'l, P, PS>,
485+
buffer: TurboBincodeBuffer,
404486
}
405487

406-
impl<'l, P, PS> SnapshotShard<'l, P, PS>
488+
impl<'l, P, PS> Iterator for SnapshotShardIter<'l, P, PS>
407489
where
408490
P: Fn(TaskId, &TaskStorage, &mut TurboBincodeBuffer) -> SnapshotItem + Sync,
409491
PS: Fn(TaskId, Box<TaskStorage>, &mut TurboBincodeBuffer) -> SnapshotItem + Sync,
410492
{
411-
fn next_item(&mut self) -> Option<SnapshotItem> {
412-
if let Some((task_id, snapshot)) = self.direct_snapshots.pop() {
413-
return Some((self.process_snapshot)(
414-
task_id,
415-
snapshot,
416-
&mut self.scratch_buffer,
417-
));
493+
type Item = SnapshotItem;
494+
495+
fn next(&mut self) -> Option<Self::Item> {
496+
while let Some((task_id, snapshot)) = self.shard.direct_snapshots.pop() {
497+
let item = (self.shard.process_snapshot)(task_id, snapshot, &mut self.buffer);
498+
if !item.is_empty() {
499+
return Some(item);
500+
}
418501
}
419-
while let Some(task_id) = self.modified.pop() {
420-
let inner = self.storage.map.get(&task_id).unwrap();
502+
while let Some(task_id) = self.shard.modified.pop() {
503+
let inner = self.shard.storage.map.get(&task_id).unwrap();
421504
if !inner.flags.any_snapshot() {
422-
return Some((self.process)(task_id, &inner, &mut self.scratch_buffer));
505+
let item = (self.shard.process)(task_id, &inner, &mut self.buffer);
506+
if !item.is_empty() {
507+
return Some(item);
508+
}
423509
} else {
424510
drop(inner);
425511
let maybe_snapshot = {
426-
let mut modified_state = self.storage.modified.get_mut(&task_id).unwrap();
512+
let mut modified_state = self.shard.storage.modified.get_mut(&task_id).unwrap();
427513
let ModifiedState::Snapshot(snapshot) = &mut *modified_state else {
428514
unreachable!("The snapshot bit was set, so it must be in Snapshot state");
429515
};
430516
snapshot.take()
431517
};
432518
if let Some(snapshot) = maybe_snapshot {
433-
return Some((self.process_snapshot)(
434-
task_id,
435-
snapshot,
436-
&mut self.scratch_buffer,
437-
));
519+
let item = (self.shard.process_snapshot)(task_id, snapshot, &mut self.buffer);
520+
if !item.is_empty() {
521+
return Some(item);
522+
}
438523
}
439524
}
440525
}
441526
None
442527
}
443528
}
444529

445-
impl<'l, P, PS> Iterator for SnapshotShard<'l, P, PS>
446-
where
447-
P: Fn(TaskId, &TaskStorage, &mut TurboBincodeBuffer) -> SnapshotItem + Sync,
448-
PS: Fn(TaskId, Box<TaskStorage>, &mut TurboBincodeBuffer) -> SnapshotItem + Sync,
449-
{
450-
type Item = SnapshotItem;
451-
452-
fn next(&mut self) -> Option<Self::Item> {
453-
while let Some(item) = self.next_item() {
454-
if !item.is_empty() {
455-
return Some(item);
456-
}
457-
}
458-
self.guard = None;
459-
None
530+
impl<P, PS> Drop for SnapshotShardIter<'_, P, PS> {
531+
fn drop(&mut self) {
532+
self.shard
533+
._guard
534+
.return_scratch_buffer(std::mem::take(&mut self.buffer));
460535
}
461536
}

turbopack/crates/turbo-tasks-backend/src/backing_storage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub trait BackingStorageSealed: 'static + Send + Sync {
6262
snapshots: Vec<I>,
6363
) -> Result<()>
6464
where
65-
I: Iterator<Item = SnapshotItem> + Send + Sync;
65+
I: IntoIterator<Item = SnapshotItem> + Send + Sync;
6666
/// Returns all task IDs that match the given task type (hash collision candidates).
6767
///
6868
/// Since TaskCache uses hash-based keys, multiple task types may (rarely) hash to the same key.
@@ -125,7 +125,7 @@ where
125125
snapshots: Vec<I>,
126126
) -> Result<()>
127127
where
128-
I: Iterator<Item = SnapshotItem> + Send + Sync,
128+
I: IntoIterator<Item = SnapshotItem> + Send + Sync,
129129
{
130130
either::for_both!(self, this => this.save_snapshot(
131131
operations,

turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorageSealed
231231
snapshots: Vec<I>,
232232
) -> Result<()>
233233
where
234-
I: Iterator<Item = SnapshotItem> + Send + Sync,
234+
I: IntoIterator<Item = SnapshotItem> + Send + Sync,
235235
{
236236
let _span = tracing::info_span!("save snapshot", operations = operations.len()).entered();
237237
let batch = self.inner.database.write_batch()?;
@@ -451,7 +451,7 @@ fn process_task_data<'a, B: ConcurrentWriteBatch<'a> + Send + Sync, I>(
451451
batch: &B,
452452
) -> Result<()>
453453
where
454-
I: Iterator<Item = SnapshotItem> + Send + Sync,
454+
I: IntoIterator<Item = SnapshotItem> + Send + Sync,
455455
{
456456
parallel::try_for_each_owned(tasks, |tasks| {
457457
for SnapshotItem {

0 commit comments

Comments
 (0)