Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions crates/turbopack-node/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use std::{
collections::HashMap,
collections::{hash_map::Entry, HashMap},
mem::take,
path::{Path, PathBuf},
process::{ExitStatus, Stdio},
sync::{Arc, Mutex},
time::Duration,
};

use anyhow::{bail, Context, Result};
use indexmap::IndexSet;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{
stderr, stdout, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
BufReader,
},
net::{TcpListener, TcpStream},
process::{Child, Command},
select,
Expand Down Expand Up @@ -55,11 +60,61 @@ impl Drop for RunningNodeJsPoolProcess {

const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Pipes the `stream` from `final_stream`, but uses `shared` to deduplicate
/// lines that has beem emitted by other `handle_output_stream` instances with
/// the same `shared` before.
async fn handle_output_stream(
stream: impl AsyncRead + Unpin,
shared: Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>,
mut final_stream: impl AsyncWrite + Unpin,
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If stream A reports:

error A:
   shared line
   non-shared line (A)

while stream B reports:

error B:
  shared line
  non-shared line (B)

We would end up with:

error A:
  shared line
  non-shared line (A)
error B:
  non-shared line (B)

I expect this to also happen with empty lines printed for formatting purposes.

Should we instead only dedupe the start of a stream, up until the first different character?

let mut buffered = BufReader::new(stream);
let mut own_output = HashMap::<Arc<[u8]>, u32>::new();
let mut buffer = Vec::new();
loop {
match buffered.read_until(b'\n', &mut buffer).await {
Ok(0) => {
break;
}
Ok(_) => {
let line = Arc::from(take(&mut buffer).into_boxed_slice());
let occurances = match own_output.entry(Arc::clone(&line)) {
Entry::Occupied(mut e) => {
let c = e.get_mut();
*c += 1;
*c
}
Entry::Vacant(e) => {
e.insert(0);
0
}
};
let new_line = {
let mut shared = shared.lock().unwrap();
shared.insert((line.clone(), occurances))
};
if new_line {
Copy link
Contributor

@ForsakenHarmony ForsakenHarmony Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I'm understanding this correctly, a new line gets logged whenever there are more than the previous amount of occurrences?

This also means if one execution logs something 2 times and the next logs it 3 times, it will only output 1 time for the next execution? That seems like confusing behavior that might potentially hide information necessary for debugging

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it could be a bit confusing in some edge cases... Do you have a better idea?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe do it for the whole output and not line by line

if final_stream.write(&*line).await.is_err() {
// Whatever happened with stdout/stderr, we can't write to it anymore.
break;
}
}
}
Err(err) => {
eprintln!("error reading from stream: {}", err);
break;
}
}
}
}

impl NodeJsPoolProcess {
async fn new(
cwd: &Path,
env: &HashMap<String, String>,
entrypoint: &Path,
shared_stdout: Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>,
shared_stderr: Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>,
debug: bool,
) -> Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0")
Expand All @@ -85,10 +140,21 @@ impl NodeJsPoolProcess {
.expect("the SystemRoot environment variable should always be set"),
);
cmd.envs(env);
cmd.stderr(Stdio::inherit());
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());

let child = cmd.spawn().context("spawning node pooled process")?;
let mut child = cmd.spawn().context("spawning node pooled process")?;

tokio::spawn(handle_output_stream(
child.stdout.take().unwrap(),
shared_stdout,
stdout(),
));
tokio::spawn(handle_output_stream(
child.stderr.take().unwrap(),
shared_stderr,
stderr(),
));

Ok(Self::Spawned(SpawnedNodeJsPoolProcess {
listener,
Expand Down Expand Up @@ -184,6 +250,10 @@ pub struct NodeJsPool {
processes: Arc<Mutex<Vec<NodeJsPoolProcess>>>,
#[turbo_tasks(trace_ignore, debug_ignore)]
semaphore: Arc<Semaphore>,
#[turbo_tasks(trace_ignore, debug_ignore)]
shared_stdout: Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>,
#[turbo_tasks(trace_ignore, debug_ignore)]
shared_stderr: Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>,
debug: bool,
}

Expand All @@ -203,6 +273,8 @@ impl NodeJsPool {
env,
processes: Arc::new(Mutex::new(Vec::new())),
semaphore: Arc::new(Semaphore::new(if debug { 1 } else { concurrency })),
shared_stdout: Arc::new(Mutex::new(IndexSet::new())),
shared_stderr: Arc::new(Mutex::new(IndexSet::new())),
debug,
}
}
Expand All @@ -220,6 +292,8 @@ impl NodeJsPool {
self.cwd.as_path(),
&self.env,
self.entrypoint.as_path(),
self.shared_stdout.clone(),
self.shared_stderr.clone(),
self.debug,
)
.await
Expand Down