Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 33fe557

Browse files
committed
Refactor handling response bytes
1 parent 80d2cf6 commit 33fe557

1 file changed

Lines changed: 89 additions & 67 deletions

File tree

node/core/pvf/src/prepare/worker.rs

Lines changed: 89 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ pub enum Outcome {
7575
DidNotMakeIt,
7676
}
7777

78+
#[derive(Debug)]
79+
enum Selected {
80+
Done(PrepareResult),
81+
IoErr,
82+
Deadline,
83+
}
84+
7885
/// Given the idle token of a worker and parameters of work, communicates with the worker and
7986
/// returns the outcome.
8087
pub async fn start_work(
@@ -109,13 +116,6 @@ pub async fn start_work(
109116
//
110117
// In that case we should propagate the error to the pool.
111118

112-
#[derive(Debug)]
113-
enum Selected {
114-
Done(PrepareResult),
115-
IoErr,
116-
Deadline,
117-
}
118-
119119
// We use a generous timeout here. This is in addition to the one in the child process, in
120120
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
121121
// in the child. We want to use CPU time because it varies less than wall clock time under
@@ -125,64 +125,15 @@ pub async fn start_work(
125125
let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await;
126126

127127
let selected = match result {
128-
// TODO: This case is really long, refactor.
129-
Ok(Ok(response_bytes)) => {
130-
// Received bytes from worker within the time limit.
131-
// By convention we expect encoded `PrepareResult`.
132-
if let Ok(result) = PrepareResult::decode(&mut response_bytes.as_slice()) {
133-
if let Ok(cpu_time_elapsed) = result {
134-
if cpu_time_elapsed > preparation_timeout {
135-
// The job didn't complete within the timeout.
136-
gum::warn!(
137-
target: LOG_TARGET,
138-
worker_pid = %pid,
139-
"prepare job took {}ms cpu time, exceeded preparation timeout {}ms",
140-
cpu_time_elapsed.as_millis(),
141-
preparation_timeout.as_millis()
142-
);
143-
144-
// Return a timeout error. The artifact exists, but is located in a
145-
// temporary file which will be cleared by `with_tmp_file`.
146-
Selected::Deadline
147-
} else {
148-
gum::debug!(
149-
target: LOG_TARGET,
150-
worker_pid = %pid,
151-
"promoting WIP artifact {} to {}",
152-
tmp_file.display(),
153-
artifact_path.display(),
154-
);
155-
156-
async_std::fs::rename(&tmp_file, &artifact_path)
157-
.await
158-
.map(|_| Selected::Done(result))
159-
.unwrap_or_else(|err| {
160-
gum::warn!(
161-
target: LOG_TARGET,
162-
worker_pid = %pid,
163-
"failed to rename the artifact from {} to {}: {:?}",
164-
tmp_file.display(),
165-
artifact_path.display(),
166-
err,
167-
);
168-
Selected::IoErr
169-
})
170-
}
171-
} else {
172-
Selected::Done(result)
173-
}
174-
} else {
175-
// We received invalid bytes from the worker.
176-
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
177-
gum::warn!(
178-
target: LOG_TARGET,
179-
worker_pid = %pid,
180-
"received unexpected response from the prepare worker: {}",
181-
HexDisplay::from(&bound_bytes),
182-
);
183-
Selected::IoErr
184-
}
185-
},
128+
Ok(Ok(response_bytes)) =>
129+
handle_response_bytes(
130+
response_bytes,
131+
pid,
132+
tmp_file,
133+
artifact_path,
134+
preparation_timeout,
135+
)
136+
.await,
186137
Ok(Err(err)) => {
187138
// Communication error within the time limit.
188139
gum::warn!(
@@ -194,7 +145,7 @@ pub async fn start_work(
194145
Selected::IoErr
195146
},
196147
Err(_) => {
197-
// Timed out.
148+
// Timed out here on the host.
198149
gum::warn!(
199150
target: LOG_TARGET,
200151
worker_pid = %pid,
@@ -205,7 +156,7 @@ pub async fn start_work(
205156
};
206157

207158
match selected {
208-
// Timed out. This should already be logged by the child.
159+
// Timed out on the child. This should already be logged by the child.
209160
Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
210161
Selected::Done(result) =>
211162
Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
@@ -447,3 +398,74 @@ async fn cpu_time_monitor_loop(
447398
std::thread::sleep(sleep_interval);
448399
}
449400
}
401+
402+
/// Handles receiving response bytes on the host from the child.
403+
async fn handle_response_bytes(
404+
response_bytes: Vec<u8>,
405+
pid: u32,
406+
tmp_file: PathBuf,
407+
artifact_path: PathBuf,
408+
preparation_timeout: Duration,
409+
) -> Selected {
410+
// Received bytes from worker within the time limit.
411+
// By convention we expect encoded `PrepareResult`.
412+
let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
413+
Ok(result) => result,
414+
Err(_) => {
415+
// We received invalid bytes from the worker.
416+
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
417+
gum::warn!(
418+
target: LOG_TARGET,
419+
worker_pid = %pid,
420+
"received unexpected response from the prepare worker: {}",
421+
HexDisplay::from(&bound_bytes),
422+
);
423+
return Selected::IoErr
424+
},
425+
};
426+
let cpu_time_elapsed = match result {
427+
Ok(result) => result,
428+
Err(_) => return Selected::Done(result),
429+
};
430+
431+
if cpu_time_elapsed > preparation_timeout {
432+
// The job didn't complete within the timeout.
433+
gum::warn!(
434+
target: LOG_TARGET,
435+
worker_pid = %pid,
436+
"prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
437+
cpu_time_elapsed.as_millis(),
438+
preparation_timeout.as_millis(),
439+
tmp_file.display(),
440+
);
441+
442+
// Return a timeout error.
443+
//
444+
// NOTE: The artifact exists, but is located in a temporary file which
445+
// will be cleared by `with_tmp_file`.
446+
return Selected::Deadline
447+
}
448+
449+
gum::debug!(
450+
target: LOG_TARGET,
451+
worker_pid = %pid,
452+
"promoting WIP artifact {} to {}",
453+
tmp_file.display(),
454+
artifact_path.display(),
455+
);
456+
457+
async_std::fs::rename(&tmp_file, &artifact_path)
458+
.await
459+
.map(|_| Selected::Done(result))
460+
.unwrap_or_else(|err| {
461+
gum::warn!(
462+
target: LOG_TARGET,
463+
worker_pid = %pid,
464+
"failed to rename the artifact from {} to {}: {:?}",
465+
tmp_file.display(),
466+
artifact_path.display(),
467+
err,
468+
);
469+
Selected::IoErr
470+
})
471+
}

0 commit comments

Comments
 (0)