Skip to content

Commit f434b29

Browse files
committed
Implement ZeroMQ-based kernel management
1 parent e02dca3 commit f434b29

File tree

10 files changed

+808
-89
lines changed

10 files changed

+808
-89
lines changed

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ dependencies = [
1010
"lsprotocol>=2025.0.0rc1",
1111
"marimo>=0.15.0",
1212
"pygls>=2.0.0a4",
13+
"pyzmq>=27.0.2",
1314
]
1415

1516
[project.scripts]
@@ -55,3 +56,6 @@ ignore = [
5556
"PLC",
5657
"PLR",
5758
]
59+
60+
[tool.uv.sources]
61+
marimo = { path = "../marimo", editable = true }

src/marimo_lsp/kernel_manager.py

Lines changed: 74 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,51 +2,77 @@
22

33
from __future__ import annotations
44

5+
import json
6+
import subprocess
57
import typing
68

79
from marimo._config.settings import GLOBAL_SETTINGS
8-
from marimo._messaging.types import KernelMessage
910
from marimo._runtime.requests import AppMetadata
1011
from marimo._server.model import SessionMode
1112
from marimo._server.sessions import KernelManager
12-
from marimo._utils.typed_connection import TypedConnection
13+
14+
from marimo_lsp.zeromq.queue_manager import encode_connection_info
1315

1416
if typing.TYPE_CHECKING:
1517
from marimo._config.manager import MarimoConfigManager
16-
from marimo._server.sessions import QueueManager
1718

