Skip to content

Conversation

@noiosoooo9999
Copy link

@noiosoooo9999 noiosoooo9999 commented Oct 21, 2025

全球数智教育创新大赛决赛SynthonX团队代码

@sourcery-ai
Copy link

sourcery-ai bot commented Oct 21, 2025

Reviewer's Guide

This PR implements a unified control stack for a SynthonX liquid‐handling platform, introducing a thread‐safe RS485 bus and Modbus RTU helper, high‐level XYZ motion and pipette abstractions, a CLI and a full tkinter GUI frontend, a YAML registry for UniLab workflows along with advanced flow routines, and a USB relay controller for stirring. It ties together low‐level communication, coordinate transforms, safe homing/motion, JSON‐backed point management, and process sequences into a cohesive package.

Sequence diagram for safe movement and pipetting (SynthonX platform)

sequenceDiagram
    actor User
    participant GUI
    participant Station
    participant SharedXYZController
    participant SOPAPipetteYYQ
    User->>GUI: Request move and pipette
    GUI->>Station: move_to_work_safe(x, y, z)
    Station->>SharedXYZController: move_to_work_safe(x, y, z)
    SharedXYZController->>SharedRS485Bus: send movement commands
    SharedXYZController-->>Station: movement complete
    Station->>SOPAPipetteYYQ: aspirate(volume)
    SOPAPipetteYYQ->>SharedRS485Bus: send pipette command
    SOPAPipetteYYQ-->>Station: aspirate complete
    Station-->>GUI: operation result
Loading

Class diagram for SynthonX unified control stack

classDiagram
    class SharedRS485Bus {
        +port: str
        +baudrate: int
        +timeout: float
        +serial
        +lock
        +open()
        +close()
        +reset_input()
        +write(data: bytes)
        +read(n: int)
        +read_exact(n: int, overall_timeout: float)
    }
    class XYZModbus {
        +bus: SharedRS485Bus
        +ignore_crc_error: bool
        +set_ignore_crc(flag: bool)
        +read_regs(slave: int, addr: int, count: int)
        +write_reg(slave: int, addr: int, val: int)
        +write_regs(slave: int, start: int, values: List[int])
    }
    class SharedXYZController {
        +bus: SharedRS485Bus
        +mb: XYZModbus
        +cfg: MachineConfig
        +origin: CoordinateOrigin
        +enable(axis: MotorAxis, on: bool)
        +emergency_stop(axis: MotorAxis)
        +get_motor_status(axis: MotorAxis)
        +move_to_steps(axis: MotorAxis, steps: int, speed_rpm: int, accel: int, precision: int)
        +wait_for_completion(axis: MotorAxis, timeout: float)
        +home_axis(axis: MotorAxis, direction: int)
        +home_all()
        +set_work_origin_here()
        +move_to_work_safe(x, y, z, speed, accel)
        +move_rel_z_mm(dz: float, speed, accel)
    }
    class SOPAPipetteYYQ {
        +bus: SharedRS485Bus
        +config: SOPAConfig
        +is_initialized: bool
        +initialize()
        +eject_tip()
        +aspirate(volume_uL: float)
        +dispense(volume_uL: float)
    }
    class LiquidStation {
        +bus: SharedRS485Bus
        +xyz: SharedXYZController
        +pip: SOPAPipetteYYQ
        +connect()
        +disconnect()
        +home_all()
        +set_work_origin_here()
        +move_to(x, y, z, speed, accel)
        +move_rel_z(dz_mm: float)
        +pipette_init()
        +eject_tip()
        +aspirate(vol_ul: float)
        +dispense(vol_ul: float)
        +estop_all()
    }
    class Station {
        +bus: SharedRS485Bus
        +xyz: SharedXYZController
        +pip: SOPAPipetteYYQ
        +connect()
        +disconnect()
        +set_work_origin_here()
        +home_safe()
        +emergency_stop()
        +get_status_mm()
        +move_to_work_safe(x, y, z, speed, acc)
        +move_to_work_direct(x, y, z, speed, acc, z_order)
        +move_relative_direct(dx, dy, dz, speed, acc)
        +pip_init()
        +pip_eject()
        +pip_asp(ul: float)
        +pip_dsp(ul: float)
    }
    class RelayController {
        +port: str
        +baudrate: int
        +timeout: float
        +ser
        +connect()
        +on(wait_response: bool)
        +off(wait_response: bool)
        +toggle(wait_response: bool)
        +state()
        +close()
        +ensure_off_on_exit()
    }
    class SynthonXFlowV2 {
        +cfg: FlowConfig
        +station: Station
        +reactor: RelayController
        +system_init()
        +pick_tip(tip_point: str, down_mm: float)
        +drop_tip(tip_point: str, down_mm: float)
        +transfer_A_to_B(...)
        +transfer_B_to_D(...)
        +filtering(...)
        +pushing(...)
        +load_for_nmr(...)
        +stir_on(...)
        +stir_off(...)
        +stir_for(...)
    }
    SharedRS485Bus <|-- XYZModbus
    SharedRS485Bus <|-- SOPAPipetteYYQ
    SharedRS485Bus <|-- SharedXYZController
    SharedXYZController <|-- LiquidStation
    SharedXYZController <|-- Station
    SOPAPipetteYYQ <|-- LiquidStation
    SOPAPipetteYYQ <|-- Station
    Station <|-- SynthonXFlowV2
    RelayController <|-- SynthonXFlowV2
Loading

File-Level Changes

Change Details Files
Implement unified RS485 bus and Modbus RTU helper
  • Add SharedRS485Bus with thread‐safe open/close/read/write
  • Implement CRC16, retry and error handling in XYZModbus._xfer
  • Provide read_regs, write_reg and write_regs methods with fault tolerance
