diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py
index 261fcfb7dad9..888a8056448f 100644
--- a/vllm/executor/ray_gpu_executor.py
+++ b/vllm/executor/ray_gpu_executor.py
@@ -393,6 +393,7 @@ async def _run_workers_async(
         self,
         method: str,
         *args,
+        use_ray_compiled_dag: bool = False,
         driver_args: Optional[List[Any]] = None,
         driver_kwargs: Optional[Dict[str, Any]] = None,
         **kwargs,
@@ -409,12 +410,32 @@ async def _run_workers_async(
         driver_executor = make_async(getattr(self.driver_worker, method))
         coros.append(driver_executor(*driver_args, **driver_kwargs))
 
-        # Run the ray workers asynchronously.
-        for worker in self.workers:
-            coros.append(worker.execute_method.remote(method, *args, **kwargs))
+        if use_ray_compiled_dag:
+            # Right now, compiled DAG can only accept a single
+            # input. TODO(sang): Fix it.
+            output_channels = self.forward_dag.execute(1)
+            # The driver coroutine only starts once it is awaited, so
+            # run it before reading the worker output channels.
+            driver_outputs = await asyncio.gather(*coros)
+            try:
+                ray_worker_outputs = [
+                    pickle.loads(chan.begin_read()) for chan in output_channels
+                ]
+            finally:
+                # Has to call end_read in order to reuse the DAG.
+                for chan in output_channels:
+                    chan.end_read()
+            # gather() returns a list, so concatenate rather than nest
+            # it; this keeps the result flat like the non-DAG branch.
+            return driver_outputs + ray_worker_outputs
+        else:
+            # Run the ray workers asynchronously.
+            for worker in self.workers:
+                coros.append(
+                    worker.execute_method.remote(method, *args, **kwargs))
 
-        all_outputs = await asyncio.gather(*coros)
-        return all_outputs
+            all_outputs = await asyncio.gather(*coros)
+            return all_outputs
 
     async def execute_model_async(
         self,