Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c93bf69
Take charge of parsing and evaluation
lionel- Oct 24, 2025
5df36e9
More caller tracking
lionel- Oct 29, 2025
7dd0f4e
Consolidate debugger states
lionel- Oct 30, 2025
2f07846
Extract `handle_input_request()`
lionel- Oct 30, 2025
0650040
Extract `handle_pending_input()`
lionel- Oct 30, 2025
3367930
Rename `finalize_call_text()` to `handle_read_console()`
lionel- Oct 30, 2025
dccf144
Make `read()` a constructor method on `PendingInputs`
lionel- Oct 30, 2025
fc697b7
Refactor console error and result handling
lionel- Oct 30, 2025
2040e08
Cancel pending inputs when we get in the debugger
lionel- Oct 30, 2025
4bae770
Add test for invalid syntax
lionel- Oct 30, 2025
019cba0
Tweak documentation
lionel- Oct 31, 2025
7c3c168
Remove `into_protected()` method
lionel- Oct 31, 2025
69ca0dc
Make `reply_execute_request()` a free function
lionel- Oct 31, 2025
cad9a3f
Create Jupyter exception in the global condition handler
lionel- Oct 31, 2025
e884469
Fully remove incomplete prompts heuristic since they are now impossible
lionel- Oct 31, 2025
e6ec6e4
Extract ReadConsole event loop into method
lionel- Oct 31, 2025
acf96e4
Return to base REPL in case of error
lionel- Oct 31, 2025
30d244b
Rename `eval_pending()` to `eval()`
lionel- Oct 31, 2025
fde428e
Don't clear pending expressions in browser sessions
lionel- Oct 31, 2025
13f1fd0
Tweak docs
lionel- Nov 4, 2025
666a727
Add failing test
lionel- Nov 4, 2025
fb69d6b
`RMain::eval()` doesn't need self ref
lionel- Nov 4, 2025
02a8aa4
Add `harp::exec_with_cleanup()`
lionel- Nov 4, 2025
acf7965
Keep track of nested REPLs and clean up R's state when needed
lionel- Nov 4, 2025
22e21bb
Flush autoprint buffer in case of error
lionel- Nov 5, 2025
6bb6f3b
Use a `PromptKind` enum to discriminate prompts
lionel- Nov 5, 2025
94229c6
Evaluate in current environment
lionel- Nov 5, 2025
8810197
Keep track of console frame via `r_read_console()`
lionel- Nov 5, 2025
83ba148
Clearer documentation and variable name
lionel- Nov 5, 2025
5780f6d
Add shutdown handling to dummy frontend
lionel- Nov 5, 2025
d257e05
Add integration tests for shutdown
lionel- Nov 5, 2025
3ec41cb
Prevent R from asking about saving workspace
lionel- Nov 5, 2025
5e219c2
Shutdown all nested consoles in case of Shutdown request
lionel- Nov 5, 2025
6216051
Send interrupt before shutting down
lionel- Nov 6, 2025
ab9a936
Move signal declaration inside of Unix context
lionel- Nov 6, 2025
cf63dbc
Make the error message test less brittle
lionel- Nov 6, 2025
42f8775
Disable brittle tests
lionel- Nov 6, 2025
6d2f1e8
Opt out of Shutdown tests on Windows
lionel- Nov 6, 2025
492d33c
Improve naming a bit
lionel- Nov 6, 2025
337efde
Don't include backtrace in syntax errors
lionel- Nov 6, 2025
b4839c2
Fix backtraces in special syntax errors
lionel- Nov 7, 2025
a4272d1
Disable error entracing in sensitive tests
lionel- Nov 7, 2025
0b9052c
Extract `FrontendDummy::execute_request()` and variants
lionel- Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions crates/amalthea/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub enum Error {
UnknownCommName(String),
UnknownCommId(String),
InvalidCommMessage(String, String, String),
InvalidInputRequest(String),
InvalidConsoleInput(String),
Anyhow(anyhow::Error),
ShellErrorReply(Exception),
Expand Down Expand Up @@ -196,9 +195,6 @@ impl fmt::Display for Error {
msg, id, err
)
},
Error::InvalidInputRequest(message) => {
write!(f, "{message}")
},
Error::InvalidConsoleInput(message) => {
write!(f, "{message}")
},
Expand Down Expand Up @@ -228,6 +224,6 @@ impl<T: std::fmt::Debug> From<SendError<T>> for Error {
macro_rules! anyhow {
($($rest: expr),*) => {{
let message = anyhow::anyhow!($($rest, )*);
crate::error::Error::Anyhow(message)
$crate::error::Error::Anyhow(message)
}}
}
110 changes: 107 additions & 3 deletions crates/amalthea/src/fixtures/dummy_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use crate::wire::jupyter_message::JupyterMessage;
use crate::wire::jupyter_message::Message;
use crate::wire::jupyter_message::ProtocolMessage;
use crate::wire::jupyter_message::Status;
use crate::wire::shutdown_reply::ShutdownReply;
use crate::wire::shutdown_request::ShutdownRequest;
use crate::wire::status::ExecutionState;
use crate::wire::stream::Stream;
use crate::wire::wire_message::WireMessage;
Expand All @@ -36,7 +38,7 @@ pub struct DummyConnection {
}

pub struct DummyFrontend {
pub _control_socket: Socket,
pub control_socket: Socket,
pub shell_socket: Socket,
pub iopub_socket: Socket,
pub stdin_socket: Socket,
Expand Down Expand Up @@ -132,7 +134,7 @@ impl DummyFrontend {
// the Jupyter specification, these must share a ZeroMQ identity.
let shell_id = rand::thread_rng().gen::<[u8; 16]>();

let _control_socket = Socket::new(
let control_socket = Socket::new(
connection.session.clone(),
connection.ctx.clone(),
String::from("Control"),
Expand Down Expand Up @@ -198,7 +200,7 @@ impl DummyFrontend {
});

Self {
_control_socket,
control_socket,
shell_socket,
iopub_socket,
stdin_socket,
Expand All @@ -207,12 +209,22 @@ impl DummyFrontend {
}
}

/// Sends a Jupyter message on the Control socket; returns the ID of the newly
/// created message
pub fn send_control<T: ProtocolMessage>(&self, msg: T) -> String {
Self::send(&self.control_socket, &self.session, msg)
}

/// Sends a Jupyter message on the Shell socket; returns the ID of the newly
/// created message
pub fn send_shell<T: ProtocolMessage>(&self, msg: T) -> String {
Self::send(&self.shell_socket, &self.session, msg)
}

pub fn send_shutdown_request(&self, restart: bool) -> String {
self.send_control(ShutdownRequest { restart })
}

pub fn send_execute_request(&self, code: &str, options: ExecuteRequestOptions) -> String {
self.send_shell(ExecuteRequest {
code: String::from(code),
Expand All @@ -224,6 +236,77 @@ impl DummyFrontend {
})
}

/// Sends an execute request and handles the standard message flow:
/// busy -> execute_input -> idle -> execute_reply.
/// Asserts that the input code matches and returns the execution count.
#[track_caller]
pub fn execute_request_invisibly(&self, code: &str) -> u32 {
self.send_execute_request(code, ExecuteRequestOptions::default());
self.recv_iopub_busy();

let input = self.recv_iopub_execute_input();
assert_eq!(input.code, code);

self.recv_iopub_idle();

let execution_count = self.recv_shell_execute_reply();
assert_eq!(execution_count, input.execution_count);

execution_count
}

/// Sends an execute request and handles the standard message flow with a result:
/// busy -> execute_input -> execute_result -> idle -> execute_reply.
/// Asserts that the input code matches and passes the result to the callback.
/// Returns the execution count.
#[track_caller]
pub fn execute_request<F>(&self, code: &str, result_check: F) -> u32
where
F: FnOnce(String),
{
self.send_execute_request(code, ExecuteRequestOptions::default());
self.recv_iopub_busy();

let input = self.recv_iopub_execute_input();
assert_eq!(input.code, code);

let result = self.recv_iopub_execute_result();
result_check(result);

self.recv_iopub_idle();

let execution_count = self.recv_shell_execute_reply();
assert_eq!(execution_count, input.execution_count);

execution_count
}

/// Sends an execute request that produces an error and handles the standard message flow:
/// busy -> execute_input -> execute_error -> idle -> execute_reply_exception.
/// Passes the error message to the callback for custom assertions.
/// Returns the execution count.
#[track_caller]
pub fn execute_request_error<F>(&self, code: &str, error_check: F) -> u32
where
F: FnOnce(String),
{
self.send_execute_request(code, ExecuteRequestOptions::default());
self.recv_iopub_busy();

let input = self.recv_iopub_execute_input();
assert_eq!(input.code, code);

let error_msg = self.recv_iopub_execute_error();
error_check(error_msg);

self.recv_iopub_idle();

let execution_count = self.recv_shell_execute_reply_exception();
assert_eq!(execution_count, input.execution_count);

execution_count
}

/// Sends a Jupyter message on the Stdin socket
pub fn send_stdin<T: ProtocolMessage>(&self, msg: T) {
Self::send(&self.stdin_socket, &self.session, msg);
Expand All @@ -236,6 +319,7 @@ impl DummyFrontend {
id
}

#[track_caller]
pub fn recv(socket: &Socket) -> Message {
// It's important to wait with a timeout because the kernel thread might have
// panicked, preventing it from sending the expected message. The tests would then
Expand All @@ -246,28 +330,48 @@ impl DummyFrontend {
//
// Note that the panic hook will still have run to record the panic, so we'll get
// expected panic information in the test output.
//
// If you're debugging tests, you'll need to bump this timeout to a large value.
if socket.poll_incoming(10000).unwrap() {
return Message::read_from_socket(socket).unwrap();
}

panic!("Timeout while expecting message on socket {}", socket.name);
}

/// Receives a Jupyter message from the Control socket
#[track_caller]
pub fn recv_control(&self) -> Message {
Self::recv(&self.control_socket)
}

/// Receives a Jupyter message from the Shell socket
#[track_caller]
pub fn recv_shell(&self) -> Message {
Self::recv(&self.shell_socket)
}

/// Receives a Jupyter message from the IOPub socket
#[track_caller]
pub fn recv_iopub(&self) -> Message {
Self::recv(&self.iopub_socket)
}

/// Receives a Jupyter message from the Stdin socket
#[track_caller]
pub fn recv_stdin(&self) -> Message {
Self::recv(&self.stdin_socket)
}

/// Receive from Control and assert `ShutdownReply` message.
#[track_caller]
pub fn recv_control_shutdown_reply(&self) -> ShutdownReply {
let message = self.recv_control();
assert_matches!(message, Message::ShutdownReply(message) => {
message.content
})
}

/// Receive from Shell and assert `ExecuteReply` message.
/// Returns `execution_count`.
#[track_caller]
Expand Down
5 changes: 5 additions & 0 deletions crates/amalthea/src/socket/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ impl Control {
H: FnOnce(JupyterMessage<T>) -> Result<(), Error>,
{
// Enter the kernel-busy state in preparation for handling the message.
// The protocol specification does not mandate status messages for
// Control, but we emit them for compatibility with ipykernel:
// https://github.com/ipython/ipykernel/pull/585. These status messages
// can be discriminated from those on Shell by examining the parent
// header.
Comment on lines +105 to +109
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure it does mandate status messages for Control

https://jupyter-client.readthedocs.io/en/latest/messaging.html#status

Busy and idle messages should be sent before/after handling every request, not just execution.

If you agree, I think we should remove or tweak this comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I based my comment on this quote.

"not just execution" could just refer to other types of Shell requests, including comm messages. Note that this section is incomplete and does not say anything about the parent header of the status messages, so an implementer might create inadvertently non-nested shell status messages.

I think wrapping control requests in Busy/Idle is a strange choice that unnecessarily complicates the protocol.

if let Err(err) = self.send_state(req.clone(), ExecutionState::Busy) {
warn!("Failed to change kernel status to busy: {err}");
}
Expand Down
6 changes: 6 additions & 0 deletions crates/amalthea/src/wire/jupyter_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::wire::is_complete_reply::IsCompleteReply;
use crate::wire::is_complete_request::IsCompleteRequest;
use crate::wire::kernel_info_request::KernelInfoRequest;
use crate::wire::originator::Originator;
use crate::wire::shutdown_reply::ShutdownReply;
use crate::wire::shutdown_request::ShutdownRequest;
use crate::wire::status::KernelStatus;
use crate::wire::wire_message::WireMessage;
Expand Down Expand Up @@ -101,6 +102,7 @@ pub enum Message {
// Control
InterruptReply(JupyterMessage<InterruptReply>),
InterruptRequest(JupyterMessage<InterruptRequest>),
ShutdownReply(JupyterMessage<ShutdownReply>),
ShutdownRequest(JupyterMessage<ShutdownRequest>),
// Registration
HandshakeRequest(JupyterMessage<HandshakeRequest>),
Expand Down Expand Up @@ -163,6 +165,7 @@ impl TryFrom<&Message> for WireMessage {
Message::IsCompleteRequest(msg) => WireMessage::try_from(msg),
Message::KernelInfoReply(msg) => WireMessage::try_from(msg),
Message::KernelInfoRequest(msg) => WireMessage::try_from(msg),
Message::ShutdownReply(msg) => WireMessage::try_from(msg),
Message::ShutdownRequest(msg) => WireMessage::try_from(msg),
Message::Status(msg) => WireMessage::try_from(msg),
Message::CommInfoReply(msg) => WireMessage::try_from(msg),
Expand Down Expand Up @@ -245,6 +248,9 @@ impl TryFrom<&WireMessage> for Message {
if kind == UpdateDisplayData::message_type() {
return Ok(Message::UpdateDisplayData(JupyterMessage::try_from(msg)?));
}
if kind == ShutdownReply::message_type() {
return Ok(Message::ShutdownReply(JupyterMessage::try_from(msg)?));
}
if kind == ShutdownRequest::message_type() {
return Ok(Message::ShutdownRequest(JupyterMessage::try_from(msg)?));
}
Expand Down
29 changes: 29 additions & 0 deletions crates/amalthea/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use amalthea::wire::comm_info_request::CommInfoRequest;
use amalthea::wire::comm_msg::CommWireMsg;
use amalthea::wire::comm_open::CommOpen;
use amalthea::wire::jupyter_message::Message;
use amalthea::wire::jupyter_message::Status;
use amalthea::wire::kernel_info_request::KernelInfoRequest;
use amalthea::wire::status::ExecutionState;
use assert_matches::assert_matches;
Expand Down Expand Up @@ -63,6 +64,34 @@ fn test_amalthea_execute_request() {
frontend.recv_iopub_idle();
}

#[test]
fn test_amalthea_shutdown_request() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems weird that this test can send a shutdown request twice. But is the argument that it isn't connected to an R backend that actually does something with the shutdown request?

let frontend = DummyAmaltheaFrontend::lock();

// Send a shutdown request with restart = false
frontend.send_shutdown_request(false);

// Shutdown requests generate busy/idle status messages on IOPub
frontend.recv_iopub_busy();

// Receive the shutdown reply
let reply = frontend.recv_control_shutdown_reply();
assert_eq!(reply.status, Status::Ok);
assert_eq!(reply.restart, false);

frontend.recv_iopub_idle();

// Test with restart = true
Comment on lines +71 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these comments add much

frontend.send_shutdown_request(true);
frontend.recv_iopub_busy();

let reply = frontend.recv_control_shutdown_reply();
assert_eq!(reply.status, Status::Ok);
assert_eq!(reply.restart, true);

frontend.recv_iopub_idle();
}

#[test]
fn test_amalthea_input_request() {
let frontend = DummyAmaltheaFrontend::lock();
Expand Down
6 changes: 6 additions & 0 deletions crates/ark/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ impl ControlHandler for Control {
) -> Result<ShutdownReply, Exception> {
log::info!("Received shutdown request: {msg:?}");

// Interrupt any ongoing computation. We shut down from ReadConsole when
// R has become idle again. Note that Positron will have interrupted us
// beforehand, but another frontend might not have, and it's good to
// have this as a defensive measure in any case.
crate::sys::control::handle_interrupt_request();

// According to the Jupyter protocol we should block here until the
// shutdown is complete. However AFAICS ipykernel doesn't wait
// until complete shutdown before replying and instead just signals
Expand Down
8 changes: 6 additions & 2 deletions crates/ark/src/dap/dap_r_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl RMainDap {
self.debugging = false;
}

pub fn handle_stdout(&mut self, content: &str) {
pub fn handle_write_console(&mut self, content: &str) {
if let DebugCallText::Capturing(ref mut call_text) = self.call_text {
// Append to current expression if we are currently capturing stdout
call_text.push_str(content);
Expand All @@ -164,7 +164,11 @@ impl RMainDap {
}
}

pub fn finalize_call_text(&mut self) {
pub fn handle_read_console(&mut self) {
// Upon entering read-console, finalize any debug call text that we were capturing.
// At this point, the user can either advance the debugger, causing us to capture
// a new expression, or execute arbitrary code, where we will reuse a finalized
// debug call text to maintain the debug state.
match &self.call_text {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we had discussed maybe moving call_text into RMain or something. Maybe that's a future task?

// If not debugging, nothing to do.
DebugCallText::None => (),
Expand Down
25 changes: 22 additions & 3 deletions crates/ark/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
//
//

use amalthea::wire::exception::Exception;
use harp::exec::r_peek_error_buffer;
use harp::exec::RE_STACK_OVERFLOW;
use harp::object::RObject;
use harp::r_symbol;
use harp::session::r_format_traceback;
Expand Down Expand Up @@ -37,9 +40,16 @@ unsafe extern "C-unwind" fn ps_record_error(evalue: SEXP, traceback: SEXP) -> an
Vec::<String>::new()
});

main.error_occurred = true;
main.error_message = evalue;
main.error_traceback = traceback;
main.last_error = Some(
// We don't fill out `ename` with anything meaningful because typically
// R errors don't have names. We could consider using the condition class
// here, which r-lib/tidyverse packages have been using more heavily.
Exception {
ename: String::from(""),
evalue,
traceback,
},
);

Ok(R_NilValue)
}
Expand Down Expand Up @@ -67,3 +77,12 @@ unsafe extern "C-unwind" fn ps_rust_backtrace() -> anyhow::Result<SEXP> {
let trace = format!("{trace}");
Ok(*RObject::from(trace))
}

pub(crate) fn stack_overflow_occurred() -> bool {
// Error handlers are not called on stack overflow so the error flag
// isn't set. Instead we detect stack overflows by peeking at the error
// buffer. The message is explicitly not translated to save stack space
// so the matching should be reliable.
let err_buf = r_peek_error_buffer();
RE_STACK_OVERFLOW.is_match(&err_buf)
}
Loading
Loading