unilabos/devices/SynthonX/SynthonX.py
Provide high‐level XYZ and pipette abstractions
  • Add SharedXYZController for mm↔steps, homing, safe moves, status monitoring
  • Implement MotorPosition dataclass and MotorAxis/Status enums
  • Introduce SOPAPipetteYYQ driver with initialize/eject/aspirate/dispense methods
unilabos/devices/SynthonX/SynthonX.py
Add CLI LiquidStation with JSON point management
  • Combine SharedRS485Bus, SharedXYZController and SOPAPipetteYYQ into LiquidStation
  • Implement connect/disconnect, load/save points and menu‐driven operations
  • Expose homing, safe moves, relative Z, pipette ops through a text UI
unilabos/devices/SynthonX/SynthonX.py
Introduce tkinter GUI frontend (SynthonX_gui)
  • Wrap backend in Station class for connection, motion and pipette
  • Build multi‐tab UI: XYZ control, pipette, flow demo, map with point selection
  • Handle JSON points file, canvas drawing, and point‐based safe/relative moves
unilabos/devices/SynthonX/SynthonX_gui.py
unilabos/devices/SynthonX/points_gui.json
Define UniLab registry and advanced workflows
  • Add synthonx.yaml describing UniLabJsonCommand schemas
  • Implement SynthonXFlowV2 with system_init, pick/drop tip, transfer, filtering, pushing, load_for_nmr
  • Use Station and FlowConfig to orchestrate high‐level processes
unilabos/registry/devices/synthonx.yaml
unilabos/devices/SynthonX/SynthonX_flow_v3.py
Add USB relay controller for stirrer
  • Implement RelayController with on/off/toggle and serial response
  • Define ensure_off_on_exit for safe shutdown
  • Expose relay commands in flow class as stir_on/off/toggle/for
unilabos/devices/SynthonX/SynthonX_reactor.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes - here's some feedback:

  • SynthonX.py is massive and mixes RS485 bus, Modbus, XYZ controller, pipette, and CLI logic—consider splitting it into smaller modules (e.g. bus, modbus, controller, pipette) to improve readability and maintainability.
  • I noticed SharedXYZController.move_to_work_safe is defined twice with different logic—remove the redundant definition and consolidate CRC‐ignore handling into a single implementation to avoid confusion.
  • The GUI duplicates a lot of the CLI/station interaction code; extract common station workflows into reusable APIs so both CLI and GUI can share logic without copying it.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- SynthonX.py is massive and mixes RS485 bus, Modbus, XYZ controller, pipette, and CLI logic—consider splitting it into smaller modules (e.g. bus, modbus, controller, pipette) to improve readability and maintainability.
- I noticed SharedXYZController.move_to_work_safe is defined twice with different logic—remove the redundant definition and consolidate CRC‐ignore handling into a single implementation to avoid confusion.
- The GUI duplicates a lot of the CLI/station interaction code; extract common station workflows into reusable APIs so both CLI and GUI can share logic without copying it.

## Individual Comments

### Comment 1
<location> `unilabos/devices/SynthonX/SynthonX.py:515` </location>
<code_context>
+        if z is not None and (z < 0 or z > self.cfg.max_travel_z):
+            raise CoordinateSystemError(f"Z out of range: {z}")
+
+    def move_to_work_safe(self, x=None, y=None, z=None, speed=None, accel=None) -> bool:
+        self.check_limits(x, y, z)
+        speed = speed or self.cfg.default_speed
</code_context>

<issue_to_address>
**issue:** Duplicate definition of move_to_work_safe detected.

Remove the first move_to_work_safe definition to prevent confusion and ensure consistent handling of CRC errors.
</issue_to_address>

### Comment 2
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:301-303` </location>
<code_context>
+        p = self._pt(tip_point)
+        self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
+        time.sleep(self.cfg.settle_s)
+        self.station.move_relative_direct(0.0, 0.0, float(down_mm))
+        time.sleep(self.cfg.settle_s)
+        self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
+        time.sleep(self.cfg.settle_s)
+        print(f'{tip_point}枪头已经装载')
</code_context>

<issue_to_address>
**suggestion (bug_risk):** No error handling for pipette pick/drop failures.

Currently, pick_tip and drop_tip always return True, even if move_relative_direct or pip_eject fail. Please add error handling or status checks to ensure failures are detected and handled appropriately.

Suggested implementation:

```python
    def pick_tip(self, tip_point: str, down_mm: float = 120) -> bool:
        p = self._pt(tip_point)
        ok1 = self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
        time.sleep(self.cfg.settle_s)
        ok2 = self.station.move_relative_direct(0.0, 0.0, float(down_mm))
        time.sleep(self.cfg.settle_s)
        ok3 = self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
        time.sleep(self.cfg.settle_s)
        if not (ok1 and ok2 and ok3):
            print(f'ERROR: {tip_point}枪头装载失败')
            return False
        print(f'{tip_point}枪头已经装载')
        return True

```

```python
    def drop_tip(self, tip_point: str, down_mm: float = 60) -> bool:
        p = self._pt(tip_point)
        ok1 = self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
        time.sleep(self.cfg.settle_s)
        ok2 = self.station.move_relative_direct(0.0, 0.0, float(down_mm))
        time.sleep(self.cfg.settle_s)
        ok3 = self.station.pip_eject()
        time.sleep(self.cfg.settle_s)
        if not (ok1 and ok2 and ok3):
            print(f'ERROR: {tip_point}枪头卸载失败')
            return False
        print(f'{tip_point}枪头已经卸载')
        return True

```

If `move_to_work_safe`, `move_relative_direct`, or `pip_eject` do not currently return a status (True/False), you will need to update those methods to do so, or use another way to detect failure (e.g., exception handling).
</issue_to_address>

### Comment 3
<location> `unilabos/devices/SynthonX/SynthonX.py:75-83` </location>
<code_context>
        while len(buf) < n:
            if time.time() > deadline:
                break
            need = n - len(buf)
            chunk = self.read(need)
            if chunk:
                buf += chunk
            else:
                time.sleep(0.001)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Move a guard clause in a while statement's body into its test ([`while-guard-to-condition`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/while-guard-to-condition))

```suggestion
        while len(buf) < n and not time.time() > deadline:
            need = n - len(buf)
            chunk = self.read(need)
            if chunk:
                buf += chunk
            else:
                time.sleep(0.001)

