Skip to content

Commit 888ef5b

Browse files
simon-mojimpang
authored and committed
Revert "[V1] DP scale-out (1/N): Use zmq ROUTER/DEALER sockets for input queue (vllm-project#15906)"
This reverts commit 651cf0f.
1 parent 1d3fdeb commit 888ef5b

File tree

2 files changed

+22
-41
lines changed

2 files changed

+22
-41
lines changed

vllm/utils.py

Lines changed: 10 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -2205,8 +2205,6 @@ def make_zmq_socket(
22052205
ctx: Union[zmq.asyncio.Context, zmq.Context], # type: ignore[name-defined]
22062206
path: str,
22072207
socket_type: Any,
2208-
bind: Optional[bool] = None,
2209-
identity: Optional[bytes] = None,
22102208
) -> Union[zmq.Socket, zmq.asyncio.Socket]: # type: ignore[name-defined]
22112209
"""Make a ZMQ socket with the proper bind/connect semantics."""
22122210

@@ -2225,24 +2223,16 @@ def make_zmq_socket(
22252223
else:
22262224
buf_size = -1 # Use system default buffer size
22272225

2228-
if bind is None:
2229-
bind = socket_type != zmq.PUSH
2230-
2231-
if socket_type in (zmq.PULL, zmq.DEALER, zmq.ROUTER):
2232-
socket.setsockopt(zmq.RCVHWM, 0)
2233-
socket.setsockopt(zmq.RCVBUF, buf_size)
2234-
2235-
if socket_type in (zmq.PUSH, zmq.DEALER, zmq.ROUTER):
2236-
socket.setsockopt(zmq.SNDHWM, 0)
2237-
socket.setsockopt(zmq.SNDBUF, buf_size)
2238-
2239-
if identity is not None:
2240-
socket.setsockopt(zmq.IDENTITY, identity)
2241-
2242-
if bind:
2226+
if socket_type == zmq.constants.PULL:
2227+
socket.setsockopt(zmq.constants.RCVHWM, 0)
2228+
socket.setsockopt(zmq.constants.RCVBUF, buf_size)
22432229
socket.bind(path)
2244-
else:
2230+
elif socket_type == zmq.constants.PUSH:
2231+
socket.setsockopt(zmq.constants.SNDHWM, 0)
2232+
socket.setsockopt(zmq.constants.SNDBUF, buf_size)
22452233
socket.connect(path)
2234+
else:
2235+
raise ValueError(f"Unknown Socket Type: {socket_type}")
22462236

22472237
return socket
22482238

@@ -2251,19 +2241,14 @@ def make_zmq_socket(
22512241
def zmq_socket_ctx(
22522242
path: str,
22532243
socket_type: Any,
2254-
bind: Optional[bool] = None,
22552244
linger: int = 0,
2256-
identity: Optional[bytes] = None,
22572245
) -> Iterator[zmq.Socket]:
22582246
"""Context manager for a ZMQ socket"""
22592247

22602248
ctx = zmq.Context() # type: ignore[attr-defined]
22612249
try:
2262-
yield make_zmq_socket(ctx,
2263-
path,
2264-
socket_type,
2265-
bind=bind,
2266-
identity=identity)
2250+
yield make_zmq_socket(ctx, path, socket_type)
2251+
22672252
except KeyboardInterrupt:
22682253
logger.debug("Got Keyboard Interrupt.")
22692254

vllm/v1/engine/core.py

Lines changed: 12 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -319,11 +319,6 @@ def __init__(
319319
):
320320
super().__init__(vllm_config, executor_class, log_stats)
321321

322-
self.step_fn = (self.step if self.batch_queue is None else
323-
self.step_with_batch_queue)
324-
325-
self.global_unfinished_reqs = False
326-
327322
# Background Threads and Queues for IO. These enable us to
328323
# overlap ZMQ socket IO with GPU since they release the GIL,
329324
# and to overlap some serialization/deserialization with the
@@ -333,16 +328,22 @@ def __init__(
333328
Any]] = queue.Queue()
334329
self.output_queue: queue.Queue[EngineCoreOutputs] = queue.Queue()
335330
threading.Thread(target=self.process_input_socket,
336-
args=(input_path, engine_index),
331+
args=(input_path, ),
337332
daemon=True).start()
338333
threading.Thread(target=self.process_output_socket,
339334
args=(output_path, engine_index),
340335
daemon=True).start()
341336

337+
self.global_unfinished_reqs = False
338+
339+
self.step_fn = (self.step if self.batch_queue is None else
340+
self.step_with_batch_queue)
341+
342342
@staticmethod
343343
def run_engine_core(*args,
344344
dp_rank: int = 0,
345345
local_dp_rank: int = 0,
346+
ready_pipe,
346347
**kwargs):
347348
"""Launch EngineCore busy loop in background process."""
348349

@@ -377,6 +378,9 @@ def signal_handler(signum, frame):
377378
else:
378379
engine_core = EngineCoreProc(*args, **kwargs)
379380

381+
# Send Readiness signal to EngineClient.
382+
ready_pipe.send({"status": "READY"})
383+
380384
engine_core.run_busy_loop()
381385

382386
except SystemExit:
@@ -473,22 +477,14 @@ def _convert_msgspec_args(method, args):
473477
and not isinstance(v, p.annotation) else v
474478
for v, p in zip(args, arg_types))
475479

476-
def process_input_socket(self, input_path: str, engine_index: int):
480+
def process_input_socket(self, input_path: str):
477481
"""Input socket IO thread."""
478482

479483
# Msgpack serialization decoding.
480484
add_request_decoder = MsgpackDecoder(EngineCoreRequest)
481485
generic_decoder = MsgpackDecoder()
482-
identity = engine_index.to_bytes(length=2, byteorder="little")
483-
484-
with zmq_socket_ctx(input_path,
485-
zmq.DEALER,
486-
identity=identity,
487-
bind=False) as socket:
488-
489-
# Send ready message to front-end once input socket is connected.
490-
socket.send(b'READY')
491486

487+
with zmq_socket_ctx(input_path, zmq.constants.PULL) as socket:
492488
while True:
493489
# (RequestType, RequestData)
494490
type_frame, *data_frames = socket.recv_multipart(copy=False)

0 commit comments

Comments
 (0)