Skip to content

Conversation

@xiaoyu10031
Copy link
Collaborator

@xiaoyu10031 xiaoyu10031 commented Dec 1, 2025

fix some errors

Summary by Sourcery

Add a new Uni-Lab-OS camera streaming device driver and register it in the device registry for use as a cameraSII device.

New Features:

  • Introduce a CameraController Python driver that manages camera streaming over RTMP and WebRTC via FFmpeg and WebSocket signaling.
  • Register the CameraController as a cameraSII device in the Uni-Lab-OS registry with configurable connection parameters and start/stop commands.

- Run blocking requests.post in a thread executor to avoid blocking the event loop
- Change _handle_webrtc_offer to async and await it in the offer message handler
- Improve FFmpeg shutdown by waiting after kill() to avoid potential zombie processes
@sourcery-ai
Copy link

sourcery-ai bot commented Dec 1, 2025

Reviewer's Guide

Adds a new Uni-Lab-OS camera streaming driver implemented as a Python CameraController class with WebSocket signaling, FFmpeg RTMP streaming, and WebRTC offer handling, plus a corresponding device registry entry to expose it as a UniLab device.

Sequence diagram for CameraController start and stop lifecycle

sequenceDiagram
    actor UniLabOS
    participant CameraController
    participant Thread_loop_thread as LoopThread
    participant Asyncio_loop as EventLoop
    participant Signal_backend as SignalBackend
    participant FFmpeg

    UniLabOS->>CameraController: start(config)
    CameraController->>CameraController: apply_config(config)
    CameraController->>CameraController: _start_ffmpeg()
    CameraController->>FFmpeg: spawn process to rtmp_url
    CameraController->>Asyncio_loop: create_new_event_loop()
    CameraController->>Thread_loop_thread: start(loop_runner)
    Thread_loop_thread->>Asyncio_loop: run_forever()
    CameraController->>Asyncio_loop: schedule _run_main_loop()
    Asyncio_loop->>Signal_backend: websockets.connect(signal_backend_url)
    Asyncio_loop->>Signal_backend: maintain _recv_loop()
    CameraController-->>UniLabOS: start() result(status=started,...)

    UniLabOS->>CameraController: stop()
    CameraController->>CameraController: _running = False
    CameraController->>FFmpeg: _stop_ffmpeg()
    CameraController->>Asyncio_loop: close WebSocket
    CameraController->>Asyncio_loop: cancel _loop_task
    Asyncio_loop-->>CameraController: _run_main_loop cancelled
    CameraController->>Asyncio_loop: stop()
    CameraController->>Thread_loop_thread: join()
    CameraController-->>UniLabOS: stop() result(status=stopped)
Loading

Sequence diagram for handling WebRTC offer via CameraController

sequenceDiagram
    participant Browser
    participant Signal_backend as SignalBackend
    participant CameraController
    participant Media_server as MediaServer

    Browser->>Signal_backend: send offer(type=offer,sdp,cameraId)
    Signal_backend-->>CameraController: WebSocket message JSON
    CameraController->>CameraController: _handle_message(data)
    CameraController->>CameraController: _handle_webrtc_offer(offer_sdp) async
    CameraController->>Media_server: HTTP POST webrtc_api with offer_sdp
    Media_server-->>CameraController: HTTP 200 JSON {sdp: answer_sdp}
    CameraController-->>CameraController: extract answer_sdp
    CameraController->>Signal_backend: WebSocket send {type:answer,sdp:answer_sdp,cameraId,hostId}
    Signal_backend-->>Browser: deliver answer SDP
Loading

Class diagram for CameraController driver structure