```

<br/><details><summary>Explanation</summary>Removing the guard clause simplifies the code and makes clearer the intention of
the loop.
</details>
</issue_to_address>

### Comment 4
<location> `unilabos/devices/SynthonX/SynthonX.py:79-80` </location>
<code_context>
    def read_exact(self, n: int, overall_timeout: float = 0.3) -> bytes:
        """Read exactly n bytes within overall_timeout; return b'' if timeout."""
        if n <= 0:
            return b""
        buf = b""
        deadline = time.time() + overall_timeout
        while len(buf) < n:
            if time.time() > deadline:
                break
            need = n - len(buf)
            chunk = self.read(need)
            if chunk:
                buf += chunk
            else:
                time.sleep(0.001)
        return buf

</code_context>

<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))

```suggestion
            if chunk := self.read(need):
```
</issue_to_address>

### Comment 5
<location> `unilabos/devices/SynthonX/SynthonX.py:144` </location>
<code_context>
    def set_ignore_crc(self, flag: bool):
        self.ignore_crc_error = bool(flag)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))

```suggestion
        self.ignore_crc_error = flag
```
</issue_to_address>

### Comment 6
<location> `unilabos/devices/SynthonX/SynthonX.py:159` </location>
<code_context>
    def _xfer(self, slave: int, payload: bytes, retries: int = 3) -> bytes:
        req = bytes([slave]) + payload
        frame = req + self._crc16(req)
        fn_req = payload[0]

    # 不做统计,只在最终失败时可选择忽略返回
        for attempt in range(1, retries + 1):
            with self.bus.lock:
                if not self.bus.serial or not self.bus.serial.is_open:
                    raise ModbusException("Bus not open")

                self.bus.reset_input()
                self.bus.write(frame)

            time.sleep(0.010)

            try:
                base = 0.30 + 0.15*(attempt-1)
                header = self.bus.read_exact(2, overall_timeout=base)
                if len(header) < 2:
                    raise ModbusException("No response")

                addr, fn = header[0], header[1]
                if addr != slave:
                    # 把这一帧当成串扰/回波,丢弃后继续本次尝试
                    time.sleep(0.005)
                    continue

                if (fn & 0x80) != 0:
                    rest = self.bus.read_exact(3, overall_timeout=base)
                    resp = header + rest
                    if len(rest) < 3:
                        raise ModbusException("Short exception response")
                    if resp[-2:] != self._crc16(resp[:-2]):
                        logger.warning(f"CRC mismatch (exception response) attempt {attempt}/{retries} slave={slave} fn=0x{fn_req:02X}")
                        if attempt >= retries:
                            if self.ignore_crc_error:
                                logger.error("CRC mismatch(异常帧)重试耗尽已忽略 (风险:异常码可能失真)")
                                return resp  # 返回未校验异常帧
                            raise ModbusException("CRC mismatch (exception)")
                        time.sleep(0.005)
                        continue
                    ex_code = resp[2]
                    raise ModbusException(f"Modbus exception: 0x{ex_code:02X}")

                if fn == 0x03:
                    bc_b = self.bus.read_exact(1, overall_timeout=base)
                    if len(bc_b) < 1:
                        raise ModbusException("Short response (no byte count)")
                    bc = bc_b[0]
                    data_crc = self.bus.read_exact(bc + 2, overall_timeout=base + 0.20)
                    resp = header + bc_b + data_crc
                    if len(data_crc) < bc + 2:
                        raise ModbusException("Short response (payload)")
                elif fn in (0x06, 0x10):
                    rest = self.bus.read_exact(6, overall_timeout=base + 0.20)
                    resp = header + rest
                    if len(rest) < 6:
                        raise ModbusException("Short response")
                else:
                    tail = self.bus.read_exact(254, overall_timeout=base + 0.30)
                    resp = header + tail
                    if len(resp) < 3:
                        raise ModbusException("Short response")

                if resp[-2:] != self._crc16(resp[:-2]):
                    logger.warning(f"CRC mismatch (attempt {attempt}/{retries}) slave={slave} fn=0x{fn_req:02X}")
                    if attempt >= retries:
                        if self.ignore_crc_error:
                            logger.error("CRC mismatch 重试耗尽已忽略 (风险:数据未校验)")
                            return resp  # 直接返回未校验帧
                        raise ModbusException("CRC mismatch")
                    time.sleep(0.005)
                    continue

                if resp[1] != fn_req:
                    raise ModbusException(f"Unexpected function: {resp[1]:02X} (!={fn_req:02X})")

                return resp  # 成功

            except ModbusException:
                if attempt >= retries:
                    # 已在 CRC 分支处理 ignore 情况;这里直接抛出其他类型异常
                    raise
                time.sleep(0.01)

</code_context>

<issue_to_address>
**issue (code-quality):** Low code quality found in XYZModbus.\_xfer - 7% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))

<br/><details><summary>Explanation</summary>The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

- Reduce the function length by extracting pieces of functionality out into
  their own functions. This is the most important thing you can do - ideally a
  function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
  sits together within the function rather than being scattered.</details>
</issue_to_address>

### Comment 7
<location> `unilabos/devices/SynthonX/SynthonX.py:250-253` </location>
<code_context>
    def read_regs(self, slave: int, addr: int, count: int) -> List[int]:
        fn = 0x03
        payload = bytes([fn]) + addr.to_bytes(2, "big") + count.to_bytes(2, "big")
        resp = self._xfer(slave, payload)
        byte_count = resp[2]
        vals = []
        for i in range(0, byte_count, 2):
            vals.append(int.from_bytes(resp[3 + i:5 + i], "big"))
        return vals

</code_context>

<issue_to_address>
**suggestion (code-quality):** We've found these issues:

- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))

```suggestion
        return [
            int.from_bytes(resp[3 + i : 5 + i], "big")
            for i in range(0, byte_count, 2)
        ]
