Skip to content

Commit 68e5029

Browse files
committed
Implement ZeroMQ-based kernel management
1 parent e02dca3 commit 68e5029

File tree

11 files changed

+931
-80
lines changed

11 files changed

+931
-80
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ dependencies = [
1010
"lsprotocol>=2025.0.0rc1",
1111
"marimo>=0.15.0",
1212
"pygls>=2.0.0a4",
13+
"pyzmq>=27.0.2",
1314
]
1415

1516
[project.scripts]

src/marimo_lsp/kernel_manager.py

Lines changed: 78 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,51 +2,78 @@
22

33
from __future__ import annotations
44

5+
import json
6+
import subprocess
57
import typing
68

79
from marimo._config.settings import GLOBAL_SETTINGS
8-
from marimo._messaging.types import KernelMessage
910
from marimo._runtime.requests import AppMetadata
1011
from marimo._server.model import SessionMode
1112
from marimo._server.sessions import KernelManager
12-
from marimo._utils.typed_connection import TypedConnection
1313

1414
if typing.TYPE_CHECKING:
1515
from marimo._config.manager import MarimoConfigManager
16-
from marimo._server.sessions import QueueManager
1716

1817
from marimo_lsp.app_file_manager import LspAppFileManager
19-
20-
21-
def launch_kernel(*args) -> None: # noqa: ANN002
22-
"""Launch the marimo kernel with the correct Python environment.
23-
24-
Runs inside a `multiprocessing.Process` spawned with `ctx.set_executable()`.
25-
26-
However, multiprocessing reconstructs the parent's `sys.path`, overriding the
27-
venv's paths. We fix this by querying sys.executable (which IS correctly set)
28-
for its natural `sys.path` and replacing ours before importing marimo.
29-
"""
30-
import json # noqa: PLC0415
31-
import subprocess # noqa: PLC0415
32-
import sys # noqa: PLC0415
33-
34-
# Get the natural sys.path from the venv's Python interpreter
35-
# sys.executable is correctly set to the venv's Python thanks to set_executable()
36-
result = subprocess.run( # noqa: S603
37-
[sys.executable, "-c", "import sys, json; print(json.dumps(sys.path))"],
38-
capture_output=True,
18+
from marimo_lsp.zeromq.queue_manager import ZeroMQQueueManager
19+
20+
21+
def launch_kernel_subprocess(
22+
executable: str,
23+
queue_manager: "ZeroMQQueueManager",
24+
configs: dict,
25+
app_metadata: AppMetadata,
26+
config_manager: "MarimoConfigManager",
27+
virtual_files_supported: bool,
28+
redirect_console_to_browser: bool,
29+
profile_path: str | None,
30+
) -> subprocess.Popen:
31+
"""Launch kernel as a subprocess with ZeroMQ IPC."""
32+
from marimo_lsp.zeromq.queue_manager import ZeroMQQueueManager
33+
34+
# Get connection info for kernel
35+
connection_info = queue_manager.get_connection_info()
36+
37+
# Prepare kernel arguments
38+
kernel_args = {
39+
"configs": configs,
40+
"app_metadata": {
41+
"query_params": app_metadata.query_params,
42+
"filename": app_metadata.filename,
43+
"cli_args": app_metadata.cli_args,
44+
"argv": app_metadata.argv,
45+
"app_config": app_metadata.app_config,
46+
},
47+
"user_config": config_manager.get_config(hide_secrets=False),
48+
"virtual_files_supported": virtual_files_supported,
49+
"redirect_console_to_browser": redirect_console_to_browser,
50+
"profile_path": profile_path,
51+
"log_level": GLOBAL_SETTINGS.LOG_LEVEL,
52+
}
53+
54+
# Launch kernel subprocess
55+
kernel_cmd = [
56+
executable,
57+
"-m",
58+
"marimo_lsp.zeromq.kernel_server",
59+
]
60+
61+
process = subprocess.Popen(
62+
kernel_cmd,
63+
stdin=subprocess.PIPE,
64+
stdout=subprocess.PIPE,
65+
stderr=subprocess.PIPE,
3966
text=True,
40-
check=True,
4167
)
42-
43-
# Replace the inherited (wrong) sys.path with the venv's natural paths
44-
sys.path = json.loads(result.stdout)
45-
46-
# Now we can import marimo from the correct environment
47-
from marimo._runtime import runtime # noqa: PLC0415
48-
49-
runtime.launch_kernel(*args)
68+
69+
# Send connection info and kernel args via stdin
70+
if process.stdin:
71+
process.stdin.write(json.dumps(connection_info) + "\n")
72+
process.stdin.write(json.dumps(kernel_args) + "\n")
73+
process.stdin.flush()
74+
process.stdin.close()
75+
76+
return process
5077

5178

5279
class LspKernelManager(KernelManager):
@@ -56,7 +83,7 @@ def __init__(
5683
self,
5784
*,
5885
executable: str,
59-
queue_manager: QueueManager,
86+
queue_manager: typing.Any, # Can be QueueManager or ZeroMQQueueManager
6087
app_file_manager: LspAppFileManager,
6188
config_manager: MarimoConfigManager,
6289
) -> None:
@@ -76,51 +103,24 @@ def __init__(
76103
virtual_files_supported=False,
77104
)
78105
self.executable = executable
106+
self.kernel_process = None
79107

80108
def start_kernel(self) -> None:
81-
"""Start an instance of the marimo kernel."""
82-
import multiprocessing as mp # noqa: PLC0415
83-
from multiprocessing import connection # noqa: PLC0415
84-
85-
# We use a process in edit mode so that we can interrupt the app
86-
# with a SIGINT; we don't mind the additional memory consumption,
87-
# since there's only one client sess
88-
is_edit_mode = self.mode == SessionMode.EDIT
89-
90-
# Need to use a socket for windows compatibility
91-
listener = connection.Listener(family="AF_INET")
92-
93-
ctx = mp.get_context("spawn")
94-
ctx.set_executable(self.executable)
95-
96-
kernel_task = ctx.Process(
97-
target=launch_kernel,
98-
args=(
99-
self.queue_manager.control_queue,
100-
self.queue_manager.set_ui_element_queue,
101-
self.queue_manager.completion_queue,
102-
self.queue_manager.input_queue,
103-
# stream queue unused
104-
None,
105-
listener.address,
106-
is_edit_mode,
107-
self.configs,
108-
self.app_metadata,
109-
self.config_manager.get_config(hide_secrets=False),
110-
self._virtual_files_supported,
111-
self.redirect_console_to_browser,
112-
self.queue_manager.win32_interrupt_queue,
113-
self.profile_path,
114-
GLOBAL_SETTINGS.LOG_LEVEL,
115-
),
116-
# The process can't be a daemon, because daemonic processes
117-
# can't create children
118-
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.daemon # noqa: E501
119-
daemon=False,
109+
"""Start an instance of the marimo kernel using ZeroMQ IPC."""
110+
# Launch kernel subprocess with ZeroMQ
111+
self.kernel_process = launch_kernel_subprocess(
112+
executable=self.executable,
113+
queue_manager=self.queue_manager,
114+
configs=self.configs,
115+
app_metadata=self.app_metadata,
116+
config_manager=self.config_manager,
117+
virtual_files_supported=self._virtual_files_supported,
118+
redirect_console_to_browser=self.redirect_console_to_browser,
119+
profile_path=getattr(self, "profile_path", None),
120120
)
121-
122-
kernel_task.start()
123-
124-
self.kernel_task = kernel_task
125-
# First thing kernel does is connect to the socket, so it's safe to call accept
126-
self._read_conn = TypedConnection[KernelMessage].of(listener.accept())
121+
122+
# Create IOPub connection for receiving kernel messages
123+
self._read_conn = self.queue_manager.create_iopub_connection(for_kernel=False)
124+
125+
# Store process handle (compatible with mp.Process interface)
126+
self.kernel_task = self.kernel_process

src/marimo_lsp/session_manager.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from marimo._config.manager import (
1010
get_default_config_manager,
1111
)
12-
from marimo._server.sessions import KernelManager, QueueManager, Session
12+
from marimo._server.sessions import Session
1313

1414
from marimo_lsp.app_file_manager import LspAppFileManager
1515
from marimo_lsp.kernel_manager import LspKernelManager
@@ -66,7 +66,10 @@ def create_session(self, *, server: LanguageServer, notebook_uri: str) -> Sessio
6666

6767
app_file_manager = LspAppFileManager(server=server, notebook_uri=notebook_uri)
6868
config_manager = get_default_config_manager(current_path=app_file_manager.path)
69-
queue_manager = QueueManager(use_multiprocessing=True)
69+
70+
# Always use ZeroMQQueueManager for cross-Python-version support
71+
from marimo_lsp.zeromq.queue_manager import ZeroMQQueueManager
72+
queue_manager = ZeroMQQueueManager(is_kernel_side=False)
7073

7174
kernel_manager = LspKernelManager(
7275
# TODO: Get executable

src/marimo_lsp/zeromq/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
"""ZeroMQ-based IPC implementation for marimo-lsp."""
2+
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""Standalone kernel server entry point for ZeroMQ-based IPC."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
import sys
7+
import threading
8+
from multiprocessing import connection
9+
10+
from marimo._config.settings import GLOBAL_SETTINGS
11+
from marimo._runtime import runtime
12+
from marimo._runtime.requests import AppMetadata
13+
14+
from marimo_lsp.zeromq.queue_manager import ZeroMQQueueManager
15+
from marimo_lsp.zeromq.zmq_adapter import create_zmq_to_pipe_adapter
16+
17+
18+
def main() -> None:
19+
"""Launch a marimo kernel using ZeroMQ for IPC.
20+
21+
This function is the entry point for the kernel subprocess.
22+
It reads connection information from stdin and sets up ZeroMQ
23+
queues that proxy to marimo's internal kernel.
24+
"""
25+
# Read connection info from stdin
26+
connection_info_str = sys.stdin.readline().strip()
27+
if not connection_info_str:
28+
print("No connection info received", file=sys.stderr)
29+
sys.exit(1)
30+
31+
try:
32+
connection_info = json.loads(connection_info_str)
33+
except json.JSONDecodeError as e:
34+
print(f"Failed to parse connection info: {e}", file=sys.stderr)
35+
sys.exit(1)
36+
37+
# Read additional kernel arguments from stdin
38+
kernel_args_str = sys.stdin.readline().strip()
39+
if not kernel_args_str:
40+
print("No kernel arguments received", file=sys.stderr)
41+
sys.exit(1)
42+
43+
try:
44+
kernel_args = json.loads(kernel_args_str)
45+
except json.JSONDecodeError as e:
46+
print(f"Failed to parse kernel arguments: {e}", file=sys.stderr)
47+
sys.exit(1)
48+
49+
# Create ZeroMQ queue manager for kernel side
50+
queue_manager = ZeroMQQueueManager(is_kernel_side=True)
51+
queue_manager.connect_kernel(connection_info)
52+
53+
# Create a pipe adapter that bridges ZeroMQ IOPub to TypedConnection
54+
listener = connection.Listener(family="AF_INET")
55+
56+
adapter_thread = threading.Thread(
57+
target=create_zmq_to_pipe_adapter, args=(queue_manager, listener), daemon=True
58+
)
59+
adapter_thread.start()
60+
61+
runtime.launch_kernel(
62+
control_queue=queue_manager.control_queue,
63+
set_ui_element_queue=queue_manager.set_ui_element_queue,
64+
completion_queue=queue_manager.completion_queue,
65+
input_queue=queue_manager.input_queue,
66+
stream_queue=None, # Not used with ZeroMQ
67+
socket_addr=listener.address,
68+
is_edit_mode=True,
69+
profile_path=kernel_args.get("profile_path"),
70+
log_level=kernel_args.get("log_level", GLOBAL_SETTINGS.LOG_LEVEL),
71+
interrupt_queue=queue_manager.win32_interrupt_queue,
72+
configs=kernel_args["configs"],
73+
app_metadata=AppMetadata(**kernel_args["app_metadata"]),
74+
user_config=kernel_args["user_config"],
75+
virtual_files_supported=kernel_args.get("virtual_files_supported", False),
76+
redirect_console_to_browser=kernel_args.get(
77+
"redirect_console_to_browser", False
78+
),
79+
)
80+
81+
queue_manager.close_queues()
82+
83+
84+
if __name__ == "__main__":
85+
main()
86+

0 commit comments

Comments
 (0)