classDiagram
    class CameraController {
        -str host_id
        -str signal_backend_url
        -str rtmp_url
        -str webrtc_api
        -str webrtc_stream_url
        -object _ws
        -subprocess.Popen _ffmpeg_process
        -bool _running
        -asyncio.Future _loop_task
        -asyncio.AbstractEventLoop _loop
        -threading.Thread _loop_thread
        +CameraController(host_id, signal_backend_url, rtmp_url, webrtc_api, webrtc_stream_url)
        +start(config) Dict
        +stop() Dict
        +get_status() Dict
        -_run_main_loop() async
        -_recv_loop() async
        -_handle_message(data) async
        -_start_ffmpeg() void
        -_stop_ffmpeg() void
        -_handle_webrtc_offer(offer_sdp) async str
    }

    class asyncio {
    }
    class websockets {
    }
    class requests {
    }
    class subprocess {
    }
    class threading {
    }

    CameraController --> websockets : uses WebSocket
    CameraController --> subprocess : manages FFmpeg process
    CameraController --> requests : sends HTTP to media server
    CameraController --> asyncio : runs async loops
    CameraController --> threading : runs event loop thread
Loading

Flow diagram for Uni-Lab-OS device registry integration of CameraController

flowchart TD
    RegistryEntry[cameraSII.yaml cameracontroller_device]
    UniLabCore[Uni-Lab-OS device framework]
    CameraModule[unilabos.devices.cameraSII.cameraDriver]
    CameraClass[CameraController]

    RegistryEntry -->|module field| CameraModule
    CameraModule -->|class instantiation| CameraClass
    RegistryEntry -->|init_param_schema| UniLabCore
    UniLabCore -->|create device with config| CameraClass
    RegistryEntry -->|action_value_mappings auto-start/auto-stop| UniLabCore
    UniLabCore -->|invoke start/stop| CameraClass
    RegistryEntry -->|status_types.status| UniLabCore
    UniLabCore -->|call get_status| CameraClass
Loading

File-Level Changes

Change Details Files
Introduce CameraController driver implementing camera streaming over RTMP/WebRTC with WebSocket signaling and FFmpeg management.
  • Implement CameraController with configurable host and backend URLs, maintaining runtime state for WebSocket, FFmpeg process, and asyncio loop/thread.
  • Provide start/stop/get_status APIs that integrate with Uni-Lab-OS, manage an internal asyncio event loop in a background thread, and automatically start FFmpeg on initialization/start.
  • Implement WebSocket main loop to connect to the signaling backend, receive JSON messages, and handle commands for starting/stopping FFmpeg and processing WebRTC offers.
  • Add robust FFmpeg lifecycle control including safe Popen creation, quiet logging, graceful terminate/kill with timeouts, and status reporting.
  • Implement asynchronous WebRTC offer handling by forwarding SDP to the SRS media server via requests in a thread executor, validating responses and surfacing errors with concise logging.
unilabos/devices/cameraSII/cameraDriver.py
Register the new CameraController as a Uni-Lab-OS device with actions and configuration schema.
  • Create a device registry YAML entry that maps the new CameraController module/class into the registry with type python and category cameraSII.
  • Define UniLabJsonCommand action mappings for auto-start and auto-stop, including lightweight goal schemas for start/stop parameters.
  • Specify init_param_schema with configurable host_id, signaling URL, RTMP URL, and WebRTC endpoints plus a data schema exposing status.
unilabos/registry/devices/cameraSII.yaml

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

Blocking issues:

  • Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)

General comments:

  • Auto-starting the controller in __init__ (calling self.start() in the constructor) can make lifecycle management harder and surprising for callers; consider removing the implicit start and requiring explicit start calls via the registered commands or an explicit flag.
  • The FFmpeg command currently hardcodes /dev/video0, resolution, framerate, and encoding parameters; it would be more flexible to expose these as configuration options (and reflect them in the init_param_schema) so different cameras and environments can be supported without code changes.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Auto-starting the controller in `__init__` (calling `self.start()` in the constructor) can make lifecycle management harder and surprising for callers; consider removing the implicit start and requiring explicit `start` calls via the registered commands or an explicit flag.
- The FFmpeg command currently hardcodes `/dev/video0`, resolution, framerate, and encoding parameters; it would be more flexible to expose these as configuration options (and reflect them in the `init_param_schema`) so different cameras and environments can be supported without code changes.

## Individual Comments