```
</issue_to_address>

### Comment 8
<location> `unilabos/devices/SynthonX/SynthonX.py:392-395` </location>
<code_context>
    def move_to_steps(self, axis: MotorAxis, steps: int, speed_rpm: int = 1000,
                      accel: int = 1000, precision: int = 100) -> bool:
        a = self.addr[axis]
        if steps < 0:
            steps = (steps + 0x100000000) & 0xFFFFFFFF
        hi = (steps >> 16) & 0xFFFF
        lo = steps & 0xFFFF
        ok = self.mb.write_regs(a, XYZModbus.REG_TARGET_POSITION_HIGH, [
            hi, lo, speed_rpm, accel, precision
        ])
        return ok

</code_context>

<issue_to_address>
**suggestion (code-quality):** Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))

```suggestion
        return self.mb.write_regs(
            a,
            XYZModbus.REG_TARGET_POSITION_HIGH,
            [hi, lo, speed_rpm, accel, precision],
        )
```
</issue_to_address>

### Comment 9
<location> `unilabos/devices/SynthonX/SynthonX.py:791` </location>
<code_context>
def main():
    print("\n=== Unified XYZ + YYQ SOPA (Single-Port) ===")
    port = input("串口端口 (默认 COM3): ").strip() or "COM3"
    station = LiquidStation(port)
    station.connect()
    station.load_points()

    init_pip = input("是否初始化移液器? (y/N): ").strip().lower() in ("y", "yes")
    if init_pip:
        if station.pipette_init():
            print("移液器初始化完成。")
        else:
            print("移液器初始化失败。")

    while True:
        print("\n" + "=" * 50)
        print("1) 全轴回零(Z→X→Y)")
        print("2) 设定当前位置为工作原点")
        print("3) 安全移动到点 (X/Y/Z,mm)")
        print("4) Z 轴相对移动 (mm)")
        print("5) 保存/前往点位")
        print("6) 移液:初始化 / 弹出枪头 / 吸液 / 排液")
        print("7) 直接移动(不抬Z) X/Y/Z + 顺序(first/last/auto)")
        print("99) 紧急停止")
        print("0) 退出")
        choice = input("选择: ").strip()

        if choice == "0":
            break

        elif choice == "1":
            print("回零中…")
            print("成功" if station.home_all() else "失败")

        elif choice == "2":
            print("设定工作原点…")
            print("成功" if station.set_work_origin_here() else "失败")

        elif choice == "3":
            x = input("X(mm, 空=跳过): ").strip()
            y = input("Y(mm, 空=跳过): ").strip()
            z = input("Z(mm, 空=跳过): ").strip()
            x = float(x) if x else None
            y = float(y) if y else None
            z = float(z) if z else None
            ok = station.move_to(x, y, z)
            print("到位" if ok else "失败")

        elif choice == "4":
            dz = float(input("Z 相对位移(mm,正=下降): ").strip())
            ok = station.move_rel_z(dz)
            print("完成" if ok else "失败")

        elif choice == "5":
            sub = input("(a)保存点  (b)前往点: ").strip().lower()
            if sub == "a":
                name = input("点名: ").strip()
                x = float(input("X(mm): ").strip())
                y = float(input("Y(mm): ").strip())
                z = float(input("Z(mm): ").strip())
                station._points[name] = {"x": x, "y": y, "z": z}
                station.save_points()
                print("已保存")
            else:
                name = input("点名: ").strip()
                pt = station._points.get(name)
                if not pt:
                    print("未找到该点")
                else:
                    ok = station.move_to(pt["x"], pt["y"], pt["z"])
                    print("到位" if ok else "失败")

        elif choice == "6":
            sub = input("(a)初始化  (b)弹出枪头  (c)吸液  (d)排液: ").strip().lower()
            if sub == "a":
                print("初始化…")
                print("完成" if station.pipette_init() else "失败")
            elif sub == "b":
                print("弹出枪头…")
                station.eject_tip()
                print("完成")
            elif sub == "c":
                vol = float(input("吸液体积(µL): ").strip())
                station.aspirate(vol)
            elif sub == "d":
                vol = float(input("排液体积(µL): ").strip())
                station.dispense(vol)
            else:
                print("无效子选项")

        elif choice == "7":
            x = input("X(mm, 空=跳过): ").strip()
            y = input("Y(mm, 空=跳过): ").strip()
            z = input("Z(mm, 空=跳过): ").strip()
            z_order = input("Z顺序(first/last/auto, 默认auto): ").strip().lower() or "auto"
            x = float(x) if x else None
            y = float(y) if y else None
            z = float(z) if z else None
            ok = station.move_to_direct(x=x, y=y, z=z, z_order=z_order)
            print("到位" if ok else "失败")

        elif choice == "99":
            station.estop_all()
            print("已急停")

        else:
            print("无效选项")

    station.disconnect()
    print("Bye.")

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Swap positions of nested conditionals ([`swap-nested-ifs`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-nested-ifs/))
- Hoist nested repeated code outside conditional statements ([`hoist-similar-statement-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-similar-statement-from-if/))
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Swap if/else branches ([`swap-if-else-branches`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-if-else-branches/))
- Low code quality found in main - 4% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))

<br/><details><summary>Explanation</summary>



The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

- Reduce the function length by extracting pieces of functionality out into
  their own functions. This is the most important thing you can do - ideally a
  function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
  sits together within the function rather than being scattered.</details>
</issue_to_address>

