Skip to content

Commit 5649857

Browse files
[ BugFix ] Move zmq frontend to IPC instead of TCP (#7222)
1 parent 0f7052b commit 5649857

5 files changed

Lines changed: 29 additions & 22 deletions

File tree

vllm/entrypoints/openai/api_server.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
OpenAIServingTokenization)
4444
from vllm.logger import init_logger
4545
from vllm.usage.usage_lib import UsageContext
46-
from vllm.utils import FlexibleArgumentParser, get_open_port
46+
from vllm.utils import FlexibleArgumentParser, get_open_zmq_ipc_path
4747
from vllm.version import __version__ as VLLM_VERSION
4848

4949
TIMEOUT_KEEP_ALIVE = 5 # seconds
@@ -106,16 +106,20 @@ async def build_async_engine_client(args) -> AsyncIterator[AsyncEngineClient]:
106106

107107
# Otherwise, use the multiprocessing AsyncLLMEngine.
108108
else:
109+
# Select random path for IPC.
110+
rpc_path = get_open_zmq_ipc_path()
111+
logger.info("Multiprocessing frontend to use %s for RPC Path.",
112+
rpc_path)
113+
109114
# Start RPCServer in separate process (holds the AsyncLLMEngine).
110-
port = get_open_port(envs.VLLM_RPC_PORT)
111115
rpc_server_process = Process(target=run_rpc_server,
112116
args=(engine_args,
113117
UsageContext.OPENAI_API_SERVER,
114-
port))
118+
rpc_path))
115119
rpc_server_process.start()
116120

117121
# Build RPCClient, which conforms to AsyncEngineClient Protocol.
118-
async_engine_client = AsyncEngineRPCClient(port)
122+
async_engine_client = AsyncEngineRPCClient(rpc_path)
119123
await async_engine_client.setup()
120124

121125
try:

vllm/entrypoints/openai/rpc/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121

2222
class AsyncEngineRPCClient:
2323

24-
def __init__(self, port: int):
24+
def __init__(self, rpc_path: str):
2525
self.context = zmq.asyncio.Context()
26-
self.path = f"tcp://localhost:{port}"
26+
self.rpc_path = rpc_path
2727

2828
async def setup(self):
2929
"""Setup the client before it starts sending server requests."""
@@ -58,7 +58,7 @@ def socket(self):
5858
# to enable streaming.
5959
socket = self.context.socket(zmq.constants.DEALER)
6060
try:
61-
socket.connect(self.path)
61+
socket.connect(self.rpc_path)
6262
yield socket
6363
finally:
6464
socket.close()

vllm/entrypoints/openai/rpc/server.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
class AsyncEngineRPCServer:
2121

2222
def __init__(self, async_engine_args: AsyncEngineArgs,
23-
usage_context: UsageContext, port: int):
23+
usage_context: UsageContext, rpc_path: str):
2424
# Initialize engine first.
2525
self.engine = AsyncLLMEngine.from_engine_args(async_engine_args,
2626
usage_context)
@@ -30,9 +30,7 @@ def __init__(self, async_engine_args: AsyncEngineArgs,
3030

3131
# Init socket for readiness state.
3232
self.socket = self.context.socket(zmq.constants.ROUTER)
33-
# Note numeric form of localhost should be used for zmq bind(),
34-
# see https://stackoverflow.com/a/8958414
35-
self.socket.bind(f"tcp://127.0.0.1:{port}")
33+
self.socket.bind(rpc_path)
3634

3735
def cleanup(self):
3836
"""Cleanup all resources."""
@@ -213,6 +211,6 @@ def signal_handler() -> None:
213211

214212

215213
def run_rpc_server(async_engine_args: AsyncEngineArgs,
216-
usage_context: UsageContext, port: int):
217-
server = AsyncEngineRPCServer(async_engine_args, usage_context, port)
214+
usage_context: UsageContext, rpc_path: str):
215+
server = AsyncEngineRPCServer(async_engine_args, usage_context, rpc_path)
218216
asyncio.run(run_server(server))

vllm/envs.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import os
2+
import tempfile
23
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
34

45
if TYPE_CHECKING:
56
VLLM_HOST_IP: str = ""
67
VLLM_PORT: Optional[int] = None
7-
VLLM_RPC_PORT: int = 5570
8+
VLLM_RPC_BASE_PATH: str = tempfile.gettempdir()
89
VLLM_USE_MODELSCOPE: bool = False
910
VLLM_RINGBUFFER_WARNING_INTERVAL: int = 60
1011
VLLM_INSTANCE_ID: Optional[str] = None
@@ -142,10 +143,10 @@ def get_default_config_root():
142143
lambda: int(os.getenv('VLLM_PORT', '0'))
143144
if 'VLLM_PORT' in os.environ else None,
144145

145-
# used when the frontend api server is running in multi-processing mode,
146-
# to communicate with the backend engine process over ZMQ.
147-
'VLLM_RPC_PORT':
148-
lambda: int(os.getenv('VLLM_RPC_PORT', '5570')),
146+
# path used for ipc when the frontend api server is running in
147+
# multi-processing mode to communicate with the backend engine process.
148+
'VLLM_RPC_BASE_PATH':
149+
lambda: os.getenv('VLLM_RPC_BASE_PATH', tempfile.gettempdir()),
149150

150151
# If true, will load models from ModelScope instead of Hugging Face Hub.
151152
# note that the value is true or false, not numbers

vllm/utils.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from typing import (Any, AsyncGenerator, Awaitable, Callable, Dict, Generic,
2020
Hashable, List, Optional, OrderedDict, Set, Tuple, TypeVar,
2121
Union, overload)
22+
from uuid import uuid4
2223

2324
import numpy as np
2425
import numpy.typing as npt
@@ -484,10 +485,13 @@ def get_distributed_init_method(ip: str, port: int) -> str:
484485
return f"tcp://[{ip}]:{port}" if ":" in ip else f"tcp://{ip}:{port}"
485486

486487

487-
def get_open_port(port: Optional[int] = None) -> int:
488-
if port is None:
489-
# Default behavior here is to return a port for multi-gpu communication
490-
port = envs.VLLM_PORT
488+
def get_open_zmq_ipc_path() -> str:
489+
base_rpc_path = envs.VLLM_RPC_BASE_PATH
490+
return f"ipc://{base_rpc_path}/{uuid4()}"
491+
492+
493+
def get_open_port() -> int:
494+
port = envs.VLLM_PORT
491495
if port is not None:
492496
while True:
493497
try:

0 commit comments

Comments
 (0)