### Comment 1
<location> `unilabos/devices/cameraSII/cameraDriver.py:80-89` </location>
<code_context>
+
+        # 应用 config 覆盖(如果有)
+        if config:
+            cfg_host_id = config.get("host_id")
+            if cfg_host_id:
+                self.host_id = cfg_host_id
+
+            signal_backend_url = config.get("signal_backend_url")
+            if signal_backend_url:
+                signal_backend_url = signal_backend_url.rstrip("/")
+                if not signal_backend_url.endswith("/host"):
+                    signal_backend_url = signal_backend_url + "/host"
+                self.signal_backend_url = f"{signal_backend_url}/{self.host_id}"
+
+            self.rtmp_url = config.get("rtmp_url", self.rtmp_url)
</code_context>

<issue_to_address>
**issue (bug_risk):** Updating host_id without recomputing signal URL can desync host and WS path

In `start()`, if `config` sets a new `host_id` but omits `signal_backend_url`, `self.host_id` is updated but `self.signal_backend_url` still points to the old host. The controller can then identify as one host while connecting to another host’s WebSocket. Consider recomputing `self.signal_backend_url` whenever `host_id` changes (e.g., by storing the base URL separately and rebuilding the full URL) so they stay in sync.
</issue_to_address>

### Comment 2
<location> `unilabos/registry/devices/cameraSII.yaml:6-15` </location>
<code_context>
+  - cameraSII
+  class:
+    action_value_mappings:
+      auto-start:
+        feedback: {}
+        goal: {}
+        goal_default:
+          config: null
+        handles: {}
+        result: {}
+        schema:
+          description: ''
+          properties:
+            feedback: {}
+            goal:
+              properties:
+                config:
+                  type: string
+              required: []
</code_context>

<issue_to_address>
**issue (bug_risk):** Schema defines config as string but driver expects a dict-like config

`CameraController.start()` calls `config.get(...)`, so it needs a mapping, but the schema declares `goal.properties.config.type: string`. If a string is passed (per schema), this will fail at runtime. Either update the schema to make `config` an `object` with the expected fields (`host_id`, `rtmp_url`, etc.), or change the driver to accept a string and parse it (e.g., JSON) before accessing its fields.
</issue_to_address>

### Comment 3
<location> `unilabos/devices/cameraSII/cameraDriver.py:395-400` </location>
<code_context>
            self._ffmpeg_process = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
                shell=False, # 安全起见不使用 shell
            )
</code_context>

<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>

### Comment 4
<location> `unilabos/devices/cameraSII/cameraDriver.py:41` </location>
<code_context>
    def __init__(
        self,
        host_id: str = "demo-host",

        # (1)信令后端(WebSocket)
        signal_backend_url: str = "wss://sciol.ac.cn/api/realtime/signal/host",

        # (2)媒体后端(RTMP + WebRTC API)
        rtmp_url: str = "rtmp://srs.sciol.ac.cn:4499/live/camera-01",
        webrtc_api: str = "https://srs.sciol.ac.cn/rtc/v1/play/",
        webrtc_stream_url: str = "webrtc://srs.sciol.ac.cn:4500/live/camera-01",
    ):
        self.host_id = host_id

        # 拼接最终的 WebSocket URL:.../host/<host_id>
        signal_backend_url = signal_backend_url.rstrip("/")
        if not signal_backend_url.endswith("/host"):
            signal_backend_url = signal_backend_url + "/host"
        self.signal_backend_url = f"{signal_backend_url}/{host_id}"

        # 媒体服务器配置(与 HostSimulator 保持一致)
        self.rtmp_url = rtmp_url
        self.webrtc_api = webrtc_api
        self.webrtc_stream_url = webrtc_stream_url

        # 运行时状态
        self._ws: Optional[object] = None
        self._ffmpeg_process: Optional[subprocess.Popen] = None
        self._running = False
        self._loop_task: Optional[asyncio.Future] = None

        # 事件循环 & 线程
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._loop_thread: Optional[threading.Thread] = None

        # 这里不传 config,使用构造参数作为默认配置
        try:
            self.start()
        except Exception as e:
            # 日志调整: 构造阶段只打印错误
            print(f"[CameraController] __init__ auto start failed: {e}", file=sys.stderr)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Replace assignment with augmented assignment ([`aug-assign`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/aug-assign/))