### Comment 10
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:56-57` </location>
<code_context>
def _idx_from_name(name: str) -> int:
    """提取 'C1' / 'D49' 的编号 -> 1 / 49"""
    try:
        return int(''.join(c for c in name if c.isdigit()))
    except Exception:
        raise ValueError(f"无法解析点名编号: {name!r}")

</code_context>

<issue_to_address>
**suggestion (code-quality):** Explicitly raise from a previous error ([`raise-from-previous-error`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/raise-from-previous-error/))

```suggestion
    except Exception as e:
        raise ValueError(f"无法解析点名编号: {name!r}") from e
```
</issue_to_address>

### Comment 11
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:199` </location>
<code_context>
    def stir_for(self, seconds: float, wait_response: bool = True) -> bool:
        """阻塞式搅拌指定秒数,超简单实用。"""
        _require(seconds > 0, "seconds 必须>0")
        if self.stir_on(wait_response=wait_response):
            try:
                print(f"持续搅拌 {seconds} 秒...")
                time.sleep(float(seconds))
            finally:
                self.stir_off(wait_response=wait_response)
                print("搅拌结束")
            return True
        return False

</code_context>

<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))

```suggestion
                time.sleep(seconds)
```
</issue_to_address>

### Comment 12
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:254` </location>
<code_context>
    def _down_rel(self, dz: float):
        _require(dz >= 0, "下探距离必须>=0")
        self.station.move_relative_direct(0.0, 0.0, -float(dz))
        time.sleep(self.cfg.settle_s)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))

```suggestion
        self.station.move_relative_direct(0.0, 0.0, -dz)
```
</issue_to_address>

### Comment 13
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:259` </location>
<code_context>
    def _up_rel(self, dz: float):
        _require(dz >= 0, "上移距离必须>=0")
        self.station.move_relative_direct(0.0, 0.0, +float(dz))
        time.sleep(self.cfg.settle_s)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))

```suggestion
        self.station.move_relative_direct(0.0, 0.0, +dz)
```
</issue_to_address>

### Comment 14
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:263` </location>
<code_context>
    def _xy_rel(self, dx: float, dy: float):
        self.station.move_relative_direct(float(dx), float(dy), 0.0)
        time.sleep(self.cfg.settle_s)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool [×2] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))

```suggestion
        self.station.move_relative_direct(dx, dy, 0.0)
```
</issue_to_address>

### Comment 15
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:301-303` </location>
<code_context>
    def pick_tip(self, tip_point: str, down_mm: float = 120) -> bool:
        p = self._pt(tip_point)
        self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
        time.sleep(self.cfg.settle_s)
        self.station.move_relative_direct(0.0, 0.0, float(down_mm))
        time.sleep(self.cfg.settle_s)
        self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
        time.sleep(self.cfg.settle_s)
        print(f'{tip_point}枪头已经装载')
        return True

</code_context>

<issue_to_address>
**issue (code-quality):** Remove unnecessary casts to int, str, float or bool [×2] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
</issue_to_address>

### Comment 16
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:312-316` </location>
<code_context>
    def drop_tip(self, tip_point: str, down_mm: float = 60) -> bool:
        p = self._pt(tip_point)
        self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
        time.sleep(self.cfg.settle_s)
        self.station.move_relative_direct(0.0, 0.0, float(down_mm))
        time.sleep(self.cfg.settle_s)
        self.station.pip_eject()
        time.sleep(self.cfg.settle_s)
        self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
        time.sleep(self.cfg.settle_s)
        print(f'枪头已经弃置在{tip_point}')
        return True

</code_context>

<issue_to_address>
**issue (code-quality):** Remove unnecessary casts to int, str, float or bool [×2] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
</issue_to_address>

### Comment 17
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:349` </location>
<code_context>
    def _do_transfer(self,
                    src_name: str,
                    dst_names: Union[str, Iterable[str]],
                    tip_c_name: str,
                    total_ul: float,
                    down_src_mm: float,
                    down_dst_mm: float,
                    split_volumes: Optional[List[float]] = None,
                    stir_post_s: Optional[float] = None) -> bool:

        # 修改:统一解析目标名
        dst_list = _split_names(dst_names)
        _require(len(dst_list) >= 1, "目标点名至少1个")

        # (1) 取枪头
        self.pick_tip(tip_c_name, down_mm=120)
        time.sleep(self.cfg.settle_s)

        # (2) 到源位
        src_p = self._pt(src_name)
        self.station.move_to_work_safe(x=src_p["x"], y=src_p["y"], z=src_p["z"])
        time.sleep(self.cfg.settle_s)
        print('到达源点位')

        # (3) 下探源位
        self.station.move_relative_direct(0.0, 0.0, float(down_src_mm))
        time.sleep(self.cfg.settle_s)
        print('下探完成')

        # (4) 吸液
        self.station.pip_asp(float(total_ul))
        time.sleep(self.cfg.settle_s)
        print('吸取液体完成')

        # (5) 回升源位
        self.station.move_relative_direct(0.0, 0.0, -float(down_src_mm))
        time.sleep(self.cfg.settle_s)
        print('回升完成')

        # === 目标处理 ===
        if len(dst_list) == 1:
            # 单目标
            dst_p = self._pt(dst_list[0])
            self.station.move_to_work_safe(x=dst_p["x"], y=dst_p["y"], z=dst_p["z"])
            time.sleep(self.cfg.settle_s)
            print('移动到目标点位')

            self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm))
            time.sleep(self.cfg.settle_s)
            print('下探')

            self.station.pip_dsp(float(total_ul))
            time.sleep(self.cfg.settle_s)
            time.sleep(self.cfg.delay_after_dispense)

            self.station.move_relative_direct(0.0, 0.0, -float(down_dst_mm))
            time.sleep(self.cfg.settle_s)

        else:
            # 多目标
            if split_volumes is not None:
                _require(len(split_volumes) == len(dst_list), "split_volumes 长度需与目标数量一致")
                vols = [float(v) for v in split_volumes]
            else:
                each = float(total_ul) / float(len(dst_list))
                vols = [each] * len(dst_list)

            first_name = dst_list[0]
            first_p = self._pt(first_name)
            self.station.move_to_work_safe(x=first_p["x"], y=first_p["y"], z=first_p["z"])
            time.sleep(self.cfg.settle_s)

            self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm))
            time.sleep(self.cfg.settle_s)

            self.station.pip_dsp(vols[0])
            time.sleep(self.cfg.settle_s)

            base_x, base_y = first_p["x"], first_p["y"]
            for nm, v in zip(dst_list[1:], vols[1:]):
                self.station.move_relative_direct(0.0, 0.0, -float(down_dst_mm))
                time.sleep(self.cfg.settle_s)
                p = self._pt(nm)
                dx, dy = p["x"] - base_x, p["y"] - base_y
                self.station.move_relative_direct(float(dx), float(dy), 0.0)
                time.sleep(self.cfg.settle_s)
                self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm))
                time.sleep(self.cfg.settle_s)
                self.station.pip_dsp(v)
                time.sleep(self.cfg.settle_s)
                base_x, base_y = p["x"], p["y"]

        # —— 如果设定了加液后搅拌时间,则触发搅拌 ——
        if stir_post_s is not None and float(stir_post_s) > 0:
            try:
                print(f"[搅拌] 加液完成,搅拌 {float(stir_post_s)} s ...")
                self.stir_for(float(stir_post_s))
            except Exception as e:
                print(f"[搅拌] 触发失败:{e}(忽略,不影响主流程)")

        # 映射 C 槽到 +48 的弃置位
        def upgrade_c_name(name: str) -> str:
            m = re.fullmatch(r"C(\d+)", name.strip().upper())
            if not m:
                return name
            idx = int(m.group(1))
            return f"C{idx + 48}"
        tip_c_name_new = upgrade_c_name(tip_c_name)

        # (10) 放枪头
        self.drop_tip(tip_c_name_new, down_mm=60.0)
        return True

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Remove unnecessary casts to int, str, float or bool [×5] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
- Replace m.group(x) with m[x] for re.Match objects ([`use-getitem-for-re-match-groups`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/use-getitem-for-re-match-groups/))
</issue_to_address>

