Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 160 additions & 10 deletions node/core/pvf/worker/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ use crate::LOG_TARGET;
use cpu_time::ProcessTime;
use futures::never::Never;
use std::{
any::Any,
path::PathBuf,
sync::mpsc::{Receiver, RecvTimeoutError},
time::Duration,
};
use tokio::{
io,
net::UnixStream,
runtime::{Handle, Runtime},
};
use tokio::{io, net::UnixStream, runtime::Runtime};

/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
/// child process.
Expand All @@ -44,7 +41,7 @@ pub fn worker_event_loop<F, Fut>(
node_version: Option<&str>,
mut event_loop: F,
) where
F: FnMut(Handle, UnixStream) -> Fut,
F: FnMut(UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
let worker_pid = std::process::id();
Expand All @@ -68,13 +65,12 @@ pub fn worker_event_loop<F, Fut>(

// Run the main worker loop.
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let handle = rt.handle();
let err = rt
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need tokio and futures if everything async-related is already purged? Filesystem interactions are sync in nature, and for the worker reading from the socket is blocking too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right! I just didn't remove the rest of async to keep this PR focused. But I can do it here if you want.

Note that we still need to remove the dependencies on polkadot-node-core-pvf and tracing-gum to fully remove the dependency on tokio.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How's tracing-gum related to tokio?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just ran cargo tree -e normal in the crate and saw tokio several times in the output, e.g. under sc-network and libp2p crates. I have no idea how tracing-gum works though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's jaeger, even though gum only uses hashing from it.

You're right! I just didn't remove the rest of async to keep this PR focused. But I can do it here if you want.

Better to polish the rest of the code and properly synchronize threads and take care of removing tokio later (if it's possible)

.block_on(async move {
let stream = UnixStream::connect(socket_path).await?;
let _ = tokio::fs::remove_file(socket_path).await;

let result = event_loop(handle.clone(), stream).await;
let result = event_loop(stream).await;

result
})
Expand Down Expand Up @@ -108,8 +104,10 @@ pub fn cpu_time_monitor_loop(

// Treat the timeout as CPU time, which is less subject to variance due to load.
if cpu_time_elapsed <= timeout {
// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
// is wall clock time. The CPU clock may be slower than the wall clock.
// Sleep for the remaining CPU time, plus a bit to account for overhead. (And we don't
// want to wake up too often -- so, since we just want to halt the worker thread if it
// stalled, we can sleep longer than necessary.) Note that the sleep is wall clock time.
// The CPU clock may be slower than the wall clock.
let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD;
match finished_rx.recv_timeout(sleep_interval) {
// Received finish signal.
Expand All @@ -124,6 +122,20 @@ pub fn cpu_time_monitor_loop(
}
}

/// Attempt to convert an opaque panic payload to a string.
///
/// This is a best effort, and is not guaranteed to provide the most accurate value.
pub fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
match payload.downcast::<&'static str>() {
Ok(msg) => msg.to_string(),
Err(payload) => match payload.downcast::<String>() {
Ok(msg) => *msg,
// At least we tried...
Err(_) => "unknown panic payload".to_string(),
},
}
}

/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM`
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
Expand All @@ -140,3 +152,141 @@ fn kill_parent_node_in_emergency() {
}
}
}

/// Functionality related to threads spawned by the workers.
///
/// The motivation for this module is to coordinate worker threads without using async Rust. This
/// lets us pull in less dependencies, making the worker binaries smaller and easier to secure.
pub mod thread {
use std::{
panic,
sync::{Arc, Condvar, Mutex},
thread,
time::Duration,
};

/// Contains the outcome of waiting on threads, or `Pending` if none are ready.
#[derive(Clone, Copy)]
pub enum WaitOutcome {
JobFinished,
CpuTimedOut,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Wouldn't JobPending and JobTimedOut sound better?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or remove the prefix completely: Finished, TimedOut, Pending.

Pending,
}

impl WaitOutcome {
pub fn is_pending(&self) -> bool {
matches!(self, Self::Pending)
}
}

/// Helper type.
pub type Cond = Arc<(Mutex<WaitOutcome>, Condvar)>;

/// Gets a condvar initialized to `Pending`.
pub fn get_condvar() -> Cond {
Arc::new((Mutex::new(WaitOutcome::Pending), Condvar::new()))
}

/// Runs a thread, afterwards notifying the threads waiting on the condvar. Catches panics and
/// resumes them after triggering the condvar, so that the waiting thread is notified on panics.
pub fn spawn_worker_thread<F, R>(
name: &str,
f: F,
cond: Cond,
outcome: WaitOutcome,
) -> std::io::Result<thread::JoinHandle<R>>
where
F: FnOnce() -> R,
F: Send + 'static + panic::UnwindSafe,
R: Send + 'static,
{
thread::Builder::new()
.name(name.into())
.spawn(move || cond_notify_on_done(f, cond, outcome))
}

/// Runs a worker thread with the given stack size. See [`spawn_worker_thread`].
pub fn spawn_worker_thread_with_stack_size<F, R>(
name: &str,
f: F,
cond: Cond,
outcome: WaitOutcome,
stack_size: usize,
) -> std::io::Result<thread::JoinHandle<R>>
where
F: FnOnce() -> R,
F: Send + 'static + panic::UnwindSafe,
R: Send + 'static,
{
thread::Builder::new()
.name(name.into())
.stack_size(stack_size)
.spawn(move || cond_notify_on_done(f, cond, outcome))
}

/// Runs a function, afterwards notifying the threads waiting on the condvar. Catches panics and
/// resumes them after triggering the condvar, so that the waiting thread is notified on panics.
fn cond_notify_on_done<F, R>(f: F, cond: Cond, outcome: WaitOutcome) -> R
where
F: FnOnce() -> R,
F: panic::UnwindSafe,
{
let result = panic::catch_unwind(|| f());
cond_notify_all(cond, outcome);
match result {
Ok(inner) => return inner,
Err(err) => panic::resume_unwind(err),
}
}

/// Helper function to notify all threads waiting on this condvar.
fn cond_notify_all(cond: Cond, outcome: WaitOutcome) {
let (lock, cvar) = &*cond;
let mut flag = lock
.lock()
.expect("only panics if the lock is already held by the current thread; qed");
if !flag.is_pending() {
// Someone else already triggered the condvar.
return
}
*flag = outcome;
cvar.notify_all();
}

/// Block the thread while it waits on the condvar.
pub fn wait_for_threads(cond: Cond) -> WaitOutcome {
let (lock, cvar) = &*cond;
let guard = cvar
.wait_while(
lock.lock()
.expect("only panics if the lock is already held by the current thread; qed"),
|flag| flag.is_pending(),
)
.unwrap();
*guard
}

/// Block the thread while it waits on the condvar or on a timeout. If the timeout is hit,
/// returns `None`. The signature is different than [`wait_for_threads`] because this is
/// expected to be called in a loop, where we can't take ownership of a `Cond`.
#[cfg_attr(not(any(target_os = "linux", feature = "jemalloc-allocator")), allow(dead_code))]
pub fn wait_for_threads_with_timeout(
lock: &Mutex<WaitOutcome>,
cvar: &Condvar,
dur: Duration,
) -> Option<WaitOutcome> {
let result = cvar
.wait_timeout_while(
lock.lock()
.expect("only panics if the lock is already held by the current thread; qed"),
dur,
|flag| flag.is_pending(),
)
.unwrap();
if result.1.timed_out() {
None
} else {
Some(*result.0)
}
}
}
111 changes: 72 additions & 39 deletions node/core/pvf/worker/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
executor_intf::Executor,
common::{
bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload,
thread::{self, WaitOutcome},
worker_event_loop,
},
executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE},
LOG_TARGET,
};
use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf::{
framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response,
Expand Down Expand Up @@ -67,18 +70,22 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul
framed_send(stream, &response.encode()).await
}

/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
/// The entrypoint that the spawned execute worker should start with.
///
/// # Parameters
///
/// The `socket_path` specifies the path to the socket used to communicate with the host. The
/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in
/// immediate worker termination. `None` is used for tests and in other situations when version
/// check is not necessary.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
worker_event_loop("execute", socket_path, node_version, |mut stream| async move {
let worker_pid = std::process::id();

let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
let executor = Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);
})?;

loop {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
Expand All @@ -89,31 +96,49 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
artifact_path.display(),
);

// Used to signal to the cpu time monitor thread that it can finish.
let (finished_tx, finished_rx) = channel::<()>();
// Conditional variable to notify us when a thread is done.
let condvar = thread::get_condvar();

let cpu_time_start = ProcessTime::now();

// Spawn a new thread that runs the CPU time monitor.
let cpu_time_monitor_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
})
.fuse();
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
let cpu_time_monitor_thread = thread::spawn_worker_thread(
"cpu time monitor thread",
move || {
cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx)
},
Arc::clone(&condvar),
WaitOutcome::CpuTimedOut,
)?;
let executor_2 = executor.clone();
let execute_fut = rt_handle
.spawn_blocking(move || {
let execute_thread = thread::spawn_worker_thread_with_stack_size(
"execute thread",
move || {
validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
})
.fuse();

pin_mut!(cpu_time_monitor_fut);
pin_mut!(execute_fut);

let response = select_biased! {
// If this future is not selected, the join handle is dropped and the thread will
// finish in the background.
cpu_time_monitor_res = cpu_time_monitor_fut => {
match cpu_time_monitor_res {
},
Arc::clone(&condvar),
WaitOutcome::JobFinished,
EXECUTE_THREAD_STACK_SIZE,
)?;

let outcome = thread::wait_for_threads(condvar);

let response = match outcome {
WaitOutcome::JobFinished => {
let _ = cpu_time_monitor_tx.send(());
execute_thread.join().unwrap_or_else(|e| {
// TODO: Use `Panic` error once that is implemented.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe have an issue for this , rather than TODO in the code ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already addressed it here, it's approved so I'll merge it right after this PR. (I had planned to do these changes right after another so I left the TODO as a marker for myself, did the change, and set 7155's merge target to this branch. Will do issues instead in the future. 👍)

Response::format_internal(
"execute thread error",
&stringify_panic_payload(e),
)
})
},
// If the CPU thread is not selected, we signal it to end, the join handle is
// dropped and the thread will finish in the background.
WaitOutcome::CpuTimedOut => {
match cpu_time_monitor_thread.join() {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
Expand All @@ -125,14 +150,20 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
);
Response::TimedOut
},
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
Ok(None) => Response::format_internal(
"cpu time monitor thread error",
"error communicating over finished channel".into(),
),
// We can use an internal error here because errors in this thread are
// independent of the candidate.
Err(e) => Response::format_internal(
"cpu time monitor thread error",
&stringify_panic_payload(e),
),
}
},
execute_res = execute_fut => {
let _ = finished_tx.send(());
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
},
WaitOutcome::Pending =>
unreachable!("we run wait_while until the outcome is no longer pending; qed"),
};

send_response(&mut stream, response).await?;
Expand All @@ -143,7 +174,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
executor: Arc<Executor>,
executor: Executor,
cpu_time_start: ProcessTime,
) -> Response {
// Check here if the file exists, because the error from Substrate is not match-able.
Expand All @@ -163,13 +194,15 @@ fn validate_using_artifact(
Ok(d) => d,
};

let duration = cpu_time_start.elapsed();

let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
Err(err) =>
return Response::format_invalid("validation result decoding failed", &err.to_string()),
Ok(r) => r,
};

// Include the decoding in the measured time, to prevent any potential attacks exploiting some
// bug in decoding.
let duration = cpu_time_start.elapsed();

Response::Ok { result_descriptor, duration }
}
Loading