1819
from marimo_lsp.app_file_manager import LspAppFileManager
19-
20-
21-
def launch_kernel(*args) -> None: # noqa: ANN002
22-
"""Launch the marimo kernel with the correct Python environment.
23-
24-
Runs inside a `multiprocessing.Process` spawned with `ctx.set_executable()`.
25-
26-
However, multiprocessing reconstructs the parent's `sys.path`, overriding the
27-
venv's paths. We fix this by querying sys.executable (which IS correctly set)
28-
for its natural `sys.path` and replacing ours before importing marimo.
29-
"""
30-
import json # noqa: PLC0415
31-
import subprocess # noqa: PLC0415
32-
import sys # noqa: PLC0415
33-
34-
# Get the natural sys.path from the venv's Python interpreter
35-
# sys.executable is correctly set to the venv's Python thanks to set_executable()
36-
result = subprocess.run( # noqa: S603
37-
[sys.executable, "-c", "import sys, json; print(json.dumps(sys.path))"],
38-
capture_output=True,
20+
from marimo_lsp.zeromq.queue_manager import ConnectionInfo, ZeroMqQueueManager
21+
22+
23+
def launch_kernel_subprocess(
24+
executable: str,
25+
connection_info: ConnectionInfo,
26+
configs: dict,
27+
app_metadata: AppMetadata,
28+
config_manager: MarimoConfigManager,
29+
*,
30+
virtual_files_supported: bool,
31+
redirect_console_to_browser: bool,
32+
profile_path: str | None,
33+
) -> subprocess.Popen:
34+
"""Launch kernel as a subprocess with ZeroMQ IPC."""
35+
36+
# Prepare kernel arguments
37+
kernel_args = {
38+
"configs": configs,
39+
"app_metadata": {
40+
"query_params": app_metadata.query_params,
41+
"filename": app_metadata.filename,
42+
"cli_args": app_metadata.cli_args,
43+
"argv": app_metadata.argv,
44+
"app_config": app_metadata.app_config,
45+
},
46+
"user_config": config_manager.get_config(hide_secrets=False),
47+
"virtual_files_supported": virtual_files_supported,
48+
"redirect_console_to_browser": redirect_console_to_browser,
49+
"profile_path": profile_path,
50+
"log_level": GLOBAL_SETTINGS.LOG_LEVEL,
51+
}
52+
53+
# Launch kernel subprocess
54+
kernel_cmd = [
55+
executable,
56+
"-m",
57+
"marimo_lsp.zeromq.kernel_server",
58+
]
59+
60+
process = subprocess.Popen(
61+
kernel_cmd,
62+
stdin=subprocess.PIPE,
63+
stdout=subprocess.PIPE,
64+
stderr=subprocess.PIPE,
3965
text=True,
40-
check=True,
4166
)
4267

43-
# Replace the inherited (wrong) sys.path with the venv's natural paths
44-
sys.path = json.loads(result.stdout)
68+
assert process.stdin, "Expected stdin"
4569

46-
# Now we can import marimo from the correct environment
47-
from marimo._runtime import runtime # noqa: PLC0415
70+
process.stdin.write(encode_connection_info(connection_info) + "\n")
71+
process.stdin.write(json.dumps(kernel_args) + "\n")
72+
process.stdin.flush()
73+
process.stdin.close()
4874

49-
runtime.launch_kernel(*args)
75+
return process
5076

5177

5278
class LspKernelManager(KernelManager):
@@ -56,7 +82,8 @@ def __init__(
5682
self,
5783
*,
5884
executable: str,
59-
queue_manager: QueueManager,
85+
connection_info: ConnectionInfo,
86+
queue_manager: ZeroMqQueueManager,
6087
app_file_manager: LspAppFileManager,
6188
config_manager: MarimoConfigManager,
6289
) -> None:
@@ -75,52 +102,26 @@ def __init__(
75102
redirect_console_to_browser=False,
76103
virtual_files_supported=False,
77104
)
105+
self.kernel_process = None
78106
self.executable = executable
107+
self.connection_info = connection_info
79108

80109
def start_kernel(self) -> None:
81-
"""Start an instance of the marimo kernel."""
82-
import multiprocessing as mp # noqa: PLC0415
83-
from multiprocessing import connection # noqa: PLC0415
84-
85-
# We use a process in edit mode so that we can interrupt the app
86-
# with a SIGINT; we don't mind the additional memory consumption,
87-
# since there's only one client sess
88-
is_edit_mode = self.mode == SessionMode.EDIT
89-
90-
# Need to use a socket for windows compatibility
91-
listener = connection.Listener(family="AF_INET")
92-
93-
ctx = mp.get_context("spawn")
94-
ctx.set_executable(self.executable)
95-
96-
kernel_task = ctx.Process(
97-
target=launch_kernel,
98-
args=(
99-
self.queue_manager.control_queue,
100-
self.queue_manager.set_ui_element_queue,
101-
self.queue_manager.completion_queue,
102-
self.queue_manager.input_queue,
103-
# stream queue unused
104-
None,
105-
listener.address,
106-
is_edit_mode,
107-
self.configs,
108-
self.app_metadata,
109-
self.config_manager.get_config(hide_secrets=False),
110-
self._virtual_files_supported,
111-
self.redirect_console_to_browser,
112-
self.queue_manager.win32_interrupt_queue,
113-
self.profile_path,
114-
GLOBAL_SETTINGS.LOG_LEVEL,
115-
),
116-
# The process can't be a daemon, because daemonic processes
117-
# can't create children
118-
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.daemon # noqa: E501
119-
daemon=False,
110+
"""Start an instance of the marimo kernel using ZeroMQ IPC."""
111+
# Launch kernel subprocess with ZeroMQ
112+
self.kernel_process = launch_kernel_subprocess(
113+
executable=self.executable,
114+
connection_info=self.connection_info,
115+
configs=self.configs,
116+
app_metadata=self.app_metadata,
117+
config_manager=self.config_manager,
118+
virtual_files_supported=self._virtual_files_supported,
119+
redirect_console_to_browser=self.redirect_console_to_browser,
120+
profile_path=getattr(self, "profile_path", None),
120121
)
121122

122-
kernel_task.start()
123+
# Create IOPub connection for receiving kernel messages
124+
self._read_conn = self.queue_manager.create_iopub_connection(for_kernel=False)
123125

124-
self.kernel_task = kernel_task
125-
# First thing kernel does is connect to the socket, so it's safe to call accept
126-
self._read_conn = TypedConnection[KernelMessage].of(listener.accept())
126+
# Store process handle (compatible with mp.Process interface)
127+
self.kernel_task = self.kernel_process

src/marimo_lsp/session_consumer.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
from __future__ import annotations
44

5+
import json
56
from typing import TYPE_CHECKING
67

7-
from marimo._messaging.ops import MessageOperation, serialize
8+
from marimo._messaging.msgspec_encoder import asdict
89
from marimo._server.model import ConnectionState, SessionConsumer
910
from marimo._types.ids import ConsumerId
1011

@@ -13,6 +14,7 @@
1314
if TYPE_CHECKING:
1415
from collections.abc import Callable
1516

17+
from marimo._messaging.ops import MessageOperation
1618
from marimo._messaging.types import KernelMessage
1719
from pygls.lsp.server import LanguageServer
1820

@@ -41,11 +43,15 @@ def on_start(self) -> Callable[[KernelMessage], None]:
4143
def handle_message(msg: KernelMessage) -> None:
4244
"""Forward kernel message over LSP."""
4345
try:
44-
op_name, data = msg
45-
46+
op_name, msg_bytes = msg
4647
self.server.protocol.notify(
4748
"marimo/operation",
48-
{"notebookUri": self.notebook_uri, "op": op_name, "data": data},
49+
{
50+
"notebookUri": self.notebook_uri,
51+
"op": op_name,
52+
# TODO: Just send bytes directly
53+
"data": json.loads(msg_bytes),
54+
},
4955
)
5056

5157
logger.debug(f"Forwarded {op_name} to {self.notebook_uri}")
@@ -66,7 +72,7 @@ def write_operation(self, op: MessageOperation) -> None:
6672
{
6773
"notebookUri": self.notebook_uri,
6874
"op": op.name,
69-
"data": serialize(op),
75+
"data": asdict(op),
7076
},
7177
)
7278
logger.debug(f"Sent {op.name} operation to {self.notebook_uri}")

src/marimo_lsp/session_manager.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,17 @@
99
from marimo._config.manager import (
1010
get_default_config_manager,
1111
)
12-
from marimo._server.sessions import KernelManager, QueueManager, Session
12+
from marimo._server.sessions import Session
1313

1414
from marimo_lsp.app_file_manager import LspAppFileManager
1515
from marimo_lsp.kernel_manager import LspKernelManager
1616
from marimo_lsp.loggers import get_logger
1717
from marimo_lsp.session_consumer import LspSessionConsumer
18+
from marimo_lsp.zeromq.queue_manager import ZeroMqQueueManager
1819

1920
if typing.TYPE_CHECKING:
2021
from marimo._server.file_manager import AppFileManager
21-
from marimo._server.sessions import KernelManager
22+
from marimo._server.sessions import KernelManager, QueueManager
2223
from pygls.lsp.server import LanguageServer
2324

2425

@@ -66,22 +67,24 @@ def create_session(self, *, server: LanguageServer, notebook_uri: str) -> Sessio
6667

6768
app_file_manager = LspAppFileManager(server=server, notebook_uri=notebook_uri)
6869
config_manager = get_default_config_manager(current_path=app_file_manager.path)
69-
queue_manager = QueueManager(use_multiprocessing=True)
70+
71+
queue_manager, connection_info = ZeroMqQueueManager.create_host()
7072

7173
kernel_manager = LspKernelManager(
7274
# TODO: Get executable
7375
executable=sys.executable,
7476
queue_manager=queue_manager,
7577
app_file_manager=app_file_manager,
7678
config_manager=config_manager,
79+
connection_info=connection_info,
7780
)
7881

7982
logger.info(f"Creating new session for {notebook_uri}")
8083

8184
session = Session(
8285
initialization_id=str(uuid4()),
8386
session_consumer=LspSessionConsumer(server, notebook_uri),
84-
queue_manager=queue_manager,
87+
queue_manager=typing.cast("QueueManager", queue_manager),
8588
kernel_manager=typing.cast("KernelManager", kernel_manager),
8689
app_file_manager=typing.cast("AppFileManager", app_file_manager),
8790
config_manager=config_manager,

src/marimo_lsp/zeromq/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""ZeroMQ-based IPC implementation for marimo-lsp."""
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Standalone kernel server entry point for ZeroMQ-based IPC."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
import sys
7+
import threading
8+
from multiprocessing import connection
9+
10+
from marimo._config.settings import GLOBAL_SETTINGS
11+
from marimo._runtime import runtime
12+
from marimo._runtime.requests import AppMetadata
13+
14+
from marimo_lsp.zeromq.queue_manager import ZeroMqQueueManager, decode_connection_info
15+
from marimo_lsp.zeromq.zmq_adapter import create_zmq_to_pipe_adapter
16+
17+
18+
def main() -> None:
19+
"""Launch a marimo kernel using ZeroMQ for IPC.
20+
21+
This function is the entry point for the kernel subprocess.
22+
It reads connection information from stdin and sets up ZeroMQ
23+
queues that proxy to marimo's internal kernel.
24+
"""
25+
# Read connection info from stdin
26+
connection_info_str = sys.stdin.readline().strip()
27+
if not connection_info_str:
28+
print("No connection info received", file=sys.stderr)
29+
sys.exit(1)
30+
31+
try:
32+
connection_info = decode_connection_info(connection_info_str)
33+
except json.JSONDecodeError as e:
34+
print(f"Failed to parse connection info: {e}", file=sys.stderr)
35+
sys.exit(1)
36+
37+
# Read additional kernel arguments from stdin
38+
kernel_args_str = sys.stdin.readline().strip()
39+
if not kernel_args_str:
40+
print("No kernel arguments received", file=sys.stderr)
41+
sys.exit(1)
42+
43+
try:
44+
kernel_args = json.loads(kernel_args_str)
45+
except json.JSONDecodeError as e:
46+
print(f"Failed to parse kernel arguments: {e}", file=sys.stderr)
47+
sys.exit(1)
48+
49+
queue_manager = ZeroMqQueueManager.from_connection_info(connection_info)
50+
51+
listener = connection.Listener(family="AF_INET")
52+
53+
adapter_thread = threading.Thread(
54+
target=create_zmq_to_pipe_adapter, args=(queue_manager, listener), daemon=True
55+
)
56+
adapter_thread.start()
57+
58+
runtime.launch_kernel(
59+
control_queue=queue_manager.control_queue,
60+
set_ui_element_queue=queue_manager.set_ui_element_queue,
61+
completion_queue=queue_manager.completion_queue,
62+
input_queue=queue_manager.input_queue,
63+
stream_queue=None, # Not used with ZeroMQ
64+
socket_addr=listener.address,
65+
is_edit_mode=True,
66+
profile_path=kernel_args.get("profile_path"),
67+
log_level=kernel_args.get("log_level", GLOBAL_SETTINGS.LOG_LEVEL),
68+
interrupt_queue=queue_manager.win32_interrupt_queue,
69+
configs=kernel_args["configs"],
70+
app_metadata=AppMetadata(**kernel_args["app_metadata"]),
71+
user_config=kernel_args["user_config"],
72+
virtual_files_supported=kernel_args.get("virtual_files_supported", False),
73+
redirect_console_to_browser=kernel_args.get(
74+
"redirect_console_to_browser", False
75+
),
76+
)
77+
78+
queue_manager.close_queues()
79+
80+
81+
if __name__ == "__main__":
82+
main()

0 commit comments

Comments
 (0)