### Comment 18
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:543` </location>
<code_context>
    def pushing(self,
                tip_c_name: str) -> bool:
        z_down_mm = 136.0
        y_forward_mm = 60.0
        _require(z_down_mm > 0, "z_down_mm 必须>0")
        _require(y_forward_mm > 0, "y_forward_mm 必须>0")
        _require(_zone_from_name(tip_c_name) in ("C"), "枪头点名应在C区")

        self.pick_tip(tip_c_name)

        d25_p = self._pt("D25")
        self.station.move_to_work_safe(x=d25_p["x"], y=d25_p["y"], z=d25_p["z"])
        time.sleep(self.cfg.settle_s)
        print("到达 D25 点位")

        self.station.move_relative_direct(0.0, 0.0, float(z_down_mm))
        time.sleep(self.cfg.settle_s)
        print("下压完成")

        self.station.move_relative_direct(0.0, float(y_forward_mm), 0.0)
        time.sleep(self.cfg.settle_s)
        print("推动完成")

        self.station.move_relative_direct(0.0, 0.0, -float(z_down_mm))
        time.sleep(self.cfg.settle_s)
        print("抬升完成")

        def upgrade_c_name(name: str) -> str:
            m = re.fullmatch(r"C(\d+)", name.strip().upper())
            if not m:
                return name
            idx = int(m.group(1))
            return f"C{idx + 48}"
        tip_c_name_new = upgrade_c_name(tip_c_name)
        self.drop_tip(tip_c_name_new)
        print("推动流程完成")

        return True

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Remove unnecessary casts to int, str, float or bool [×3] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
- Replace m.group(x) with m[x] for re.Match objects ([`use-getitem-for-re-match-groups`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/use-getitem-for-re-match-groups/))
</issue_to_address>

### Comment 19
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:583-592` </location>
<code_context>
    def load_for_nmr(self,
                    src_d_name: str,     # 源:D区 (如 D1-D8)
                    dst_d_name: str,     # 目标:D区 (如 D17-D24)
                    tip_c_name: str,     # 枪头:C区
                    total_ul: float,
                    stir_post_s: Optional[float] = None) -> bool:
        down_src_mm = 138
        down_dst_mm = 9
        _require(_zone_from_name(src_d_name) == "D", "源位必须在 D 区")
        _require(_zone_from_name(dst_d_name) == "D", "目标位必须在 D 区")
        _require(src_d_name != dst_d_name, "源与目标不能相同")
        _require(_zone_from_name(tip_c_name) == "C", "枪头点名必须在 C 区")

        return self._do_transfer(
            src_name=src_d_name,
            dst_names=dst_d_name,
            tip_c_name=tip_c_name,
            total_ul=float(total_ul),
            down_src_mm=float(down_src_mm),
            down_dst_mm=float(down_dst_mm),
            split_volumes=None,
            stir_post_s=stir_post_s
        )

</code_context>

