From 2deb3d9161e26639dccddd4d3173d034fb44098c Mon Sep 17 00:00:00 2001 From: Myles Scolnick Date: Thu, 23 Oct 2025 16:53:12 -0400 Subject: [PATCH 1/2] improvement: implement ReconnectingWebSocketTransport for improved WebSocket handling Added a new ReconnectingWebSocketTransport class to manage WebSocket connections with automatic reconnection capabilities. This can happen whenever a computer goes to sleep Fixes #6903 --- .../src/core/codemirror/lsp/transports.ts | 20 +- .../src/core/lsp/__tests__/transport.test.ts | 233 ++++++++++++++++++ frontend/src/core/lsp/transport.ts | 138 +++++++++++ 3 files changed, 378 insertions(+), 13 deletions(-) create mode 100644 frontend/src/core/lsp/__tests__/transport.test.ts create mode 100644 frontend/src/core/lsp/transport.ts diff --git a/frontend/src/core/codemirror/lsp/transports.ts b/frontend/src/core/codemirror/lsp/transports.ts index b539cda89b9..65ec42ce9ba 100644 --- a/frontend/src/core/codemirror/lsp/transports.ts +++ b/frontend/src/core/codemirror/lsp/transports.ts @@ -1,5 +1,5 @@ /* Copyright 2024 Marimo. All rights reserved. */ -import { WebSocketTransport } from "@open-rpc/client-js"; +import { ReconnectingWebSocketTransport } from "@/core/lsp/transport"; import { waitForConnectionOpen } from "../../network/connection"; import { getRuntimeManager } from "../../runtime/config"; @@ -16,16 +16,10 @@ export function createTransport( serverName: "pylsp" | "basedpyright" | "copilot" | "ty", ) { const runtimeManager = getRuntimeManager(); - const transport = new WebSocketTransport( - runtimeManager.getLSPURL(serverName).toString(), - ); - - // Override connect to ensure runtime is healthy - const originalConnect = transport.connect.bind(transport); - transport.connect = async () => { - await waitForConnectionOpen(); - return originalConnect(); - }; - - return transport; + return new ReconnectingWebSocketTransport({ + getWsUrl: () => runtimeManager.getLSPURL(serverName).toString(), + waitForConnection: async () => { + await waitForConnectionOpen(); + }, + }); } diff --git a/frontend/src/core/lsp/__tests__/transport.test.ts b/frontend/src/core/lsp/__tests__/transport.test.ts new file mode 100644 index 00000000000..a59c286154b --- /dev/null +++ b/frontend/src/core/lsp/__tests__/transport.test.ts @@ -0,0 +1,233 @@ +/* Copyright 2024 Marimo. All rights reserved. */ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { WebSocketTransport } from "@open-rpc/client-js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { Mocks } from "@/__mocks__/common"; +import { ReconnectingWebSocketTransport } from "../transport"; + +// Mock the Logger +vi.mock("@/utils/Logger", () => ({ + Logger: Mocks.logger(), +})); + +// Mock the WebSocketTransport +vi.mock("@open-rpc/client-js", () => { + const mockWebSocketTransport = vi.fn(); + mockWebSocketTransport.prototype.connect = vi.fn(); + mockWebSocketTransport.prototype.close = vi.fn(); + mockWebSocketTransport.prototype.sendData = vi.fn(); + + return { + WebSocketTransport: mockWebSocketTransport, + }; +}); + +describe("ReconnectingWebSocketTransport", () => { + const mockWsUrl = "ws://localhost:8080/lsp"; + let mockConnection: any; + + beforeEach(() => { + vi.clearAllMocks(); + + // Create a mock WebSocket connection with readyState + mockConnection = { + readyState: WebSocket.OPEN, + }; + + // Mock the WebSocketTransport constructor to set the connection + (WebSocketTransport as any).mockImplementation(function (this: any) { + this.connection = mockConnection; + this.connect = vi.fn().mockResolvedValue(undefined); + this.close = vi.fn(); + this.sendData = vi.fn().mockResolvedValue({ result: "success" }); + }); + }); + + it("should create a transport with the provided URL function", () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + expect(transport).toBeDefined(); + expect(getWsUrl).not.toHaveBeenCalled(); // URL function not called until connect + }); + + it("should connect successfully", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + await transport.connect(); + + expect(getWsUrl).toHaveBeenCalledTimes(1); + expect(WebSocketTransport).toHaveBeenCalledWith(mockWsUrl); + }); + + it("should wait for connection before connecting", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const waitForConnection = vi.fn().mockResolvedValue(undefined); + const transport = new ReconnectingWebSocketTransport({ + getWsUrl, + waitForConnection, + }); + + await transport.connect(); + + expect(waitForConnection).toHaveBeenCalledTimes(1); + expect(getWsUrl).toHaveBeenCalledTimes(1); + }); + + it("should reuse the same connection promise if already connecting", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const waitForConnection = vi + .fn() + .mockImplementation( + () => new Promise((resolve) => setTimeout(resolve, 100)), + ); + const transport = new ReconnectingWebSocketTransport({ + getWsUrl, + waitForConnection, + }); + + // Start two connections concurrently + const promise1 = transport.connect(); + const promise2 = transport.connect(); + + await Promise.all([promise1, promise2]); + + // Should only create one delegate + expect(WebSocketTransport).toHaveBeenCalledTimes(1); + expect(waitForConnection).toHaveBeenCalledTimes(1); + }); + + it("should send data successfully when connected", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + await transport.connect(); + + const data: any = { method: "test", params: [] }; + const result = await transport.sendData(data, 5000); + + expect(result).toEqual({ result: "success" }); + }); + + it("should reconnect when WebSocket is in CLOSED state", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + // First connection + await transport.connect(); + expect(WebSocketTransport).toHaveBeenCalledTimes(1); + + // Simulate WebSocket closing + mockConnection.readyState = WebSocket.CLOSED; + + // Send data should trigger reconnection + const data: any = { method: "test", params: [] }; + await transport.sendData(data, 5000); + + // Should have created a new WebSocketTransport + expect(WebSocketTransport).toHaveBeenCalledTimes(2); + }); + + it("should reconnect when WebSocket is in CLOSING state", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + // First connection + await transport.connect(); + expect(WebSocketTransport).toHaveBeenCalledTimes(1); + + // Simulate WebSocket closing + mockConnection.readyState = WebSocket.CLOSING; + + // Send data should trigger reconnection + const data: any = { method: "test", params: [] }; + await transport.sendData(data, 5000); + + // Should have created a new WebSocketTransport + expect(WebSocketTransport).toHaveBeenCalledTimes(2); + }); + + it("should close the transport and prevent reconnection", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + await transport.connect(); + transport.close(); + + // Attempting to connect again should throw + await expect(transport.connect()).rejects.toThrow("Transport is closed"); + }); + + it("should close old delegate when creating a new one", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + // First connection + await transport.connect(); + const firstDelegate = (transport as any).delegate; + expect(firstDelegate).toBeDefined(); + + // Simulate connection closed + mockConnection.readyState = WebSocket.CLOSED; + + // Reconnect by sending data + const data: any = { method: "test", params: [] }; + await transport.sendData(data, 5000); + + // Old delegate should have been closed + expect(firstDelegate.close).toHaveBeenCalled(); + }); + + it("should handle connection failures gracefully", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const connectionError = new Error("Connection failed"); + + // Mock connect to fail + (WebSocketTransport as any).mockImplementationOnce(function (this: any) { + this.connection = mockConnection; + this.connect = vi.fn().mockRejectedValue(connectionError); + this.close = vi.fn(); + this.sendData = vi.fn(); + }); + + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + await expect(transport.connect()).rejects.toThrow("Connection failed"); + + // Delegate should be cleared after failure + expect((transport as any).delegate).toBeUndefined(); + }); + + it("should handle waitForConnection failures", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const waitError = new Error("Wait failed"); + const waitForConnection = vi.fn().mockRejectedValue(waitError); + + const transport = new ReconnectingWebSocketTransport({ + getWsUrl, + waitForConnection, + }); + + await expect(transport.connect()).rejects.toThrow("Wait failed"); + + // Should not have created a delegate + expect(WebSocketTransport).not.toHaveBeenCalled(); + }); + + it("should automatically reconnect on sendData after connection loss", async () => { + const getWsUrl = vi.fn(() => mockWsUrl); + const transport = new ReconnectingWebSocketTransport({ getWsUrl }); + + // Don't connect initially + // Simulate WebSocket in closed state (no delegate exists) + expect((transport as any).delegate).toBeUndefined(); + + // Send data should trigger automatic connection + const data: any = { method: "test", params: [] }; + await transport.sendData(data, 5000); + + expect(WebSocketTransport).toHaveBeenCalledTimes(1); + }); +}); diff --git a/frontend/src/core/lsp/transport.ts b/frontend/src/core/lsp/transport.ts new file mode 100644 index 00000000000..a261480ced5 --- /dev/null +++ b/frontend/src/core/lsp/transport.ts @@ -0,0 +1,138 @@ +/* Copyright 2024 Marimo. All rights reserved. */ +import { WebSocketTransport } from "@open-rpc/client-js"; +import type { JSONRPCRequestData } from "@open-rpc/client-js/build/Request"; +import { Transport } from "@open-rpc/client-js/build/transports/Transport"; +import { Logger } from "@/utils/Logger"; + +export interface ReconnectingWebSocketTransportOptions { + /** + * Function that returns the WebSocket URL to connect to. + */ + getWsUrl: () => string; + + /** + * Optional function to wait for before attempting to connect. + * This is useful for ensuring dependencies (like the runtime) are ready. + */ + waitForConnection?: () => Promise; +} + +/** + * A WebSocket transport that automatically reconnects when the connection is lost. + * This handles cases like computer sleep/wake or network interruptions. + */ +export class ReconnectingWebSocketTransport extends Transport { + private delegate: WebSocketTransport | undefined; + private readonly options: ReconnectingWebSocketTransportOptions; + private connectionPromise: Promise | undefined; + private isClosed = false; + + constructor(options: ReconnectingWebSocketTransportOptions) { + super(); + this.options = options; + this.delegate = undefined; + } + + /** + * Create a new WebSocket delegate, replacing any existing one. + */ + private createDelegate(): WebSocketTransport { + // Close the old delegate if it exists + if (this.delegate) { + try { + this.delegate.close(); + } catch (error) { + Logger.warn("Error closing old WebSocket delegate", error); + } + } + + // Create a new delegate + this.delegate = new WebSocketTransport(this.options.getWsUrl()); + return this.delegate; + } + + /** + * Check if the current delegate's WebSocket is in a closed or closing state. + */ + private isDelegateClosedOrClosing(): boolean { + if (!this.delegate) { + return true; + } + + // Access the internal connection to check its readyState + const ws = this.delegate.connection; + if (!ws) { + return true; + } + + // WebSocket.CLOSING = 2, WebSocket.CLOSED = 3 + return ( + ws.readyState === WebSocket.CLOSING || ws.readyState === WebSocket.CLOSED + ); + } + + override async connect() { + // Don't reconnect if explicitly closed + if (this.isClosed) { + throw new Error("Transport is closed"); + } + + // If already connecting, wait for that connection + if (this.connectionPromise) { + return this.connectionPromise; + } + + this.connectionPromise = (async () => { + try { + // Wait for dependencies to be ready (e.g., runtime connection) + if (this.options.waitForConnection) { + await this.options.waitForConnection(); + } + + // Create a new delegate if needed + if (!this.delegate || this.isDelegateClosedOrClosing()) { + this.createDelegate(); + } + + // Connect the delegate + await this.delegate!.connect(); + Logger.log("WebSocket transport connected successfully"); + } catch (error) { + Logger.error("WebSocket transport connection failed", error); + // Clear the delegate on failure so we create a new one on retry + this.delegate = undefined; + throw error; + } finally { + this.connectionPromise = undefined; + } + })(); + + return this.connectionPromise; + } + + override close() { + this.isClosed = true; + this.delegate?.close(); + this.delegate = undefined; + this.connectionPromise = undefined; + } + + override async sendData( + data: JSONRPCRequestData, + timeout: number | null | undefined, + ) { + // If the delegate is closed or closing, try to reconnect + if (this.isDelegateClosedOrClosing()) { + Logger.warn("WebSocket is closed or closing, attempting to reconnect"); + try { + await this.connect(); + } catch (error) { + Logger.error("Failed to reconnect WebSocket", error); + throw error; + } + } + + // Send the data using the delegate + return this.delegate?.sendData(data, timeout); + } +} From d491de1402406f35ca697aee92ef64961e1c445c Mon Sep 17 00:00:00 2001 From: Myles Scolnick Date: Thu, 23 Oct 2025 18:06:02 -0400 Subject: [PATCH 2/2] fix --- frontend/src/core/lsp/transport.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/frontend/src/core/lsp/transport.ts b/frontend/src/core/lsp/transport.ts index a261480ced5..d5d6a543f6b 100644 --- a/frontend/src/core/lsp/transport.ts +++ b/frontend/src/core/lsp/transport.ts @@ -90,12 +90,13 @@ export class ReconnectingWebSocketTransport extends Transport { } // Create a new delegate if needed - if (!this.delegate || this.isDelegateClosedOrClosing()) { - this.createDelegate(); + let delegate = this.delegate; + if (!delegate || this.isDelegateClosedOrClosing()) { + delegate = this.createDelegate(); } // Connect the delegate - await this.delegate!.connect(); + await delegate.connect(); Logger.log("WebSocket transport connected successfully"); } catch (error) { Logger.error("WebSocket transport connection failed", error);