websockets-sansio implementation doesn't implement keepalive pings
#2883
-
|
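When running uvicorn behind a reverse proxy (e.g. nginx or Caddy), suppose a user opens a browser tab, goes away for some time, and then returns. With the old `websockets` implementation, keepalive pings keep the idle connection open; with `websockets-sansio` no pings are sent, so the proxy drops the idle connection and messages are lost.

Here's a minimal reproduction:

Details

"""Minimal reproduction: WebSocket idle message loss with uvicorn --ws websockets-sansio.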
Run:
python repro_websocket_idle.py
Requires: uvicorn, fastapi, websockets, httpx
"""
from __future__ import annotations
import asyncio
import contextlib
import json
import socket
import warnings
from collections.abc import AsyncIterator
import httpx
import uvicorn
from fastapi import FastAPI
from fastapi.websockets import WebSocket
# Ignore DeprecationWarnings whose message matches the patterns below.
# NOTE(review): presumably these are emitted by uvicorn's legacy "websockets"
# implementation; confirm against the installed uvicorn/websockets versions.
warnings.filterwarnings("ignore", message=r"websockets\..* is deprecated", category=DeprecationWarning)
warnings.filterwarnings("ignore", message="remove second argument of ws_handler", category=DeprecationWarning)
# ---------------------------------------------------------------------------
# Minimal FastAPI app with a WebSocket endpoint that forwards messages
# ---------------------------------------------------------------------------
app = FastAPI()
# Shared queue simulating Redis pub/sub
# Maps an item_id to the queue registered by the WebSocket handler for that item.
_queues: dict[str, asyncio.Queue[dict]] = {}
@app.websocket("/ws/{item_id}")
async def ws_notify(websocket: WebSocket) -> None:
    """Forward every message queued for ``item_id`` to the connected client.

    A fresh queue is registered in ``_queues`` under the ``item_id`` path
    parameter, the handshake is accepted, and each message put on that queue
    is sent to the client as JSON. The loop only ends when sending raises or
    the handler is cancelled.

    Args:
        websocket: The incoming WebSocket connection; its ``item_id`` path
            parameter selects the queue key.
    """
    item_id = websocket.path_params["item_id"]
    queue: asyncio.Queue[dict] = asyncio.Queue()
    _queues[item_id] = queue
    try:
        # Accept inside the outer try so a failed handshake still unregisters
        # the queue; the failure itself keeps propagating to the caller.
        await websocket.accept()
        try:
            # Forward queued messages to the client
            while True:
                await websocket.send_json(await queue.get())
        except Exception:
            # Deliberate best effort for this reproduction: any failure while
            # forwarding (e.g. the peer has gone away) just ends the handler.
            pass
    finally:
        # Unregister only our own queue: a later connection for the same
        # item_id may have replaced the entry, and must keep its subscription.
        if _queues.get(item_id) is queue:
            del _queues[item_id]
@app.post("/complete/{item_id}")
async def mark_complete(item_id: str) -> dict:
    """Queue a completion message for the subscriber of ``item_id``.

    Args:
        item_id: Key under which a WebSocket handler may have registered a queue.

    Returns:
        ``{"delivered": True}`` when a queue was registered and the message was
        put on it, otherwise ``{"delivered": False, "reason": "no subscriber"}``.
    """
    subscriber = _queues.get(item_id)
    if subscriber is not None:
        await subscriber.put({"status": "complete", "item_id": item_id})
        return {"delivered": True}
    # Nobody is listening for this item.
    return {"delivered": False, "reason": "no subscriber"}
# ---------------------------------------------------------------------------
# Idle-timeout TCP proxy
# ---------------------------------------------------------------------------
async def _relay(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, idle: asyncio.Event) -> None:
    """Copy bytes from ``reader`` to ``writer`` until EOF, flagging each chunk on ``idle``.

    Args:
        reader: Stream to read from, in chunks of up to 65536 bytes.
        writer: Stream every chunk is written to; closed when the relay ends.
        idle: Event that is set each time a non-empty chunk arrives.

    Connection errors and cancellation end the relay without raising.
    """
    try:
        # An empty read means EOF, which terminates the loop.
        while chunk := await reader.read(65536):
            idle.set()
            writer.write(chunk)
            await writer.drain()
    except (asyncio.CancelledError, ConnectionError):
        pass
    finally:
        # Closing is best effort; errors from close() are ignored.
        with contextlib.suppress(Exception):
            writer.close()
async def start_idle_timeout_proxy(
    target_host: str, target_port: int, idle_timeout: float
) -> tuple[int, asyncio.AbstractServer]:
    """Start a local TCP proxy that drops connections after a period of silence.

    Each accepted client is connected to ``target_host:target_port`` and bytes
    are relayed in both directions. When no data flows in either direction for
    ``idle_timeout`` seconds, both sides of the connection are closed.

    Args:
        target_host: Host the proxy forwards to.
        target_port: Port the proxy forwards to.
        idle_timeout: Seconds without traffic after which a connection is dropped.

    Returns:
        A ``(port, server)`` tuple: the port the proxy listens on at 127.0.0.1
        (chosen by the OS) and the server object, which the caller must close.
    """

    async def handle_client(client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter) -> None:
        try:
            upstream_reader, upstream_writer = await asyncio.open_connection(target_host, target_port)
        except OSError:
            # Upstream is unreachable: just drop the client.
            client_writer.close()
            return
        idle = asyncio.Event()
        relays = (
            asyncio.create_task(_relay(client_reader, upstream_writer, idle)),
            asyncio.create_task(_relay(upstream_reader, client_writer, idle)),
        )
        try:
            while True:
                idle.clear()
                try:
                    await asyncio.wait_for(idle.wait(), timeout=idle_timeout)
                except asyncio.TimeoutError:
                    # asyncio.TimeoutError is only an alias of the builtin
                    # TimeoutError from Python 3.11 on; naming it explicitly
                    # keeps the idle expiry handled on older versions too.
                    break
                if any(task.done() for task in relays):
                    break
        finally:
            for task in relays:
                task.cancel()
            for stream in (client_writer, upstream_writer):
                with contextlib.suppress(Exception):
                    stream.close()
            # Wait for the relays to finish so no task is left pending and any
            # exception they raised is retrieved rather than logged at exit.
            await asyncio.gather(*relays, return_exceptions=True)

    server = await asyncio.start_server(handle_client, "127.0.0.1", 0)
    return server.sockets[0].getsockname()[1], server
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_free_port() -> int:
    """Return a TCP port number on 127.0.0.1 that the OS reports as free.

    The probing socket is closed before returning, so the port is not
    reserved for the caller.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        # Binding to port 0 lets the OS choose the port.
        probe.bind(("127.0.0.1", 0))
        _host, port = probe.getsockname()
    return port
@contextlib.asynccontextmanager
async def _run_uvicorn(ws_impl: str) -> AsyncIterator[int]:
    """Run the FastAPI app under uvicorn in a background task and yield its port.

    Args:
        ws_impl: Value passed to uvicorn's ``ws`` setting. For ``"websockets"``
            a 1 second ping interval and 20 second ping timeout are also set.

    Yields:
        The port on 127.0.0.1 that the server listens on.

    Raises:
        RuntimeError: If the serve task finishes without error before the
            server reports that it has started.
        BaseException: Whatever the serve task raised, if it failed during startup.
    """
    port = _get_free_port()
    kwargs: dict = {"ws": ws_impl, "log_level": "critical"}
    if ws_impl == "websockets":
        kwargs["ws_ping_interval"] = 1.0
        kwargs["ws_ping_timeout"] = 20.0
    config = uvicorn.Config(app=app, host="127.0.0.1", port=port, **kwargs)
    server = uvicorn.Server(config)
    task = asyncio.create_task(server.serve())
    # Poll until uvicorn flags itself as started, bailing out if it dies first.
    while not server.started:
        if task.done():
            # result() re-raises the task's own exception (or cancellation).
            # If the task ended cleanly there is nothing to re-raise, so
            # report the early exit instead of executing ``raise None``.
            task.result()
            raise RuntimeError("uvicorn server exited before it finished starting")
        await asyncio.sleep(0.05)
    try:
        yield port
    finally:
        # Ask uvicorn to shut down and give it up to 5 seconds to do so.
        server.should_exit = True
        with contextlib.suppress(Exception):
            await asyncio.wait_for(task, timeout=5.0)
# ---------------------------------------------------------------------------
# Test runner
# ---------------------------------------------------------------------------
async def run_test(ws_impl: str, idle_timeout: float = 3.0, sleep: float = 5.0) -> bool:
    """Returns True if the WebSocket message was received, False otherwise.

    Starts uvicorn with the given WebSocket implementation, places the
    idle-timeout proxy in front of it, opens a WebSocket through the proxy
    with client-side pings disabled, stays silent for ``sleep`` seconds, then
    triggers a server-side message with an HTTP POST sent directly to uvicorn.

    Args:
        ws_impl: Value for uvicorn's ``ws`` setting.
        idle_timeout: Seconds of silence after which the proxy drops a connection.
        sleep: Seconds the client stays idle before the message is triggered.

    Returns:
        True only if a text frame whose JSON has ``"status": "complete"``
        arrives within 3 seconds. False if no subscriber was registered, the
        wait times out, or the connection is closed.
    """
    from websockets.asyncio.client import connect as ws_connect
    from websockets.exceptions import ConnectionClosed

    async with _run_uvicorn(ws_impl) as port:
        proxy_port, proxy_server = await start_idle_timeout_proxy("127.0.0.1", port, idle_timeout)
        try:
            ws_url = f"ws://127.0.0.1:{proxy_port}/ws/test-item"
            try:
                async with ws_connect(ws_url, ping_interval=None, ping_timeout=None, close_timeout=2) as ws:
                    # Simulate idle browser tab
                    await asyncio.sleep(sleep)
                    # Trigger server-side message
                    async with httpx.AsyncClient(base_url=f"http://127.0.0.1:{port}", timeout=5.0) as client:
                        resp = await client.post("/complete/test-item")
                        delivery = resp.json()
                    if not delivery["delivered"]:
                        return False
                    try:
                        raw = await asyncio.wait_for(ws.recv(), timeout=3.0)
                    except asyncio.TimeoutError:
                        # asyncio.TimeoutError only aliases the builtin
                        # TimeoutError from Python 3.11 on; catching it by
                        # this name reports a lost message on older versions
                        # instead of crashing the run.
                        return False
                    # Non-text frames are treated as "no status".
                    data = json.loads(raw) if isinstance(raw, str) else {}
                    return data.get("status") == "complete"
            except ConnectionClosed:
                return False
        finally:
            proxy_server.close()
            await proxy_server.wait_closed()
async def main() -> None:
    """Run the idle scenario once per WebSocket implementation and print each outcome."""
    rule = "=" * 70
    print(rule)
    print("Reproducing WebSocket idle message loss")
    print(" Proxy idle timeout: 3s | Sleep (simulated idle): 5s")
    print(rule)
    print()

    for ws_impl in ("websockets", "websockets-sansio"):
        # Keep the cursor on the same line until the verdict is known.
        print(f"Testing ws={ws_impl!r} ...", end=" ", flush=True)
        if await run_test(ws_impl):
            status = "RECEIVED"
        else:
            status = "LOST"
        print(f"message {status}")

    print()
    print("Expected: 'websockets' receives (pings keep proxy alive),")
    print(" 'websockets-sansio' loses (no pings, proxy drops idle conn).")
# Script entry point: run the comparison only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
|
Implemented a potential fix in #2884 |
Beta Was this translation helpful? Give feedback.
Now solved in the new release, thanks.