<issue_to_address>
**issue (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
</issue_to_address>

### Comment 20
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:211` </location>
<code_context>
    def move_relative_direct(self, dx: float, dy: float, dz: float, speed: Optional[int]=None, acc: Optional[int]=None) -> bool:
        """基于当前位置直接到新目标(工作坐标 Δmm),不抬Z;
        策略:若目标Z>当前Z,先XY后Z;若目标Z<=当前Z,先Z后XY。
        """
        assert self.xyz is not None
        speed = speed or self.cfg.default_speed
        acc = acc or self.cfg.default_acceleration
        # 当前绝对步(机坐标步数)
        sx = self.xyz.get_motor_status(MotorAxis.X).steps
        sy = self.xyz.get_motor_status(MotorAxis.Y).steps
        sz = self.xyz.get_motor_status(MotorAxis.Z).steps
        # Δmm→Δsteps(Δ与零点无关,可直接换算)
        tx = sx + self.xyz.mm_to_steps(MotorAxis.X, dx)
        ty = sy + self.xyz.mm_to_steps(MotorAxis.Y, dy)
        tz = sz + self.xyz.mm_to_steps(MotorAxis.Z, dz)
        # 顺序:仅按相对大小决定
        order = ("xy","z") if tz > sz else ("z","xy")
        ok = True
        try:
            if "z" in order[0]:
                ok &= self.xyz.move_to_steps(MotorAxis.Z, tz, speed, acc)
                ok &= self.xyz.wait_for_completion(MotorAxis.Z, 20.0)
                ok &= self.xyz.move_to_steps(MotorAxis.X, tx, speed, acc)
                ok &= self.xyz.move_to_steps(MotorAxis.Y, ty, speed, acc)
                ok &= self.xyz.wait_for_completion(MotorAxis.X, 20.0)
                ok &= self.xyz.wait_for_completion(MotorAxis.Y, 20.0)
            else:
                ok &= self.xyz.move_to_steps(MotorAxis.X, tx, speed, acc)
                ok &= self.xyz.move_to_steps(MotorAxis.Y, ty, speed, acc)
                ok &= self.xyz.wait_for_completion(MotorAxis.X, 20.0)
                ok &= self.xyz.wait_for_completion(MotorAxis.Y, 20.0)
                ok &= self.xyz.move_to_steps(MotorAxis.Z, tz, speed, acc)
                ok &= self.xyz.wait_for_completion(MotorAxis.Z, 20.0)
        except Exception:
            ok = False
        return bool(ok)

</code_context>

<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))

```suggestion
        return ok
```
</issue_to_address>

### Comment 21
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:441` </location>
<code_context>
    def xyz_move_relative_inputs(self):
        s = self._need_station();  
        if not s: return
        try:
            sp = int(self.speed_var.get())
            ac = int(self.acc_var.get())
            dx, dy, dz = self.rx_var.get(), self.ry_var.get(), self.rz_var.get()
            ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac)
            if ok:
                # 获取并输出当前绝对坐标(工作坐标)
                pos = s.get_status_mm()
                self.console.log(
                    f"相对位移(工作坐标):OK (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})"
                )
                # 刷新状态标签
                try: self.xyz_refresh()
                except Exception: pass
            else:
                self.console.log(f"相对位移(工作坐标):Fail (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac})")
        except Exception as e:
            messagebox.showerror("相对位移失败", str(e))
            self.console.log(f"相对位移失败:{e}")

</code_context>

<issue_to_address>
**issue (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
</issue_to_address>

### Comment 22
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:500` </location>
<code_context>
    def xyz_move_relative(self):
        s = self._need_station();  
        if not s: return
        try:
            sp = int(self.speed_var.get())
            ac = int(self.acc_var.get())
            dx, dy, dz = self.x_var.get(), self.y_var.get(), self.z_var.get()
            ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac)
            if ok:
                pos = s.get_status_mm()
                self.console.log(
                    f"相对直接移动(工作坐标):OK (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})"
                )
                try: self.xyz_refresh()
                except Exception: pass
            else:
                self.console.log(f"相对直接移动(工作坐标):Fail (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac})")
        except Exception as e:
            messagebox.showerror("相对移动失败", str(e))
            self.console.log(f"相对移动失败:{e}")

</code_context>

<issue_to_address>
**issue (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
</issue_to_address>

### Comment 23
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:618-621` </location>
<code_context>
    def _workspace_xy(self):
        try:
            if self.station and self.station.xyz:
                mc = self.station.cfg
            else:
                mc = MachineConfig()
            return float(mc.max_travel_x), float(mc.max_travel_y)
        except Exception:
            return 340.0, 250.0

</code_context>

<issue_to_address>
**suggestion (code-quality):** Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))

```suggestion
            mc = self.station.cfg if self.station and self.station.xyz else MachineConfig()
```
</issue_to_address>

### Comment 24
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:717` </location>
<code_context>
    def map_move_relative(self):
        s = self._need_station();  
        if not s: return
        name = self.map_selected.get().strip()
        if name not in self.points:
            messagebox.showwarning("未选择点", "请先在示意图上点击一个点"); return
        p = self.points[name]
        try:
            cur = s.get_status_mm()  # 工作坐标
            dx, dy, dz = p['x']-cur['x'], p['y']-cur['y'], p['z']-cur['z']
        except Exception:
            dx, dy, dz = p['x'], p['y'], p['z']
        sp, ac = int(self.speed_var.get()), int(self.acc_var.get())
        try:
            ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac)
            if ok:
                pos = s.get_status_mm()
                self.console.log(
                    f"[示意图] 相对移动至 '{name}'(工作坐标): OK (Δx={dx:.3f}, Δy={dy:.3f}, Δz={dz:.3f}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})"
                )
                try: self.draw_map()
                except Exception: pass
            else:
                self.console.log(f"[示意图] 相对移动至 '{name}'(工作坐标): Fail (Δx={dx:.3f}, Δy={dy:.3f}, Δz={dz:.3f})")
        except Exception as e:
            messagebox.showerror("相对移动失败", str(e))
            self.console.log(f"[示意图] 相对移动失败:{e}")

</code_context>

<issue_to_address>
**issue (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
</issue_to_address>

### Comment 25
<location> `unilabos/devices/SynthonX/SynthonX_reactor.py:34` </location>
<code_context>
    def connect(self):
        if self.ser and self.ser.is_open:
            return
        try:
            self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
            time.sleep(2)  # 等待设备初始化
        except Exception as e:
            raise RuntimeError(f'连接串口失败: {e}')

</code_context>

<issue_to_address>
**suggestion (code-quality):** Explicitly raise from a previous error ([`raise-from-previous-error`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/raise-from-previous-error/))

```suggestion
            raise RuntimeError(f'连接串口失败: {e}') from e