```suggestion
            signal_backend_url += "/host"
```
</issue_to_address>

### Comment 5
<location> `unilabos/devices/cameraSII/cameraDriver.py:88` </location>
<code_context>
    def start(self, config: Optional[Dict[str, Any]] = None):
        """
        启动 Camera 连接 & 消息循环,并在启动时就开启 FFmpeg 推流,
        """

        if self._running:
            return {"status": "already_running", "host_id": self.host_id}

        # 应用 config 覆盖(如果有)
        if config:
            cfg_host_id = config.get("host_id")
            if cfg_host_id:
                self.host_id = cfg_host_id

            signal_backend_url = config.get("signal_backend_url")
            if signal_backend_url:
                signal_backend_url = signal_backend_url.rstrip("/")
                if not signal_backend_url.endswith("/host"):
                    signal_backend_url = signal_backend_url + "/host"
                self.signal_backend_url = f"{signal_backend_url}/{self.host_id}"

            self.rtmp_url = config.get("rtmp_url", self.rtmp_url)
            self.webrtc_api = config.get("webrtc_api", self.webrtc_api)
            self.webrtc_stream_url = config.get(
                "webrtc_stream_url", self.webrtc_stream_url
            )

        self._running = True

        # === start 时启动 FFmpeg 推流 ===
        # 日志调整: 不再输出正常启动信息,保留异常打印
        self._start_ffmpeg()
        # =================================================

        # 创建新的事件循环和线程(用于 WebSocket 信令)
        self._loop = asyncio.new_event_loop()

        def loop_runner(loop: asyncio.AbstractEventLoop):
            asyncio.set_event_loop(loop)
            try:
                loop.run_forever()
            except Exception as e:
                # 日志调整: 只打印循环异常
                print(f"[CameraController] event loop error: {e}", file=sys.stderr)

        self._loop_thread = threading.Thread(
            target=loop_runner, args=(self._loop,), daemon=True
        )
        self._loop_thread.start()

        # 在这个新 loop 上调度主协程,并保留 Future,后续在 stop() 中取消和收割结果
        self._loop_task = asyncio.run_coroutine_threadsafe(
            self._run_main_loop(), self._loop
        )

        return {
            "status": "started",
            "host_id": self.host_id,
            "signal_backend_url": self.signal_backend_url,
            "rtmp_url": self.rtmp_url,
            "webrtc_api": self.webrtc_api,
            "webrtc_stream_url": self.webrtc_stream_url,
        }

</code_context>

<issue_to_address>
**suggestion (code-quality):** Use f-string instead of string concatenation ([`use-fstring-for-concatenation`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-fstring-for-concatenation/))

```suggestion
                    signal_backend_url = f"{signal_backend_url}/host"
```
</issue_to_address>

### Comment 6
<location> `unilabos/devices/cameraSII/cameraDriver.py:207-210` </location>
<code_context>
    def get_status(self) -> Dict[str, Any]:
        """
        查询当前状态,方便在 Uni-Lab-OS 中做监控。
        """
        ws_closed = None
        if self._ws is not None:
            ws_closed = getattr(self._ws, "closed", None)

        if ws_closed is None:
            websocket_connected = self._ws is not None
        else:
            websocket_connected = (self._ws is not None) and (not ws_closed)

        return {
            "host_id": self.host_id,
            "running": self._running,
            "websocket_connected": websocket_connected,
            "ffmpeg_running": bool(
                self._ffmpeg_process and self._ffmpeg_process.poll() is None
            ),
            "signal_backend_url": self.signal_backend_url,
            "rtmp_url": self.rtmp_url,
        }

</code_context>

<issue_to_address>
**suggestion (code-quality):** We've found these issues:

- Move setting of default value for variable into `else` branch ([`introduce-default-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/introduce-default-else/))
- Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))

```suggestion
        ws_closed = getattr(self._ws, "closed", None) if self._ws is not None else None
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant