Skip to content

Commit 203f31f

Browse files
authored
Avoids emitting the same output from multiple processes of the same process pool (vercel/turborepo#3531)
It uses a `shared` set of `(line: [u8], occurrences: u32)` to avoid emitting lines that have already been emitted by other processes. It still supports emitting the same line multiple times from one process. The `shared` data might later be used for showing logging from a worker pool. It contains the merged output of all processes. Fixes WEB-489
1 parent 32a4441 commit 203f31f

File tree

1 file changed

+70
-4
lines changed

1 file changed

+70
-4
lines changed

crates/turbopack-node/src/pool.rs

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
use std::{
22
collections::HashMap,
3+
mem::take,
34
path::{Path, PathBuf},
45
process::{ExitStatus, Stdio},
56
sync::{Arc, Mutex},
67
time::Duration,
78
};
89

910
use anyhow::{bail, Context, Result};
11+
use indexmap::IndexSet;
1012
use serde::{de::DeserializeOwned, Serialize};
1113
use tokio::{
12-
io::{AsyncReadExt, AsyncWriteExt},
14+
io::{
15+
stderr, stdout, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
16+
BufReader,
17+
},
1318
net::{TcpListener, TcpStream},
1419
process::{Child, Command},
1520
select,
@@ -55,11 +60,53 @@ impl Drop for RunningNodeJsPoolProcess {
5560

5661
const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
5762

63+
/// Set of `(line, occurrence)` pairs that have already been forwarded to the
/// real output, shared behind an `Arc<Mutex<_>>` between all
/// `handle_output_stream` tasks that should deduplicate against each other.
///
/// `line` holds the raw bytes of one output line and `occurrence` is how many
/// times the emitting stream had produced that exact line before (0 for its
/// first appearance).
type SharedOutputSet = Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>;
64+
65+
/// Pipes the `stream` from `final_stream`, but uses `shared` to deduplicate
66+
/// lines that has beem emitted by other `handle_output_stream` instances with
67+
/// the same `shared` before.
68+
async fn handle_output_stream(
69+
stream: impl AsyncRead + Unpin,
70+
shared: SharedOutputSet,
71+
mut final_stream: impl AsyncWrite + Unpin,
72+
) {
73+
let mut buffered = BufReader::new(stream);
74+
let mut own_output = HashMap::<Arc<[u8]>, u32>::new();
75+
let mut buffer = Vec::new();
76+
loop {
77+
match buffered.read_until(b'\n', &mut buffer).await {
78+
Ok(0) => {
79+
break;
80+
}
81+
Err(err) => {
82+
eprintln!("error reading from stream: {}", err);
83+
break;
84+
}
85+
Ok(_) => {}
86+
}
87+
let line = Arc::from(take(&mut buffer).into_boxed_slice());
88+
let occurance_number = *own_output
89+
.entry(Arc::clone(&line))
90+
.and_modify(|c| *c += 1)
91+
.or_insert(0);
92+
let new_line = {
93+
let mut shared = shared.lock().unwrap();
94+
shared.insert((line.clone(), occurance_number))
95+
};
96+
if new_line && final_stream.write(&line).await.is_err() {
97+
// Whatever happened with stdout/stderr, we can't write to it anymore.
98+
break;
99+
}
100+
}
101+
}
102+
58103
impl NodeJsPoolProcess {
59104
async fn new(
60105
cwd: &Path,
61106
env: &HashMap<String, String>,
62107
entrypoint: &Path,
108+
shared_stdout: SharedOutputSet,
109+
shared_stderr: SharedOutputSet,
63110
debug: bool,
64111
) -> Result<Self> {
65112
let listener = TcpListener::bind("127.0.0.1:0")
@@ -85,10 +132,21 @@ impl NodeJsPoolProcess {
85132
.expect("the SystemRoot environment variable should always be set"),
86133
);
87134
cmd.envs(env);
88-
cmd.stderr(Stdio::inherit());
89-
cmd.stdout(Stdio::inherit());
135+
cmd.stderr(Stdio::piped());
136+
cmd.stdout(Stdio::piped());
90137

91-
let child = cmd.spawn().context("spawning node pooled process")?;
138+
let mut child = cmd.spawn().context("spawning node pooled process")?;
139+
140+
tokio::spawn(handle_output_stream(
141+
child.stdout.take().unwrap(),
142+
shared_stdout,
143+
stdout(),
144+
));
145+
tokio::spawn(handle_output_stream(
146+
child.stderr.take().unwrap(),
147+
shared_stderr,
148+
stderr(),
149+
));
92150

93151
Ok(Self::Spawned(SpawnedNodeJsPoolProcess {
94152
listener,
@@ -184,6 +242,10 @@ pub struct NodeJsPool {
184242
processes: Arc<Mutex<Vec<NodeJsPoolProcess>>>,
185243
#[turbo_tasks(trace_ignore, debug_ignore)]
186244
semaphore: Arc<Semaphore>,
245+
#[turbo_tasks(trace_ignore, debug_ignore)]
246+
shared_stdout: SharedOutputSet,
247+
#[turbo_tasks(trace_ignore, debug_ignore)]
248+
shared_stderr: SharedOutputSet,
187249
debug: bool,
188250
}
189251

@@ -203,6 +265,8 @@ impl NodeJsPool {
203265
env,
204266
processes: Arc::new(Mutex::new(Vec::new())),
205267
semaphore: Arc::new(Semaphore::new(if debug { 1 } else { concurrency })),
268+
shared_stdout: Arc::new(Mutex::new(IndexSet::new())),
269+
shared_stderr: Arc::new(Mutex::new(IndexSet::new())),
206270
debug,
207271
}
208272
}
@@ -220,6 +284,8 @@ impl NodeJsPool {
220284
self.cwd.as_path(),
221285
&self.env,
222286
self.entrypoint.as_path(),
287+
self.shared_stdout.clone(),
288+
self.shared_stderr.clone(),
223289
self.debug,
224290
)
225291
.await

0 commit comments

Comments
 (0)