Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4e8a0e5
Fix worker stop, timestamp handling, and imports
C-Achard Mar 2, 2026
ec86f28
Add WorkerState enum for worker lifecycle
C-Achard Mar 2, 2026
cad824c
Improve DLCLive processor lifecycle and safety
C-Achard Mar 2, 2026
d5ea95a
Restart worker if dead and clear stop event
C-Achard Mar 2, 2026
a4d21f7
Mark worker FAULTED on queue errors
C-Achard Mar 2, 2026
76d9a30
Force-terminate frozen camera threads on stop
C-Achard Mar 2, 2026
16b0d2a
Count unique cameras when starting multi-camera
C-Achard Mar 2, 2026
ae1754c
Improve VideoRecorder shutdown and writer loop
C-Achard Mar 2, 2026
29f2604
Refactor writer thread error handling and stats
C-Achard Mar 2, 2026
64f820e
Revert DLCLive imports due to unguarded torch
C-Achard Mar 2, 2026
9b6e1c8
Improve camera thread cleanup and recorder finalization
C-Achard Mar 2, 2026
acd15da
Update dlc_processor.py
C-Achard Mar 2, 2026
369908e
Add lifecycle management & robust writer shutdown
C-Achard Mar 2, 2026
91dff83
Improve lifecycle handling and stale-writer cleanup
C-Achard Mar 2, 2026
2b0a359
Cleanup stale camera workers on start/shutdown
C-Achard Mar 2, 2026
d0999a7
Make stop join timeout configurable and add test
C-Achard Mar 2, 2026
5c06b5c
Fix locking, camera check, and writer finalization
C-Achard Mar 2, 2026
b4e57d6
Add background reaper for stalled DLC worker
C-Achard Mar 2, 2026
5ec6266
Cache _queue to local variable in VideoRecorder
C-Achard Mar 2, 2026
6be4047
Prevent configure while processor running
C-Achard Mar 2, 2026
71bb1aa
Cleanup abandoned recorder and set pending reset
C-Achard Mar 2, 2026
60da106
Refactor VideoRecorder.stop lifecycle handling
C-Achard Mar 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dlclivegui/gui/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ def _on_multi_frame_ready(self, frame_data: MultiFrameData) -> None:
dlc_cam_id = selected_id
else:
dlc_cam_id = available_ids[0] if available_ids else ""
if dlc_cam_id is not None:
if dlc_cam_id:
self._inference_camera_id = dlc_cam_id
self._set_dlc_combo_to_id(dlc_cam_id)
self.statusBar().showMessage(
Expand Down
2 changes: 1 addition & 1 deletion dlclivegui/gui/recording_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def write_frame(self, cam_id: str, frame: np.ndarray, timestamp: float | None =
if not rec or not rec.is_running:
return
try:
rec.write(frame, timestamp=timestamp or time.time())
rec.write(frame, timestamp=timestamp if timestamp is not None else time.time())
except Exception as exc:
log.warning("Failed to write frame for %s: %s", cam_id, exc)
try:
Expand Down
153 changes: 116 additions & 37 deletions dlclivegui/services/dlc_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from typing import Any

import numpy as np

# from dlclive import DLCLive
from PySide6.QtCore import QObject, Signal

from dlclivegui.config import DLCProcessorSettings, ModelType
Expand All @@ -22,9 +24,6 @@

logger = logging.getLogger(__name__)

# Enable profiling
ENABLE_PROFILING = True

try: # pragma: no cover - optional dependency
from dlclive import (
DLCLive, # type: ignore
Expand All @@ -34,10 +33,22 @@
DLCLive = None # type: ignore[assignment]


# Enable profiling to get more detailed timing metrics for debugging and optimization.
ENABLE_PROFILING = True
Comment on lines +35 to +36
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed?



class PoseBackends(Enum):
    """Enumeration of pose backends; ``DLC_LIVE`` is the only member."""

    DLC_LIVE = auto()


class WorkerState(Enum):
    """Lifecycle states of the DLC worker thread.

    The processor keeps the current value in ``_state`` and changes it while
    holding ``_lifecycle_lock``.
    """

    # No worker thread exists: initial state, or the thread was joined cleanly.
    STOPPED = auto()
    # A worker thread has been created and is about to run its initialization.
    STARTING = auto()
    # Initialization succeeded and the worker is consuming queued frames.
    RUNNING = auto()
    # A stop was requested and the worker is being joined.
    STOPPING = auto()
    # DLCLive initialization failed, the queue was missing or raised an
    # unexpected error, or the thread was still alive after the join timeout.
    FAULTED = auto()


@dataclass
class PoseResult:
pose: np.ndarray | None
Expand Down Expand Up @@ -135,11 +146,15 @@ class DLCLiveProcessor(QObject):

def __init__(self) -> None:
super().__init__()
# DLCLive instance and config
self._settings = DLCProcessorSettings()
self._dlc: Any | None = None
self._processor: Any | None = None
# Worker thread and queue
self._queue: queue.Queue[Any] | None = None
self._worker_thread: threading.Thread | None = None
self._state = WorkerState.STOPPED
self._lifecycle_lock = threading.Lock()
self._stop_event = threading.Event()
self._initialized = False

Expand Down Expand Up @@ -169,7 +184,12 @@ def configure(self, settings: DLCProcessorSettings, processor: Any | None = None

def reset(self) -> None:
"""Stop the worker thread and drop the current DLCLive instance."""
self._stop_worker()
stopped = self._stop_worker()
if not stopped:
logger.warning(
"Reset requested but worker thread is still alive; skipping DLCLive reset to avoid potential issues."
)
return
self._dlc = None
self._initialized = False
with self._stats_lock:
Expand All @@ -186,22 +206,34 @@ def reset(self) -> None:
self._processor_overhead_times.clear()

def shutdown(self) -> None:
self._stop_worker()
stopped = self._stop_worker()
if not stopped:
logger.warning(
"Shutdown requested but worker thread is still alive; DLCLive instance may not be fully released."
)
return
self._dlc = None
self._initialized = False

def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None:
# Start worker on first frame
if self._worker_thread is None:
self._start_worker(frame.copy(), timestamp)
return
frame_c = frame.copy()
enq_time = time.perf_counter()

with self._lifecycle_lock:
if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set():
return
t = self._worker_thread
if t is None or not t.is_alive():
self._start_worker_locked(frame_c, timestamp)
return

q = self._queue # snapshot under lock

# As long as worker and queue are ready, ALWAYS enqueue
if self._queue is None:
if q is None:
return

try:
self._queue.put_nowait((frame.copy(), timestamp, time.perf_counter()))
q.put_nowait((frame_c, timestamp, enq_time))
with self._stats_lock:
self._frames_enqueued += 1
except queue.Full:
Expand Down Expand Up @@ -259,12 +291,13 @@ def get_stats(self) -> ProcessorStats:
avg_processor_overhead=avg_proc_overhead,
)

def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None:
def _start_worker_locked(self, init_frame: np.ndarray, init_timestamp: float) -> None:
# lifecycle_lock must already be held
if self._worker_thread is not None and self._worker_thread.is_alive():
return

self._queue = queue.Queue(maxsize=1)
self._stop_event.clear()
self._state = WorkerState.STARTING
self._worker_thread = threading.Thread(
target=self._worker_loop,
args=(init_frame, init_timestamp),
Expand All @@ -273,19 +306,34 @@ def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None:
)
self._worker_thread.start()

def _stop_worker(self) -> None:
if self._worker_thread is None:
return

self._stop_event.set()

# Just wait for the timed get() loop to observe the flag and drain
self._worker_thread.join(timeout=2.0)
if self._worker_thread.is_alive():
logger.warning("DLC worker thread did not terminate cleanly")

self._worker_thread = None
self._queue = None
def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None:
with self._lifecycle_lock:
self._start_worker_locked(init_frame, init_timestamp)

def _stop_worker(self) -> bool:
    """Signal the worker thread to stop and wait briefly for it to exit.

    Returns:
        True if there was no worker thread, or if it terminated within the
        join timeout; the state is then ``STOPPED`` and the stop event is
        cleared (after a successful join the thread and queue references
        are dropped as well).
        False if the thread is still alive after the timeout; the state is
        then ``FAULTED`` and the thread/queue references and the stop event
        are left as they are.
    """
    with self._lifecycle_lock:
        worker = self._worker_thread
        if worker is None:
            # Nothing to join; just normalise the bookkeeping.
            self._state = WorkerState.STOPPED
            self._stop_event.clear()
            return True
        # Snapshot the queue under the lock so the diagnostic below never
        # does a check-then-use on ``self._queue`` while unlocked.
        pending = self._queue
        self._state = WorkerState.STOPPING
        self._stop_event.set()

    # Join outside the lock: the worker loop acquires ``_lifecycle_lock``
    # itself when it publishes state changes, so holding it here could keep
    # the worker from finishing.
    worker.join(timeout=2.0)
    if worker.is_alive():
        qsize = pending.qsize() if pending is not None else -1
        logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize)
        with self._lifecycle_lock:
            self._state = WorkerState.FAULTED
        return False

    with self._lifecycle_lock:
        self._worker_thread = None
        self._queue = None
        self._state = WorkerState.STOPPED
        self._stop_event.clear()
    return True

@contextmanager
def _timed_processor(self):
Expand Down Expand Up @@ -328,6 +376,8 @@ def _process_frame(
Single source of truth for: inference -> (optional) processor timing -> signal emit -> stats.
Updates: frames_processed, latency, processing timeline, profiling metrics.
"""
if self._dlc is None:
raise RuntimeError("DLCLive instance is not initialized.")
# Time GPU inference (and processor overhead when present)
with self._timed_processor() as proc_holder:
inference_start = time.perf_counter()
Expand Down Expand Up @@ -377,8 +427,6 @@ def _process_frame(
def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
try:
# -------- Initialization (unchanged) --------
if DLCLive is None:
raise RuntimeError("The 'dlclive' package is required for pose estimation.")
if not self._settings.model_path:
raise RuntimeError("No DLCLive model path configured.")

Expand All @@ -403,7 +451,18 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
if self._settings.device is not None:
options["device"] = self._settings.device

self._dlc = DLCLive(**options)
try:
if DLCLive is None:
raise RuntimeError(
"DLCLive class is not available. Ensure the dlclive package is installed and can be imported."
)
self._dlc = DLCLive(**options)
except Exception as exc:
with self._lifecycle_lock:
self._state = WorkerState.FAULTED
raise RuntimeError(
f"Failed to initialize DLCLive with model '{self._settings.model_path}': {exc}"
) from exc

# First inference to initialize
init_inference_start = time.perf_counter()
Expand All @@ -416,6 +475,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:

self._initialized = True
self.initialized.emit(True)
with self._lifecycle_lock:
self._state = WorkerState.RUNNING

total_init_time = time.perf_counter() - init_start
logger.info(
Expand All @@ -435,14 +496,24 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
self.initialized.emit(False)
return

q = (
self._queue
) # Assign to local to avoid issues if self._queue is set to None during shutdown while loop is still running.
if q is None:
logger.warning("Worker started without a queue; exiting")
with self._lifecycle_lock:
self._state = WorkerState.FAULTED
self.error.emit("Worker started without a queue")
return

# -------- Main processing loop: stop-flag + timed get + drain --------
# NOTE: We never exit early unless _stop_event is set or the queue itself raises an unexpected error.
while True:
# If stop requested, only exit when queue is empty
if self._stop_event.is_set():
if self._queue is not None:
if q is not None:
try:
frame, ts, enq = self._queue.get_nowait()
frame, ts, enq = q.get_nowait()
except queue.Empty:
# NOW it is safe to exit
break
Expand All @@ -455,18 +526,24 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
self.error.emit(str(exc))
finally:
try:
self._queue.task_done()
q.task_done()
except ValueError:
pass
continue # check stop_event again WITHOUT breaking

# Normal operation: timed get
try:
wait_start = time.perf_counter()
item = self._queue.get(timeout=0.05)
item = q.get(timeout=0.05)
queue_wait_time = time.perf_counter() - wait_start
except queue.Empty:
continue
except Exception as exc:
logger.exception("Error getting item from queue", exc_info=exc)
with self._lifecycle_lock:
self._state = WorkerState.FAULTED
self.error.emit(str(exc))
break

try:
frame, ts, enq = item
Expand All @@ -476,7 +553,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
self.error.emit(str(exc))
finally:
try:
self._queue.task_done()
q.task_done()
except ValueError:
pass

Expand Down Expand Up @@ -513,6 +590,10 @@ def enqueue(self, frame, ts):
self._proc.enqueue_frame(frame, ts)

def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, selected_key) -> bool:
with self._proc._lifecycle_lock:
if self._proc._state != WorkerState.STOPPED:
raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.")

processor = None
if selected_key is not None and scanned_processors:
try:
Expand All @@ -526,11 +607,9 @@ def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, se
def start(self):
self._proc.reset()
self.active = True
self.initialized = False

def stop(self):
self.active = False
self.initialized = False
self._proc.reset()
self._last_pose = None

Expand Down
Loading