2424
2525import cloudpickle
2626import torch
27+ from ginger .dataweave .dataloader_interface import Sequence
2728
2829import vllm .envs as envs
2930from vllm .config import VllmConfig
@@ -172,6 +173,7 @@ def _init_executor(self) -> None:
172173 # Start background thread to monitor worker health if not in headless mode.
173174 if self .monitor_workers :
174175 self .start_worker_monitor ()
176+
175177 self .response_mqs = []
176178 # Only leader node have remote response mqs
177179 if self .parallel_config .node_rank_within_dp == 0 :
@@ -186,6 +188,7 @@ def _init_executor(self) -> None:
186188 ]
187189 assert remote_message_queue is not None
188190 self .response_mqs .append (remote_message_queue )
191+
189192 # Ensure message queues are ready. Will deadlock if re-ordered
190193 # Must be kept consistent with the WorkerProc.
191194
@@ -280,22 +283,6 @@ def take_draft_token_ids(self) -> DraftTokenIds | None:
280283 "take_draft_token_ids" , unique_reply_rank = self .output_rank
281284 )
282285
283- def get_response_mqs (
284- self , unique_reply_rank : int | None = None
285- ) -> list [MessageQueue ]:
286- message_queues = []
287- for rank in range (self .world_size ):
288- if rank < self .local_world_size :
289- local_message_queue = self .workers [rank ].worker_response_mq
290- message_queues .append (local_message_queue )
291- else :
292- remote_message_queue = self .workers [0 ].peer_worker_response_mqs [rank ]
293- assert remote_message_queue is not None
294- message_queues .append (remote_message_queue )
295- if unique_reply_rank is not None :
296- message_queues = [message_queues [unique_reply_rank ]]
297- return message_queues
298-
299286 def collective_rpc ( # type: ignore[override]
300287 self ,
301288 method : str | Callable ,
@@ -332,9 +319,9 @@ def collective_rpc( # type: ignore[override]
332319 send_method = cloudpickle .dumps (method , protocol = pickle .HIGHEST_PROTOCOL )
333320 self .rpc_broadcast_mq .enqueue ((send_method , args , kwargs , output_rank ))
334321
335- response_mqs = self .response_mqs
336- if unique_reply_rank is not None :
337- response_mqs = [ self . response_mqs [unique_reply_rank ]]
322+ response_mqs : Sequence [ MessageQueue ] = self .response_mqs
323+ if output_rank is not None :
324+ response_mqs = ( response_mqs [output_rank ],)
338325
339326 shutdown_event = self .shutdown_event
340327
0 commit comments