Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions vllm_omni/distributed/omni_connectors/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def get_through_connector(connector, target_stage_id, stage_id, req_id, connecto
if payload_data:
connector.request_prompt_token_ids[req_id] = payload_data.get("thinker_input_ids", [])
connector.get_requests[req_id] += 1
logger.debug(f"[Stage-{stage_id}] Received one chunk for request {connector_get_key}")
logger.debug("[Stage-%d] Received one chunk for request %s", stage_id, connector_get_key)
break
time.sleep(0.01)
return payload_data
Expand Down Expand Up @@ -325,7 +325,7 @@ def put_chunk(
logger.error(f"Failed to use custom_process_input_func for payload extraction: {e}")

if not payload_data:
logger.warning(f"[Stage-{stage_id}] No payload data to send for request {request_id}")
logger.warning("[Stage-%d] No payload data to send for request %s", stage_id, request_id)
return

if stage_id == 0 and chunk_id == 0:
Expand All @@ -341,7 +341,7 @@ def put_chunk(
payload_data["thinker_hidden_states"] = torch.cat(
(save_payload.get("thinker_hidden_states"), payload_data.get("thinker_hidden_states")), dim=0
)
logger.info(f"[Stage-{stage_id}] Merged embeddings and hidden states for request {request_id}")
logger.debug("[Stage-%d] Merged embeddings and hidden states for request %s", stage_id, request_id)

if stage_id == 1:
# TODO: Make parameters configurable and optimize algorithms
Expand All @@ -367,7 +367,7 @@ def put_chunk(

if success:
connector.put_requests[request_id] += 1
logger.info(f"[Stage-{stage_id}] Sent {connector_put_key}")
logger.debug("[Stage-%d] Sent %s", stage_id, connector_put_key)


def compute_talker_prompt_ids_length(prompt_ids: list[int]) -> int:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import fcntl
import os
import time
from collections import defaultdict
from typing import Any
Expand Down Expand Up @@ -53,7 +55,12 @@ def put(self, from_stage: str, to_stage: str, put_key: str, data: Any) -> tuple[
# if size > self.threshold:
if True: # TODO: correct put & get logic
# Use Shared Memory
meta = shm_write_bytes(payload, name=put_key)
lock_file = f"/dev/shm/shm_{put_key}_lockfile.lock"
with open(lock_file, "w") as lockf:
fcntl.flock(lockf, fcntl.LOCK_EX)
meta = shm_write_bytes(payload, name=put_key)
fcntl.flock(lockf, fcntl.LOCK_UN)
Comment on lines +58 to +62
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clean up per-chunk lock files to avoid /dev/shm leaks

This change creates a new lock file under /dev/shm for every put_key (request+stage+chunk) but there is no corresponding deletion anywhere in the connector. On long-running servers or high‑throughput workloads, these lock files will accumulate indefinitely and can exhaust /dev/shm inode/space limits, which will then cause future SHM writes/locks to fail. This only happens when many chunks are produced, but it’s a realistic production scenario for streaming workloads.

Useful? React with 👍 / 👎.


# meta contains {'name': ..., 'size': ...}
metadata[put_key] = {"shm": meta, "size": size}
self._metrics["shm_writes"] += 1
Expand Down Expand Up @@ -97,7 +104,14 @@ def get(self, from_stage: str, to_stage: str, get_key: str, metadata=None) -> tu
return None, 0

try:
data_bytes = shm_read_bytes({"name": get_key, "size": shm.size})
lock_file = f"/dev/shm/shm_{get_key}_lockfile.lock"
with open(lock_file) as lockf:
fcntl.flock(lockf, fcntl.LOCK_SH)
data_bytes = shm_read_bytes({"name": get_key, "size": shm.size})
fcntl.flock(lockf, fcntl.LOCK_UN)
# Clean up the temporary file if it still exists.
if os.path.exists(lock_file):
os.remove(lock_file)
obj = self.deserialize_obj(data_bytes)
return obj, shm.size
finally:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ stage_args:
default_sampling_params:
temperature: 0.9
top_k: 50
max_tokens: 4096
max_tokens: 2048 # TODO: The max_tokens of the async_chunk feature cannot exceed 2048.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this model specific?

seed: 42
detokenize: False
repetition_penalty: 1.05
Expand Down