```
</issue_to_address>

### Comment 26
<location> `unilabos/devices/SynthonX/SynthonX_reactor.py:43-44` </location>
<code_context>
    def _write(self, data: bytes, expect_response: bool = True) -> bytes | None:
        if not self.ser or not self.ser.is_open:
            raise RuntimeError('串口未连接, 请先调用 connect()')
        self.ser.write(data)
        if expect_response:
            # 读取直到最后一个字节或超时
            end_byte = data[-1:]
            resp = self.ser.read_until(end_byte)
            return resp
        return None

</code_context>

<issue_to_address>
**suggestion (code-quality):** Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))

```suggestion
            return self.ser.read_until(end_byte)
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

if z is not None and (z < 0 or z > self.cfg.max_travel_z):
raise CoordinateSystemError(f"Z out of range: {z}")

def move_to_work_safe(self, x=None, y=None, z=None, speed=None, accel=None) -> bool:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: Duplicate definition of move_to_work_safe detected.

Remove the first move_to_work_safe definition to prevent confusion and ensure consistent handling of CRC errors.

Comment on lines +301 to +303
self.station.move_relative_direct(0.0, 0.0, float(down_mm))
time.sleep(self.cfg.settle_s)
self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): No error handling for pipette pick/drop failures.

Currently, pick_tip and drop_tip always return True, even if move_relative_direct or pip_eject fail. Please add error handling or status checks to ensure failures are detected and handled appropriately.

Suggested implementation:

    def pick_tip(self, tip_point: str, down_mm: float = 120) -> bool:
        p = self._pt(tip_point)
        ok1 = self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
        time.sleep(self.cfg.settle_s)
        ok2 = self.station.move_relative_direct(0.0, 0.0, float(down_mm))
        time.sleep(self.cfg.settle_s)
        ok3 = self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
        time.sleep(self.cfg.settle_s)
        if not (ok1 and ok2 and ok3):
            print(f'ERROR: {tip_point}枪头装载失败')
            return False
        print(f'{tip_point}枪头已经装载')
        return True
    def drop_tip(self, tip_point: str, down_mm: float = 60) -> bool:
        p = self._pt(tip_point)
        ok1 = self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
        time.sleep(self.cfg.settle_s)
        ok2 = self.station.move_relative_direct(0.0, 0.0, float(down_mm))
        time.sleep(self.cfg.settle_s)
        ok3 = self.station.pip_eject()
        time.sleep(self.cfg.settle_s)
        if not (ok1 and ok2 and ok3):
            print(f'ERROR: {tip_point}枪头卸载失败')
            return False
        print(f'{tip_point}枪头已经卸载')
        return True

If move_to_work_safe, move_relative_direct, or pip_eject do not currently return a status (True/False), you will need to update those methods to do so, or use another way to detect failure (e.g., exception handling).

Comment on lines +75 to +83
while len(buf) < n:
if time.time() > deadline:
break
need = n - len(buf)
chunk = self.read(need)
if chunk:
buf += chunk
else:
time.sleep(0.001)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Move a guard clause in a while statement's body into its test (while-guard-to-condition)

Suggested change
while len(buf) < n:
if time.time() > deadline:
break
need = n - len(buf)
chunk = self.read(need)
if chunk:
buf += chunk
else:
time.sleep(0.001)
while len(buf) < n and not time.time() > deadline:
need = n - len(buf)
chunk = self.read(need)
if chunk:
buf += chunk
else:
time.sleep(0.001)


ExplanationRemoving the guard clause simplifies the code and makes clearer the intention of
the loop.

self.ignore_crc_error = ignore_crc_error

def set_ignore_crc(self, flag: bool):
self.ignore_crc_error = bool(flag)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Remove unnecessary casts to int, str, float or bool (remove-unnecessary-cast)

Suggested change
self.ignore_crc_error = bool(flag)
self.ignore_crc_error = flag

crc >>= 1
return crc.to_bytes(2, "little")

def _xfer(self, slave: int, payload: bytes, retries: int = 3) -> bytes:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): Low code quality found in XYZModbus._xfer - 7% (low-code-quality)


ExplanationThe quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable.

  • Reduce the function length by extracting pieces of functionality out into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.

time.sleep(self.cfg.settle_s)

def _xy_rel(self, dx: float, dy: float):
self.station.move_relative_direct(float(dx), float(dy), 0.0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Remove unnecessary casts to int, str, float or bool [×2] (remove-unnecessary-cast)

Suggested change
self.station.move_relative_direct(float(dx), float(dy), 0.0)
self.station.move_relative_direct(dx, dy, 0.0)

ok &= self.xyz.wait_for_completion(MotorAxis.Z, 20.0)
except Exception:
ok = False
return bool(ok)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Remove unnecessary casts to int, str, float or bool (remove-unnecessary-cast)

Suggested change
return bool(ok)
return ok

Comment on lines +618 to +621
if self.station and self.station.xyz:
mc = self.station.cfg
else:
mc = MachineConfig()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Replace if statement with if expression (assign-if-exp)

Suggested change
if self.station and self.station.xyz:
mc = self.station.cfg
else:
mc = MachineConfig()
mc = self.station.cfg if self.station and self.station.xyz else MachineConfig()

self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
time.sleep(2) # 等待设备初始化
except Exception as e:
raise RuntimeError(f'连接串口失败: {e}')
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Explicitly raise from a previous error (raise-from-previous-error)

Suggested change
raise RuntimeError(f'连接串口失败: {e}')
raise RuntimeError(f'连接串口失败: {e}') from e

Comment on lines +43 to +44
resp = self.ser.read_until(end_byte)
return resp
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)

Suggested change
resp = self.ser.read_until(end_byte)
return resp
return self.ser.read_until(end_byte)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant