Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions tests/test_light.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from __future__ import annotations

import asyncio
import contextlib
import logging
from typing import Any
from unittest.mock import AsyncMock, call, patch, sentinel
Expand Down Expand Up @@ -1976,3 +1977,81 @@ async def test_light_state_restoration(zha_gateway: Gateway) -> None:
assert entity.state["xy_color"] == (1, 2)
assert entity.state["color_mode"] == ColorMode.XY
assert entity.state["effect"] == "colorloop"


async def test_turn_on_cancellation_cleans_up_transition_flag(
zha_gateway: Gateway,
) -> None:
"""Test that task cancellation resets the transitioning flag.

When a mode:restart automation cancels the task, the light must not be
stuck ignoring attribute reports indefinitely.
"""
device = await device_light_1_mock(zha_gateway)
entity = get_entity(device, platform=Platform.LIGHT)

cluster_level = device.device.endpoints[1].level

# Make the level cluster block indefinitely so we can cancel the task while
# it is suspended at the first await inside async_turn_on.
blocked: asyncio.Future[None] = asyncio.get_running_loop().create_future()
original_request = cluster_level.request

async def blocking_request(*args, **kwargs):
await blocked
return await original_request(*args, **kwargs)

cluster_level.request = AsyncMock(side_effect=blocking_request)

# Start turn_on as a separate task, mirroring what HA does for automations.
task = asyncio.ensure_future(entity.async_turn_on(brightness=200, transition=1))

# Yield control so the task can run until it suspends on the cluster call.
await asyncio.sleep(0)
await asyncio.sleep(0)

# The transitioning flag must be set now, with no timer running yet.
assert entity.is_transitioning is True
assert entity._transition_listener is None

# Cancel the task (what mode:restart does to the running automation task).
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task

# The finally block must have cleared the flag.
assert entity.is_transitioning is False


async def test_turn_off_cancellation_cleans_up_transition_flag(
zha_gateway: Gateway,
) -> None:
"""Test that task cancellation during async_turn_off resets the transitioning flag."""
device = await device_light_1_mock(zha_gateway)
entity = get_entity(device, platform=Platform.LIGHT)

cluster_on_off = device.device.endpoints[1].on_off

# Make the on/off cluster block indefinitely so we can cancel mid-turn-off.
blocked: asyncio.Future[None] = asyncio.get_running_loop().create_future()
original_request = cluster_on_off.request

async def blocking_request(*args, **kwargs):
await blocked
return await original_request(*args, **kwargs)

cluster_on_off.request = AsyncMock(side_effect=blocking_request)

task = asyncio.ensure_future(entity.async_turn_off())

await asyncio.sleep(0)
await asyncio.sleep(0)

assert entity.is_transitioning is True
assert entity._transition_listener is None

task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task

assert entity.is_transitioning is False
Loading
Loading