diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 3a7d59657..50b32194a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -13,7 +13,7 @@ jobs: linters: name: Linting and static analysis runs-on: ubuntu-24.04 - timeout-minutes: 5 # usually 1-2, rarely 3 mins (because of installations) + timeout-minutes: 7 # usually 5 mins with coverage steps: - uses: actions/checkout@v5 - uses: actions/setup-python@v6 @@ -44,7 +44,7 @@ jobs: python-version: "3.14" name: Python ${{ matrix.python-version }}${{ matrix.install-extras && ' ' || '' }}${{ matrix.install-extras }} runs-on: ubuntu-24.04 - timeout-minutes: 5 # usually 2-3 mins + timeout-minutes: 7 # usually 5 mins with coverage steps: - uses: actions/checkout@v5 - uses: actions/setup-python@v6 @@ -84,7 +84,7 @@ jobs: python-version: [ "pypy-3.10", "pypy-3.11" ] name: Python ${{ matrix.python-version }}${{ matrix.install-extras && ' ' || '' }}${{ matrix.install-extras }} runs-on: ubuntu-24.04 - timeout-minutes: 10 + timeout-minutes: 5 steps: - uses: actions/checkout@v5 - uses: actions/setup-python@v6 diff --git a/.github/workflows/thorough.yaml b/.github/workflows/thorough.yaml index 979303612..6c6e6ed2a 100644 --- a/.github/workflows/thorough.yaml +++ b/.github/workflows/thorough.yaml @@ -17,7 +17,7 @@ jobs: linters: name: Linting and static analysis runs-on: ubuntu-24.04 - timeout-minutes: 5 # usually 1-2, rarely 3 mins (because of installations) + timeout-minutes: 7 # usually 5 mins with coverage steps: - uses: actions/checkout@v5 - uses: actions/setup-python@v6 @@ -48,7 +48,7 @@ jobs: python-version: "3.14" name: Python ${{ matrix.python-version }}${{ matrix.install-extras && ' ' || '' }}${{ matrix.install-extras }} runs-on: ubuntu-24.04 - timeout-minutes: 5 # usually 2-3 mins + timeout-minutes: 7 # usually 5 mins with coverage steps: - uses: actions/checkout@v5 - uses: actions/setup-python@v6 @@ -88,7 +88,7 @@ jobs: python-version: [ "pypy-3.10", "pypy-3.11" ] name: Python ${{ matrix.python-version }}${{ matrix.install-extras && ' ' || '' }}${{ matrix.install-extras }} runs-on: ubuntu-24.04 - timeout-minutes: 10 + timeout-minutes: 5 steps: - uses: actions/checkout@v5 - uses: actions/setup-python@v6 diff --git a/pyproject.toml b/pyproject.toml index a007d88d3..1ff9ba684 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,6 +77,7 @@ test = [ "codecov", "coverage>=7.12.0", "freezegun", + "looptime>=0.7", "lxml", "pyngrok", "pytest>=9.0.0", @@ -116,7 +117,7 @@ ignore_missing_imports = true minversion = "9.0" asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "function" -addopts = ["--strict-markers"] +addopts = ["--strict-markers", "--looptime"] [tool.isort] line_length = 100 diff --git a/tests/apis/test_api_requests.py b/tests/apis/test_api_requests.py index d6b05d0a1..a303602fd 100644 --- a/tests/apis/test_api_requests.py +++ b/tests/apis/test_api_requests.py @@ -138,23 +138,23 @@ async def test_parsing_in_streams( (delete, 'delete'), ]) async def test_direct_timeout_in_requests( - resp_mocker, aresponses, hostname, fn, method, settings, logger, timer): + resp_mocker, aresponses, hostname, fn, method, settings, logger, looptime): async def serve_slowly(): - await asyncio.sleep(1.0) + await asyncio.sleep(10) return aiohttp.web.json_response({}) mock = resp_mocker(side_effect=serve_slowly) aresponses.add(hostname, '/url', method, mock) - with timer, pytest.raises(asyncio.TimeoutError): - timeout = aiohttp.ClientTimeout(total=0.1) + with pytest.raises(asyncio.TimeoutError): + timeout = aiohttp.ClientTimeout(total=1.23) # aiohttp raises an asyncio.TimeoutError which is automatically retried. # To reduce the test duration we disable retries for this test. settings.networking.error_backoffs = None await fn('/url', timeout=timeout, settings=settings, logger=logger) - assert 0.1 < timer.seconds < 0.2 + assert looptime == 1.23 # Let the server request finish and release all resources (tasks). # TODO: Remove when fixed: https://github.com/aio-libs/aiohttp/issues/7551 @@ -168,23 +168,23 @@ async def serve_slowly(): (delete, 'delete'), ]) async def test_settings_timeout_in_requests( - resp_mocker, aresponses, hostname, fn, method, settings, logger, timer): + resp_mocker, aresponses, hostname, fn, method, settings, logger, looptime): async def serve_slowly(): - await asyncio.sleep(1.0) + await asyncio.sleep(10) return aiohttp.web.json_response({}) mock = resp_mocker(side_effect=serve_slowly) aresponses.add(hostname, '/url', method, mock) - with timer, pytest.raises(asyncio.TimeoutError): - settings.networking.request_timeout = 0.1 + with pytest.raises(asyncio.TimeoutError): + settings.networking.request_timeout = 1.23 # aiohttp raises an asyncio.TimeoutError which is automatically retried. # To reduce the test duration we disable retries for this test. settings.networking.error_backoffs = None await fn('/url', settings=settings, logger=logger) - assert 0.1 < timer.seconds < 0.2 + assert looptime == 1.23 # Let the server request finish and release all resources (tasks). # TODO: Remove when fixed: https://github.com/aio-libs/aiohttp/issues/7551 @@ -193,24 +193,24 @@ async def serve_slowly(): @pytest.mark.parametrize('method', ['get']) # the only supported method at the moment async def test_direct_timeout_in_streams( - resp_mocker, aresponses, hostname, method, settings, logger, timer): + resp_mocker, aresponses, hostname, method, settings, logger, looptime): async def serve_slowly(): - await asyncio.sleep(1.0) + await asyncio.sleep(10) return "{}" mock = resp_mocker(side_effect=serve_slowly) aresponses.add(hostname, '/url', method, mock) - with timer, pytest.raises(asyncio.TimeoutError): - timeout = aiohttp.ClientTimeout(total=0.1) + with pytest.raises(asyncio.TimeoutError): + timeout = aiohttp.ClientTimeout(total=1.23) # aiohttp raises an asyncio.TimeoutError which is automatically retried. # To reduce the test duration we disable retries for this test. settings.networking.error_backoffs = None async for _ in stream('/url', timeout=timeout, settings=settings, logger=logger): pass - assert 0.1 < timer.seconds < 0.2 + assert looptime == 1.23 # Let the server request finish and release all resources (tasks). # TODO: Remove when fixed: https://github.com/aio-libs/aiohttp/issues/7551 @@ -219,46 +219,47 @@ async def serve_slowly(): @pytest.mark.parametrize('method', ['get']) # the only supported method at the moment async def test_settings_timeout_in_streams( - resp_mocker, aresponses, hostname, method, settings, logger, timer): + resp_mocker, aresponses, hostname, method, settings, logger, looptime): async def serve_slowly(): - await asyncio.sleep(1.0) + await asyncio.sleep(10) return "{}" mock = resp_mocker(side_effect=serve_slowly) aresponses.add(hostname, '/url', method, mock) - with timer, pytest.raises(asyncio.TimeoutError): - settings.networking.request_timeout = 0.1 + with pytest.raises(asyncio.TimeoutError): + settings.networking.request_timeout = 1.23 # aiohttp raises an asyncio.TimeoutError which is automatically retried. # To reduce the test duration we disable retries for this test. settings.networking.error_backoffs = None async for _ in stream('/url', settings=settings, logger=logger): pass - assert 0.1 < timer.seconds < 0.2 + assert looptime == 1.23 # Let the server request finish and release all resources (tasks). # TODO: Remove when fixed: https://github.com/aio-libs/aiohttp/issues/7551 await asyncio.sleep(1.0) -@pytest.mark.parametrize('delay, expected', [ - pytest.param(0.0, [], id='instant-none'), - pytest.param(0.1, [{'fake': 'result1'}], id='fast-single'), - pytest.param(9.9, [{'fake': 'result1'}, {'fake': 'result2'}], id='inf-double'), +@pytest.mark.parametrize('delay, expected_times, expected_items', [ + pytest.param(0, [], [], id='instant-none'), + pytest.param(2, [1], [{'fake': 'result1'}], id='fast-single'), + pytest.param(9, [1, 4], [{'fake': 'result1'}, {'fake': 'result2'}], id='inf-double'), ]) @pytest.mark.parametrize('method', ['get']) # the only supported method at the moment async def test_stopper_in_streams( - resp_mocker, aresponses, hostname, method, delay, expected, settings, logger): + resp_mocker, aresponses, hostname, method, delay, settings, logger, looptime, + expected_items, expected_times): async def stream_slowly(request: aiohttp.web.Request) -> aiohttp.web.StreamResponse: response = aiohttp.web.StreamResponse() await response.prepare(request) try: - await asyncio.sleep(0.05) + await asyncio.sleep(1) await response.write(b'{"fake": "result1"}\n') - await asyncio.sleep(0.15) + await asyncio.sleep(3) await response.write(b'{"fake": "result2"}\n') await response.write_eof() except ConnectionError: @@ -271,9 +272,13 @@ async def stream_slowly(request: aiohttp.web.Request) -> aiohttp.web.StreamRespo asyncio.get_running_loop().call_later(delay, stopper.set_result, None) items = [] + times = [] async for item in stream('/url', stopper=stopper, settings=settings, logger=logger): items.append(item) + times.append(float(looptime)) - assert items == expected + assert items == expected_items + assert times == expected_times - await asyncio.sleep(0.2) # give the response some time to be cancelled and its tasks closed + # Give the response some time to be cancelled and its tasks closed. That is aiohttp's issue. + await asyncio.sleep(30) diff --git a/tests/conftest.py b/tests/conftest.py index 91f2361de..3afc13a2e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,6 +12,7 @@ import aiohttp.web import pytest +import pytest_asyncio import kopf from kopf._cogs.clients.auth import APIContext @@ -207,7 +208,6 @@ class K8sMocks: patch: Mock delete: Mock stream: Mock - sleep: Mock @pytest.fixture() @@ -226,7 +226,6 @@ async def itr(*_, **__): patch=mocker.patch('kopf._cogs.clients.api.patch', return_value={}), delete=mocker.patch('kopf._cogs.clients.api.delete', return_value={}), stream=mocker.patch('kopf._cogs.clients.api.stream', side_effect=itr), - sleep=mocker.patch('kopf._cogs.aiokits.aiotime.sleep', return_value=None), ) @@ -566,69 +565,6 @@ def no_certvalidator(): yield from _with_module_absent('certvalidator') -# -# Helpers for the timing checks. -# - -@pytest.fixture() -def timer(): - return Timer() - - -class Timer: - """ - A helper context manager to measure the time of the code-blocks. - Also, supports direct comparison with time-deltas and the numbers of seconds. - - Usage: - - with Timer() as timer: - do_something() - print(f"Executing for {timer.seconds}s already.") - do_something_else() - - print(f"Executed in {timer.seconds}s.") - assert timer < 5.0 - """ - - def __init__(self): - super().__init__() - self._ts = None - self._te = None - - @property - def seconds(self): - if self._ts is None: - return None - elif self._te is None: - return time.perf_counter() - self._ts - else: - return self._te - self._ts - - def __repr__(self): - status = 'new' if self._ts is None else 'running' if self._te is None else 'finished' - return f'' - - def __enter__(self): - self._ts = time.perf_counter() - self._te = None - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._te = time.perf_counter() - - async def __aenter__(self): - return self.__enter__() - - async def __aexit__(self, exc_type, exc_val, exc_tb): - return self.__exit__(exc_type, exc_val, exc_tb) - - def __int__(self): - return int(self.seconds) - - def __float__(self): - return float(self.seconds) - # # Helpers for the logging checks. # @@ -708,13 +644,8 @@ def assert_logs_fn(patterns, prohibited=[], strict=False): # # Helpers for asyncio checks. # -@pytest.fixture() -async def loop(): - yield asyncio.get_running_loop() - - -@pytest.fixture(autouse=True) -def _no_asyncio_pending_tasks(loop: asyncio.AbstractEventLoop): +@pytest_asyncio.fixture(autouse=True) +def _no_asyncio_pending_tasks(request: pytest.FixtureRequest): """ Ensure there are no unattended asyncio tasks after the test. @@ -735,7 +666,28 @@ def _no_asyncio_pending_tasks(loop: asyncio.AbstractEventLoop): # Let the pytest-asyncio's async2sync wrapper to finish all callbacks. Otherwise, it raises: # ()>> - loop.run_until_complete(asyncio.sleep(0)) + # We don't know which loops were used in the test & fixtures, so we wait on all of them. + for fixture_name, fixture_value in request.node.funcargs.items(): + if isinstance(fixture_value, asyncio.BaseEventLoop): + fixture_value.run_until_complete(asyncio.sleep(0)) + + # Safe-guards for Python 3.10 until deprecated in ≈Oct'2026 (not needed for 3.11+). + try: + from asyncio import Runner as stdlib_Runner # python >= 3.11 (absent in 3.10) + except ImportError: + pass + else: + if isinstance(fixture_value, stdlib_Runner): + fixture_value.get_loop().run_until_complete(asyncio.sleep(0)) + + # In case pytest's asyncio libraries use the backported runners in Python 3.10. + try: + from backports.asyncio.runner import Runner as backported_Runner + except ImportError: + pass + else: + if isinstance(fixture_value, backported_Runner): + fixture_value.get_loop().run_until_complete(asyncio.sleep(0)) # Detect all leftover tasks. after = _get_all_tasks() @@ -760,3 +712,9 @@ def _get_all_tasks() -> set[asyncio.Task]: else: break return {t for t in tasks if not t.done()} + + +@pytest.fixture() +def loop(): + """Sync aiohttp's server-side timeline with kopf's client-side timeline.""" + return asyncio.get_running_loop() diff --git a/tests/handling/daemons/conftest.py b/tests/handling/daemons/conftest.py index a6f8e9220..94009d544 100644 --- a/tests/handling/daemons/conftest.py +++ b/tests/handling/daemons/conftest.py @@ -1,9 +1,8 @@ import asyncio import contextlib -import time -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock -import freezegun +import looptime import pytest import kopf @@ -20,23 +19,24 @@ class DaemonDummy: def __init__(self): super().__init__() self.mock = MagicMock() - self.kwargs = {} - self.steps = { - 'called': asyncio.Event(), - 'finish': asyncio.Event(), - 'error': asyncio.Event(), - } - - async def wait_for_daemon_done(self): - stopped = self.kwargs['stopped'] + + async def wait_for_daemon_done(self) -> None: + stopped = self.mock.call_args[1]['stopped'] await stopped.wait() - while not stopped.reason & stopped.reason.DONE: + while stopped.reason is None or not stopped.reason & stopped.reason.DONE: await asyncio.sleep(0) # give control back to asyncio event loop @pytest.fixture() -def dummy(): - return DaemonDummy() +async def dummy(simulate_cycle): + dummy = DaemonDummy() + yield dummy + + # Cancel the background tasks, if any. + event_object = {'metadata': {'deletionTimestamp': '...'}} + with looptime.enabled(strict=True): + await simulate_cycle(event_object) + await dummy.wait_for_daemon_done() @pytest.fixture() @@ -52,7 +52,11 @@ def _merge_dicts(src, dst): else: dst[key] = val - async def _simulate_cycle(event_object: RawBody): + async def _simulate_cycle( + event_object: RawBody, + *, + stream_pressure: asyncio.Event | None = None, + ) -> None: mocker.resetall() await process_resource_event( @@ -65,6 +69,7 @@ async def _simulate_cycle(event_object: RawBody): indexers=OperatorIndexers(), raw_event={'type': 'irrelevant', 'object': event_object}, event_queue=asyncio.Queue(), + stream_pressure=stream_pressure, ) # Do the same as k8s does: merge the patches into the object. @@ -96,33 +101,3 @@ async def background_daemon_killer(settings, memories, operator_paused): with contextlib.suppress(asyncio.CancelledError): task.cancel() await task - - -@pytest.fixture() -async def frozen_time(): - """ - A helper to simulate time movements to step over long sleeps/timeouts. - """ - with freezegun.freeze_time("2020-01-01 00:00:00") as frozen: - # Use freezegun-supported time instead of system clocks -- for testing purposes only. - # NB: Patch strictly after the time is frozen -- to use fake_time(), not real time(). - # NB: StdLib's event loops use time.monotonic(), but uvloop uses its own C-level time, - # so patch the loop object directly instead of its implied underlying implementation. - with patch.object(asyncio.get_running_loop(), 'time', time.time): - yield frozen - - -# The time-driven tests mock the sleeps, and shift the time as much as it was requested to sleep. -# This makes the sleep realistic for the app code, though executed instantly for the tests. -@pytest.fixture() -def manual_time(k8s_mocked, frozen_time): - async def sleep_substitute(delay, *_, **__): - if delay is None: - pass - elif isinstance(delay, float): - frozen_time.tick(delay) - else: - frozen_time.tick(min(delay)) - - k8s_mocked.sleep.side_effect = sleep_substitute - yield frozen_time diff --git a/tests/handling/daemons/test_daemon_errors.py b/tests/handling/daemons/test_daemon_errors.py index 1bdd7bf10..12ffb7167 100644 --- a/tests/handling/daemons/test_daemon_errors.py +++ b/tests/handling/daemons/test_daemon_errors.py @@ -1,3 +1,4 @@ +import asyncio import logging import kopf @@ -5,26 +6,21 @@ async def test_daemon_stopped_on_permanent_error( - settings, resource, dummy, manual_time, caplog, assert_logs, k8s_mocked, simulate_cycle): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) - @kopf.daemon(*resource, id='fn', backoff=0.01) + @kopf.daemon(*resource, id='fn', backoff=1.23) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) raise PermanentError("boo!") finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) + await asyncio.sleep(123) # give it enough opportunities to misbehave (e.g. restart) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() - + assert looptime == 123 assert dummy.mock.call_count == 1 - assert k8s_mocked.patch.call_count == 0 - assert k8s_mocked.sleep.call_count == 0 assert_logs([ "Daemon 'fn' failed permanently: boo!", @@ -35,25 +31,21 @@ async def fn(**kwargs): async def test_daemon_stopped_on_arbitrary_errors_with_mode_permanent( - settings, resource, dummy, manual_time, caplog, assert_logs, k8s_mocked, simulate_cycle): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) - @kopf.daemon(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=0.01) + @kopf.daemon(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=1.23) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) raise Exception("boo!") finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) + await asyncio.sleep(123) # give it enough opportunities to misbehave (e.g. restart) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() - + assert looptime == 123 assert dummy.mock.call_count == 1 - assert k8s_mocked.sleep.call_count == 0 assert_logs([ "Daemon 'fn' failed with an exception and will stop now: boo!", @@ -64,31 +56,25 @@ async def fn(**kwargs): async def test_daemon_retried_on_temporary_error( - registry, settings, resource, dummy, manual_time, - caplog, assert_logs, k8s_mocked, simulate_cycle): + registry, settings, resource, dummy, + caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + finished = asyncio.Event() - @kopf.daemon(*resource, id='fn', backoff=1.0) + @kopf.daemon(*resource, id='fn', backoff=1.23) async def fn(retry, **kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) if not retry: - raise TemporaryError("boo!", delay=1.0) + raise TemporaryError("boo!", delay=3.45) else: - dummy.steps['finish'].set() + finished.set() finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) + await finished.wait() - await dummy.steps['called'].wait() - await dummy.steps['finish'].wait() - await dummy.wait_for_daemon_done() - - assert k8s_mocked.sleep.call_count == 1 # one for each retry - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] - + assert looptime == 3.45 assert_logs([ "Daemon 'fn' failed temporarily: boo!", "Daemon 'fn' succeeded.", @@ -97,70 +83,66 @@ async def fn(retry, **kwargs): async def test_daemon_retried_on_arbitrary_error_with_mode_temporary( - settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + finished = asyncio.Event() - @kopf.daemon(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.0) + @kopf.daemon(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.23) async def fn(retry, **kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) if not retry: raise Exception("boo!") else: - dummy.steps['finish'].set() + finished.set() finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) + await finished.wait() - await dummy.steps['called'].wait() - await dummy.steps['finish'].wait() - await dummy.wait_for_daemon_done() - - assert k8s_mocked.sleep.call_count == 1 # one for each retry - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] - + assert looptime == 1.23 assert_logs([ - "Daemon 'fn' failed with an exception and will try again in 1.0 seconds: boo!", + "Daemon 'fn' failed with an exception and will try again in 1.23 seconds: boo!", "Daemon 'fn' succeeded.", "Daemon 'fn' has exited on its own", ]) async def test_daemon_retried_until_retries_limit( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + trigger = asyncio.Condition() @kopf.daemon(*resource, id='fn', retries=3) async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() - raise TemporaryError("boo!", delay=1.0) + dummy.mock(**kwargs) + async with trigger: + trigger.notify_all() + raise TemporaryError("boo!", delay=1.23) await simulate_cycle({}) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() + async with trigger: + await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages)) - assert k8s_mocked.sleep.call_count == 2 # one between each retry (3 attempts - 2 sleeps) - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] - assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0 # [call#][args/kwargs][arg#] + assert looptime == 2.46 + assert dummy.mock.call_count == 3 async def test_daemon_retried_until_timeout( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + trigger = asyncio.Condition() - @kopf.daemon(*resource, id='fn', timeout=3.0) + @kopf.daemon(*resource, id='fn', timeout=4) async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() - raise TemporaryError("boo!", delay=1.0) + dummy.mock(**kwargs) + async with trigger: + trigger.notify_all() + raise TemporaryError("boo!", delay=1.23) await simulate_cycle({}) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() + async with trigger: + await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages)) - assert k8s_mocked.sleep.call_count == 2 # one between each retry (3 attempts - 2 sleeps) - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] - assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0 # [call#][args/kwargs][arg#] + assert looptime == 3.69 + assert dummy.mock.call_count == 4 diff --git a/tests/handling/daemons/test_daemon_filtration.py b/tests/handling/daemons/test_daemon_filtration.py index b7a48c73b..ea5bdf719 100644 --- a/tests/handling/daemons/test_daemon_filtration.py +++ b/tests/handling/daemons/test_daemon_filtration.py @@ -1,3 +1,4 @@ +import asyncio import logging import pytest @@ -11,22 +12,23 @@ async def test_daemon_filtration_satisfied( settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): caplog.set_level(logging.DEBUG) + executed = asyncio.Event() @kopf.daemon(*resource, id='fn', labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT}, annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT}) async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + executed.set() finalizer = settings.persistence.finalizer event_body = {'metadata': {'labels': {'a': 'value', 'b': '...'}, 'annotations': {'x': 'value', 'y': '...'}, 'finalizers': [finalizer]}} await simulate_cycle(event_body) + await executed.wait() - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() + assert dummy.mock.call_count == 1 @pytest.mark.parametrize('labels, annotations', [ @@ -56,6 +58,7 @@ async def fn(**kwargs): 'annotations': annotations, 'finalizers': [finalizer]}} await simulate_cycle(event_body) + await asyncio.sleep(123) # give it enough time to do something when nothing is expected assert spawn_daemons.called assert spawn_daemons.call_args_list[0][1]['handlers'] == [] diff --git a/tests/handling/daemons/test_daemon_rematching.py b/tests/handling/daemons/test_daemon_rematching.py index 3190c032d..36bf39a96 100644 --- a/tests/handling/daemons/test_daemon_rematching.py +++ b/tests/handling/daemons/test_daemon_rematching.py @@ -1,3 +1,4 @@ +import asyncio import logging import kopf @@ -5,30 +6,29 @@ async def test_running_daemon_is_stopped_when_mismatches( - resource, dummy, timer, mocker, caplog, assert_logs, k8s_mocked, simulate_cycle): + resource, dummy, looptime, mocker, caplog, assert_logs, k8s_mocked, simulate_cycle): caplog.set_level(logging.DEBUG) + executed = asyncio.Event() @kopf.daemon(*resource, id='fn', when=lambda **_: is_matching) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + executed.set() await kwargs['stopped'].wait() # Ensure it is spawned while it is matching. (The same as the spawning tests.) mocker.resetall() is_matching = True await simulate_cycle({}) - await dummy.steps['called'].wait() + await executed.wait() assert dummy.mock.call_count == 1 # Ensure it is stopped once it stops matching. (The same as the termination tests.) mocker.resetall() is_matching = False await simulate_cycle({}) - with timer: - await dummy.wait_for_daemon_done() + await dummy.wait_for_daemon_done() - assert timer.seconds < 0.01 # near-instantly - stopped = dummy.kwargs['stopped'] + assert looptime == 0 + stopped = dummy.mock.call_args[1]['stopped'] assert DaemonStoppingReason.FILTERS_MISMATCH in stopped.reason diff --git a/tests/handling/daemons/test_daemon_spawning.py b/tests/handling/daemons/test_daemon_spawning.py index 46b7d9590..d341981ae 100644 --- a/tests/handling/daemons/test_daemon_spawning.py +++ b/tests/handling/daemons/test_daemon_spawning.py @@ -1,41 +1,37 @@ +import asyncio import logging import kopf async def test_daemon_is_spawned_at_least_once( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + executed = asyncio.Event() @kopf.daemon(*resource, id='fn') async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + executed.set() await simulate_cycle({}) + await executed.wait() - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() - + assert looptime == 0 assert dummy.mock.call_count == 1 # not restarted async def test_daemon_initial_delay_obeyed( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + executed = asyncio.Event() - @kopf.daemon(*resource, id='fn', initial_delay=1.0) + @kopf.daemon(*resource, id='fn', initial_delay=5.0) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + executed.set() await simulate_cycle({}) + await executed.wait() - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() - - assert k8s_mocked.sleep.call_count >= 1 - assert k8s_mocked.sleep.call_count <= 2 # one optional extra call for sleep(None) - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#] + assert looptime == 5.0 diff --git a/tests/handling/daemons/test_daemon_termination.py b/tests/handling/daemons/test_daemon_termination.py index 3ab2411f7..5da71e0c6 100644 --- a/tests/handling/daemons/test_daemon_termination.py +++ b/tests/handling/daemons/test_daemon_termination.py @@ -9,21 +9,24 @@ async def test_daemon_exits_gracefully_and_instantly_on_resource_deletion( settings, resource, dummy, simulate_cycle, - caplog, assert_logs, k8s_mocked, frozen_time, mocker, timer): + looptime, caplog, assert_logs, k8s_mocked, mocker): caplog.set_level(logging.DEBUG) + called = asyncio.Condition() # A daemon-under-test. @kopf.daemon(*resource, id='fn') async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + async with called: + called.notify_all() await kwargs['stopped'].wait() # 0th cycle: trigger spawning and wait until ready. Assume the finalizers are already added. finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) - await dummy.steps['called'].wait() + async with called: + await called.wait() # 1st stage: trigger termination due to resource deletion. mocker.resetall() @@ -32,43 +35,42 @@ async def fn(**kwargs): await simulate_cycle(event_object) # Check that the daemon has exited near-instantly, with no delays. - with timer: - await dummy.wait_for_daemon_done() + await dummy.wait_for_daemon_done() - assert timer.seconds < 0.01 # near-instantly - assert k8s_mocked.sleep.call_count == 0 + assert looptime == 0 assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == [] async def test_daemon_exits_gracefully_and_instantly_on_operator_exiting( settings, resource, dummy, simulate_cycle, background_daemon_killer, - caplog, assert_logs, k8s_mocked, frozen_time, mocker, timer): + looptime, caplog, assert_logs, k8s_mocked, mocker): caplog.set_level(logging.DEBUG) + called = asyncio.Condition() # A daemon-under-test. @kopf.daemon(*resource, id='fn') async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + async with called: + called.notify_all() await kwargs['stopped'].wait() # 0th cycle: trigger spawning and wait until ready. Assume the finalizers are already added. finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) - await dummy.steps['called'].wait() + async with called: + await called.wait() # 1st stage: trigger termination due to operator exiting. mocker.resetall() background_daemon_killer.cancel() # Check that the daemon has exited near-instantly, with no delays. - with timer: - await dummy.wait_for_daemon_done() + await dummy.wait_for_daemon_done() - assert timer.seconds < 0.01 # near-instantly - assert k8s_mocked.sleep.call_count == 0 + assert looptime == 0 assert k8s_mocked.patch.call_count == 0 # To prevent double-cancelling of the scheduler's system tasks in the fixture, let them finish: @@ -79,30 +81,32 @@ async def fn(**kwargs): @pytest.mark.usefixtures('background_daemon_killer') async def test_daemon_exits_gracefully_and_instantly_on_operator_pausing( settings, memories, resource, dummy, simulate_cycle, conflicts_found, - caplog, assert_logs, k8s_mocked, frozen_time, mocker, timer): + looptime, caplog, assert_logs, k8s_mocked, mocker): caplog.set_level(logging.DEBUG) + called = asyncio.Condition() # A daemon-under-test. @kopf.daemon(*resource, id='fn') async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + async with called: + called.notify_all() await kwargs['stopped'].wait() # 0th cycle: trigger spawning and wait until ready. Assume the finalizers are already added. finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) - await dummy.steps['called'].wait() + async with called: + await called.wait() # 1st stage: trigger termination due to the operator's pause. mocker.resetall() await conflicts_found.turn_to(True) # Check that the daemon has exited near-instantly, with no delays. - with timer: - await dummy.wait_for_daemon_done() - assert timer.seconds < 0.01 # near-instantly + await dummy.wait_for_daemon_done() + assert looptime == 0 # There is no way to test for re-spawning here: it is done by watch-events, # which are tested by the paused operators elsewhere (test_daemon_spawning.py). @@ -111,27 +115,26 @@ async def fn(**kwargs): assert not memory.daemons_memory.forever_stopped -async def test_daemon_exits_instantly_via_cancellation_with_backoff( +async def test_daemon_exits_instantly_on_cancellation_with_backoff( settings, resource, dummy, simulate_cycle, - caplog, assert_logs, k8s_mocked, frozen_time, mocker): + looptime, caplog, assert_logs, k8s_mocked, mocker): caplog.set_level(logging.DEBUG) - dummy.steps['finish'].set() + called = asyncio.Condition() # A daemon-under-test. - @kopf.daemon(*resource, id='fn', cancellation_backoff=5, cancellation_timeout=10) + @kopf.daemon(*resource, id='fn', cancellation_backoff=1.23, cancellation_timeout=10) async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() - try: - await asyncio.Event().wait() # this one is cancelled. - except asyncio.CancelledError: - await dummy.steps['finish'].wait() # simulated slow (non-instant) exiting. + dummy.mock(**kwargs) + async with called: + called.notify_all() + await asyncio.Event().wait() # this one is cancelled. # Trigger spawning and wait until ready. Assume the finalizers are already added. finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) - await dummy.steps['called'].wait() + async with called: + await called.wait() # 1st stage: trigger termination due to resource deletion. Wait for backoff. mocker.resetall() @@ -139,17 +142,15 @@ async def fn(**kwargs): event_object['metadata'] |= {'deletionTimestamp': '...'} await simulate_cycle(event_object) - assert k8s_mocked.sleep.call_count == 1 - assert k8s_mocked.sleep.call_args_list[0][0][0] == 5.0 + assert looptime == 1.23 # i.e. the slept through the whole backoff time assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['status']['kopf']['dummy'] # 2nd cycle: cancelling after the backoff is reached. Wait for cancellation timeout. mocker.resetall() - frozen_time.tick(5) # backoff time or slightly above it await simulate_cycle(event_object) - assert k8s_mocked.sleep.call_count == 0 + assert looptime == 1.23 # i.e. no additional sleeps happened assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == [] @@ -157,26 +158,31 @@ async def fn(**kwargs): await dummy.wait_for_daemon_done() -async def test_daemon_exits_slowly_via_cancellation_with_backoff( +async def test_daemon_exits_slowly_on_cancellation_with_backoff( settings, resource, dummy, simulate_cycle, - caplog, assert_logs, k8s_mocked, frozen_time, mocker): + looptime, caplog, assert_logs, k8s_mocked, mocker): caplog.set_level(logging.DEBUG) + called = asyncio.Condition() + finish = asyncio.Condition() # A daemon-under-test. - @kopf.daemon(*resource, id='fn', cancellation_backoff=5, cancellation_timeout=10) + @kopf.daemon(*resource, id='fn', cancellation_backoff=1.23, cancellation_timeout=4.56) async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + async with called: + called.notify_all() try: await asyncio.Event().wait() # this one is cancelled. except asyncio.CancelledError: - await dummy.steps['finish'].wait() # simulated slow (non-instant) exiting. + async with finish: + await finish.wait() # simulated slow (non-instant) exiting. # Trigger spawning and wait until ready. Assume the finalizers are already added. finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) - await dummy.steps['called'].wait() + async with called: + await called.wait() # 1st stage: trigger termination due to resource deletion. Wait for backoff. mocker.resetall() @@ -184,54 +190,57 @@ async def fn(**kwargs): event_object['metadata'] |= {'deletionTimestamp': '...'} await simulate_cycle(event_object) - assert k8s_mocked.sleep.call_count == 1 - assert k8s_mocked.sleep.call_args_list[0][0][0] == 5.0 + assert looptime == 1.23 assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['status']['kopf']['dummy'] # 2nd cycle: cancelling after the backoff is reached. Wait for cancellation timeout. mocker.resetall() - frozen_time.tick(5) # backoff time or slightly above it await simulate_cycle(event_object) - assert k8s_mocked.sleep.call_count == 1 - assert k8s_mocked.sleep.call_args_list[0][0][0] == 10.0 + assert looptime == 1.23 + 4.56 # i.e. it really spent all the timeout assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['status']['kopf']['dummy'] # 3rd cycle: the daemon has exited, the resource should be unblocked from actual deletion. mocker.resetall() - frozen_time.tick(1) # any time below timeout - dummy.steps['finish'].set() - await asyncio.sleep(0) + async with finish: + finish.notify_all() + await asyncio.sleep(0) # let the daemon to exit and all the routines to trigger await simulate_cycle(event_object) await dummy.wait_for_daemon_done() - assert k8s_mocked.sleep.call_count == 0 + assert looptime == 1.23 + 4.56 # i.e. not additional sleeps happened assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == [] async def test_daemon_is_abandoned_due_to_cancellation_timeout_reached( settings, resource, dummy, simulate_cycle, - caplog, assert_logs, k8s_mocked, frozen_time, mocker): + looptime, caplog, assert_logs, k8s_mocked, mocker): caplog.set_level(logging.DEBUG) + called = asyncio.Condition() + finish = asyncio.Condition() # A daemon-under-test. - @kopf.daemon(*resource, id='fn', cancellation_timeout=10) + @kopf.daemon(*resource, id='fn', cancellation_timeout=4.56) async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + async with called: + called.notify_all() try: - await dummy.steps['finish'].wait() # this one is cancelled. + async with finish: + await finish.wait() # this one is cancelled. except asyncio.CancelledError: - await dummy.steps['finish'].wait() # simulated disobedience to be cancelled. + async with finish: + await finish.wait() # simulated disobedience to be cancelled. # 0th cycle:tTrigger spawning and wait until ready. Assume the finalizers are already added. finalizer = settings.persistence.finalizer event_object = {'metadata': {'finalizers': [finalizer]}} await simulate_cycle(event_object) - await dummy.steps['called'].wait() + async with called: + await called.wait() # 1st stage: trigger termination due to resource deletion. Wait for backoff. mocker.resetall() @@ -239,22 +248,22 @@ async def fn(**kwargs): event_object['metadata'] |= {'deletionTimestamp': '...'} await simulate_cycle(event_object) - assert k8s_mocked.sleep.call_count == 1 - assert k8s_mocked.sleep.call_args_list[0][0][0] == 10.0 + assert looptime == 4.56 assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['status']['kopf']['dummy'] # 2rd cycle: the daemon has exited, the resource should be unblocked from actual deletion. mocker.resetall() - frozen_time.tick(50) + await asyncio.sleep(1000) # unnecessary, but let's fast-forward time just in case with pytest.warns(ResourceWarning, match=r"Daemon .+ did not exit in time"): await simulate_cycle(event_object) - assert k8s_mocked.sleep.call_count == 0 + assert looptime == 1000 + 4.56 assert k8s_mocked.patch.call_count == 1 assert k8s_mocked.patch.call_args_list[0][1]['payload']['metadata']['finalizers'] == [] assert_logs(["Daemon 'fn' did not exit in time. Leaving it orphaned."]) # Cleanup. - dummy.steps['finish'].set() + async with finish: + finish.notify_all() await dummy.wait_for_daemon_done() diff --git a/tests/handling/daemons/test_timer_errors.py b/tests/handling/daemons/test_timer_errors.py index 934ff4fe9..834825541 100644 --- a/tests/handling/daemons/test_timer_errors.py +++ b/tests/handling/daemons/test_timer_errors.py @@ -1,3 +1,4 @@ +import asyncio import logging import kopf @@ -5,26 +6,20 @@ async def test_timer_stopped_on_permanent_error( - settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) - @kopf.timer(*resource, id='fn', backoff=0.01, interval=1.0) + @kopf.timer(*resource, id='fn', backoff=1.23, interval=999) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() - kwargs['stopped']._setter.set() # to exit the cycle + dummy.mock(**kwargs) raise PermanentError("boo!") event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}} await simulate_cycle(event_object) + await asyncio.sleep(123) # give it enough opportunities to misbehave (e.g. retry) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() - + assert looptime == 123 # no intervals used, as there were no retries assert dummy.mock.call_count == 1 - assert k8s_mocked.sleep.call_count == 1 # one for each retry - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 assert_logs([ "Timer 'fn' failed permanently: boo!", @@ -34,26 +29,20 @@ async def fn(**kwargs): async def test_timer_stopped_on_arbitrary_errors_with_mode_permanent( - settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) - @kopf.timer(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=0.01, interval=1.0) + @kopf.timer(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=1.23, interval=999) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() - kwargs['stopped']._setter.set() # to exit the cycle + dummy.mock(**kwargs) raise Exception("boo!") event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}} await simulate_cycle(event_object) + await asyncio.sleep(123) # give it enough opportunities to misbehave (e.g. retry) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() - + assert looptime == 123 # no intervals used, as there were no retries assert dummy.mock.call_count == 1 - assert k8s_mocked.sleep.call_count == 1 # one for each retry - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 assert_logs([ "Timer 'fn' failed with an exception and will stop now: boo!", @@ -63,32 +52,23 @@ async def fn(**kwargs): async def test_timer_retried_on_temporary_error( - settings, resource, dummy, manual_time, - caplog, assert_logs, k8s_mocked, simulate_cycle): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + finished = asyncio.Event() - @kopf.timer(*resource, id='fn', backoff=1.0, interval=1.0) + @kopf.timer(*resource, id='fn', backoff=1.23, interval=2.34) async def fn(retry, **kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) if not retry: - raise TemporaryError("boo!", delay=1.0) + raise TemporaryError("boo!", delay=3.45) else: - kwargs['stopped']._setter.set() # to exit the cycle - dummy.steps['finish'].set() + finished.set() event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}} await simulate_cycle(event_object) + await finished.wait() - await dummy.steps['called'].wait() - await dummy.steps['finish'].wait() - await dummy.wait_for_daemon_done() - - assert k8s_mocked.sleep.call_count == 2 # one for each retry - assert k8s_mocked.sleep.call_args_list[0][0][0] == [1.0] # delays - assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0 # interval - + assert looptime == 3.45 assert_logs([ "Timer 'fn' failed temporarily: boo!", "Timer 'fn' succeeded.", @@ -96,78 +76,67 @@ async def fn(retry, **kwargs): async def test_timer_retried_on_arbitrary_error_with_mode_temporary( - settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + finished = asyncio.Event() - @kopf.timer(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.0, interval=1.0) + @kopf.timer(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.23, interval=2.34) async def fn(retry, **kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) if not retry: raise Exception("boo!") else: - kwargs['stopped']._setter.set() # to exit the cycle - dummy.steps['finish'].set() + finished.set() event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}} await simulate_cycle(event_object) + await finished.wait() - await dummy.steps['called'].wait() - await dummy.steps['finish'].wait() - await dummy.wait_for_daemon_done() - - assert k8s_mocked.sleep.call_count == 2 # one for each retry - assert k8s_mocked.sleep.call_args_list[0][0][0] == [1.0] # delays - assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0 # interval - + assert looptime == 1.23 assert_logs([ - "Timer 'fn' failed with an exception and will try again in 1.0 seconds: boo!", + "Timer 'fn' failed with an exception and will try again in 1.23 seconds: boo!", "Timer 'fn' succeeded.", ]) async def test_timer_retried_until_retries_limit( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + trigger = asyncio.Condition() - @kopf.timer(*resource, id='fn', retries=3, interval=1.0) + @kopf.timer(*resource, id='fn', retries=3, interval=2.34) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() - if dummy.mock.call_count >= 5: - kwargs['stopped']._setter.set() # to exit the cycle - raise TemporaryError("boo!", delay=1.0) + dummy.mock(**kwargs) + async with trigger: + trigger.notify_all() + raise TemporaryError("boo!", delay=3.45) - await simulate_cycle({}) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() + event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}} + await simulate_cycle(event_object) + async with trigger: + await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages)) - assert k8s_mocked.sleep.call_count >= 3 # one between each retry (3 attempts - 2 sleeps) - assert k8s_mocked.sleep.call_args_list[0][0][0] == [1.0] # delays - assert k8s_mocked.sleep.call_args_list[1][0][0] == [1.0] # delays - assert k8s_mocked.sleep.call_args_list[2][0][0] == 1.0 # interval + assert looptime == 6.9 # 2*3.45 -- 2 sleeps between 3 attempts + assert dummy.mock.call_count == 3 async def test_timer_retried_until_timeout( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time): + settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + trigger = asyncio.Condition() - @kopf.timer(*resource, id='fn', timeout=3.0, interval=1.0) + @kopf.timer(*resource, id='fn', timeout=10.0, interval=1.23) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() - if dummy.mock.call_count >= 5: - kwargs['stopped']._setter.set() # to exit the cycle - raise TemporaryError("boo!", delay=1.0) + dummy.mock(**kwargs) + async with trigger: + trigger.notify_all() + raise TemporaryError("boo!", delay=3.45) await simulate_cycle({}) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() + event_object = {'metadata': {'finalizers': [settings.persistence.finalizer]}} + await simulate_cycle(event_object) + async with trigger: + await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages)) - assert k8s_mocked.sleep.call_count >= 3 # one between each retry (3 attempts - 2 sleeps) - assert k8s_mocked.sleep.call_args_list[0][0][0] == [1.0] # delays - assert k8s_mocked.sleep.call_args_list[1][0][0] == [1.0] # delays - assert k8s_mocked.sleep.call_args_list[2][0][0] == 1.0 # interval + assert looptime == 6.9 # 2*3.45 -- 2 sleeps between 3 attempts + assert dummy.mock.call_count == 3 diff --git a/tests/handling/daemons/test_timer_filtration.py b/tests/handling/daemons/test_timer_filtration.py index c2979188c..84b09ca89 100644 --- a/tests/handling/daemons/test_timer_filtration.py +++ b/tests/handling/daemons/test_timer_filtration.py @@ -1,3 +1,4 @@ +import asyncio import logging import pytest @@ -11,21 +12,22 @@ async def test_timer_filtration_satisfied( settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): caplog.set_level(logging.DEBUG) + executed = asyncio.Event() @kopf.timer(*resource, id='fn', labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT}, annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT}) async def fn(**kwargs): - dummy.kwargs = kwargs - dummy.steps['called'].set() + dummy.mock(**kwargs) + executed.set() event_body = {'metadata': {'labels': {'a': 'value', 'b': '...'}, 'annotations': {'x': 'value', 'y': '...'}, 'finalizers': [settings.persistence.finalizer]}} await simulate_cycle(event_body) + await executed.wait() - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() + assert dummy.mock.call_count == 1 @pytest.mark.parametrize('labels, annotations', [ @@ -54,6 +56,7 @@ async def fn(**kwargs): 'annotations': annotations, 'finalizers': [settings.persistence.finalizer]}} await simulate_cycle(event_body) + await asyncio.sleep(123) # give it enough time to do something when nothing is expected assert spawn_daemons.called assert spawn_daemons.call_args_list[0][1]['handlers'] == [] diff --git a/tests/handling/daemons/test_timer_intervals.py b/tests/handling/daemons/test_timer_intervals.py index 637234c58..89798a97e 100644 --- a/tests/handling/daemons/test_timer_intervals.py +++ b/tests/handling/daemons/test_timer_intervals.py @@ -1,54 +1,45 @@ +import asyncio import logging import kopf -# TODO: tests for idle= (more complicated) - async def test_timer_regular_interval( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, frozen_time): + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + trigger = asyncio.Condition() @kopf.timer(*resource, id='fn', interval=1.0, sharp=False) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() - frozen_time.tick(0.3) - if dummy.mock.call_count >= 2: - dummy.steps['finish'].set() - kwargs['stopped']._setter.set() # to exit the cycle + dummy.mock(**kwargs) + async with trigger: + trigger.notify_all() + await asyncio.sleep(0.3) # simulate a slow operation await simulate_cycle({}) - await dummy.steps['called'].wait() - await dummy.wait_for_daemon_done() + async with trigger: + await trigger.wait() + await trigger.wait() assert dummy.mock.call_count == 2 - assert k8s_mocked.sleep.call_count == 2 - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 - assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0 + assert looptime == 1.3 async def test_timer_sharp_interval( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, frozen_time): + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + trigger = asyncio.Condition() @kopf.timer(*resource, id='fn', interval=1.0, sharp=True) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() - frozen_time.tick(0.3) - if dummy.mock.call_count >= 2: - dummy.steps['finish'].set() - kwargs['stopped']._setter.set() # to exit the cycle + dummy.mock(**kwargs) + async with trigger: + trigger.notify_all() + await asyncio.sleep(0.3) # simulate a slow operation await simulate_cycle({}) - await dummy.steps['called'].wait() - await dummy.steps['finish'].wait() - await dummy.wait_for_daemon_done() + async with trigger: + await trigger.wait_for(lambda: dummy.mock.call_count >= 2) assert dummy.mock.call_count == 2 - assert k8s_mocked.sleep.call_count == 2 - assert 0.7 <= k8s_mocked.sleep.call_args_list[0][0][0] < 0.71 - assert 0.7 <= k8s_mocked.sleep.call_args_list[1][0][0] < 0.71 + assert looptime == 1 # not 1.3! diff --git a/tests/handling/daemons/test_timer_triggering.py b/tests/handling/daemons/test_timer_triggering.py index ed074979c..c203c4af7 100644 --- a/tests/handling/daemons/test_timer_triggering.py +++ b/tests/handling/daemons/test_timer_triggering.py @@ -1,48 +1,44 @@ +import asyncio import logging import kopf async def test_timer_is_spawned_at_least_once( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + trigger = asyncio.Condition() @kopf.timer(*resource, id='fn', interval=1.0) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() - kwargs['stopped']._setter.set() # to exit the cycle + dummy.mock(**kwargs) + async with trigger: + trigger.notify_all() await simulate_cycle({}) - await dummy.steps['called'].wait() + async with trigger: + await trigger.wait() + await trigger.wait() - assert dummy.mock.call_count == 1 - assert dummy.kwargs['retry'] == 0 - assert k8s_mocked.sleep.call_count == 1 - assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 - - await dummy.wait_for_daemon_done() + assert looptime == 1 + assert dummy.mock.call_count == 2 async def test_timer_initial_delay_obeyed( - resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle): + resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime): caplog.set_level(logging.DEBUG) + trigger = asyncio.Condition() @kopf.timer(*resource, id='fn', initial_delay=5.0, interval=1.0) async def fn(**kwargs): - dummy.mock() - dummy.kwargs = kwargs - dummy.steps['called'].set() - kwargs['stopped']._setter.set() # to exit the cycle + dummy.mock(**kwargs) + async with trigger: + trigger.notify_all() await simulate_cycle({}) - await dummy.steps['called'].wait() - - assert dummy.mock.call_count == 1 - assert dummy.kwargs['retry'] == 0 - assert k8s_mocked.sleep.call_count == 2 - assert k8s_mocked.sleep.call_args_list[0][0][0] == 5.0 - assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0 + async with trigger: + await trigger.wait() + await trigger.wait() - await dummy.wait_for_daemon_done() + assert looptime == 6 + assert dummy.mock.call_count == 2 diff --git a/tests/handling/indexing/test_blocking_until_indexed.py b/tests/handling/indexing/test_blocking_until_indexed.py index 8d8c58a3c..f15fb5ec6 100644 --- a/tests/handling/indexing/test_blocking_until_indexed.py +++ b/tests/handling/indexing/test_blocking_until_indexed.py @@ -16,26 +16,25 @@ @pytest.mark.parametrize('event_type', EVENT_TYPES_WHEN_EXISTS) async def test_reporting_on_resource_readiness( - resource, settings, registry, indexers, caplog, event_type, handlers, timer): + resource, settings, registry, indexers, caplog, event_type, handlers, looptime): caplog.set_level(logging.DEBUG) operator_indexed = ToggleSet(all) resource_indexed = await operator_indexed.make_toggle() - with timer: - await process_resource_event( - lifecycle=all_at_once, - registry=registry, - settings=settings, - resource=resource, - indexers=indexers, - memories=ResourceMemories(), - memobase=Memo(), - raw_event={'type': event_type, 'object': {}}, - event_queue=asyncio.Queue(), - operator_indexed=operator_indexed, - resource_indexed=resource_indexed, - ) - assert timer.seconds < 0.2 # asap, nowait + await process_resource_event( + lifecycle=all_at_once, + registry=registry, + settings=settings, + resource=resource, + indexers=indexers, + memories=ResourceMemories(), + memobase=Memo(), + raw_event={'type': event_type, 'object': {}}, + event_queue=asyncio.Queue(), + operator_indexed=operator_indexed, + resource_indexed=resource_indexed, + ) + assert looptime == 0 assert operator_indexed.is_on() assert set(operator_indexed) == set() # save RAM assert handlers.event_mock.called @@ -43,13 +42,13 @@ async def test_reporting_on_resource_readiness( @pytest.mark.parametrize('event_type', EVENT_TYPES_WHEN_EXISTS) async def test_blocking_when_operator_is_not_ready( - resource, settings, registry, indexers, caplog, event_type, handlers, timer): + resource, settings, registry, indexers, caplog, event_type, handlers, looptime): caplog.set_level(logging.DEBUG) operator_indexed = ToggleSet(all) resource_listed = await operator_indexed.make_toggle() resource_indexed = await operator_indexed.make_toggle() - with pytest.raises(asyncio.TimeoutError), timer: + with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(process_resource_event( lifecycle=all_at_once, registry=registry, @@ -62,8 +61,8 @@ async def test_blocking_when_operator_is_not_ready( event_queue=asyncio.Queue(), operator_indexed=operator_indexed, resource_indexed=resource_indexed, - ), timeout=0.2) - assert 0.2 < timer.seconds < 0.4 + ), timeout=1.23) + assert looptime == 1.23 assert operator_indexed.is_off() assert set(operator_indexed) == {resource_listed} assert not handlers.event_mock.called @@ -71,7 +70,7 @@ async def test_blocking_when_operator_is_not_ready( @pytest.mark.parametrize('event_type', EVENT_TYPES_WHEN_EXISTS) async def test_unblocking_once_operator_is_ready( - resource, settings, registry, indexers, caplog, event_type, handlers, timer): + resource, settings, registry, indexers, caplog, event_type, handlers, looptime): caplog.set_level(logging.DEBUG) async def delayed_readiness(delay: float): @@ -81,22 +80,21 @@ async def delayed_readiness(delay: float): operator_indexed = ToggleSet(all) resource_listed = await operator_indexed.make_toggle() resource_indexed = await operator_indexed.make_toggle() - with timer: - asyncio.create_task(delayed_readiness(0.2)) - await process_resource_event( - lifecycle=all_at_once, - registry=registry, - settings=settings, - resource=resource, - indexers=indexers, - memories=ResourceMemories(), - memobase=Memo(), - raw_event={'type': event_type, 'object': {}}, - event_queue=asyncio.Queue(), - operator_indexed=operator_indexed, - resource_indexed=resource_indexed, - ) - assert 0.2 < timer.seconds < 0.4 + asyncio.create_task(delayed_readiness(1.23)) + await process_resource_event( + lifecycle=all_at_once, + registry=registry, + settings=settings, + resource=resource, + indexers=indexers, + memories=ResourceMemories(), + memobase=Memo(), + raw_event={'type': event_type, 'object': {}}, + event_queue=asyncio.Queue(), + operator_indexed=operator_indexed, + resource_indexed=resource_indexed, + ) + assert looptime == 1.23 assert operator_indexed.is_on() assert set(operator_indexed) == {resource_listed} assert handlers.event_mock.called diff --git a/tests/handling/subhandling/test_subhandling.py b/tests/handling/subhandling/test_subhandling.py index b83604fbe..414bbd45c 100644 --- a/tests/handling/subhandling/test_subhandling.py +++ b/tests/handling/subhandling/test_subhandling.py @@ -16,7 +16,7 @@ @pytest.mark.parametrize('event_type', EVENT_TYPES_WHEN_EXISTS) async def test_1st_level(registry, settings, resource, cause_mock, event_type, - caplog, assert_logs, k8s_mocked): + caplog, assert_logs, k8s_mocked, looptime): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.CREATE @@ -57,7 +57,7 @@ async def sub1b(**_): assert sub1a_mock.call_count == 1 assert sub1b_mock.call_count == 1 - assert k8s_mocked.sleep.call_count == 0 + assert looptime == 0 assert k8s_mocked.patch.call_count == 1 assert not event_queue.empty() @@ -76,7 +76,7 @@ async def sub1b(**_): @pytest.mark.parametrize('event_type', EVENT_TYPES_WHEN_EXISTS) async def test_2nd_level(registry, settings, resource, cause_mock, event_type, - caplog, assert_logs, k8s_mocked): + caplog, assert_logs, k8s_mocked, looptime): caplog.set_level(logging.DEBUG) cause_mock.reason = Reason.CREATE @@ -137,7 +137,7 @@ def sub1b2b(**kwargs): assert sub1b2a_mock.call_count == 1 assert sub1b2b_mock.call_count == 1 - assert k8s_mocked.sleep.call_count == 0 + assert looptime == 0 assert k8s_mocked.patch.call_count == 1 assert not event_queue.empty() diff --git a/tests/handling/test_activity_triggering.py b/tests/handling/test_activity_triggering.py index f7033610c..dfb092445 100644 --- a/tests/handling/test_activity_triggering.py +++ b/tests/handling/test_activity_triggering.py @@ -154,7 +154,7 @@ def sample_fn(**_): @pytest.mark.parametrize('activity', list(Activity)) -async def test_delays_are_simulated(settings, activity, mocker): +async def test_delays_are_simulated(settings, activity, looptime): def sample_fn(**_): raise TemporaryError('to be retried', delay=123) @@ -165,24 +165,14 @@ def sample_fn(**_): param=None, errors=None, timeout=None, retries=3, backoff=None, )) - with freezegun.freeze_time() as frozen: - - async def sleep_substitute(*_, **__): - frozen.tick(123) - - sleep = mocker.patch('kopf._cogs.aiokits.aiotime.sleep', wraps=sleep_substitute) - - with pytest.raises(ActivityError) as e: - await run_activity( - registry=registry, - settings=settings, - activity=activity, - lifecycle=all_at_once, - indices=OperatorIndexers().indices, - memo=Memo(), - ) + with pytest.raises(ActivityError) as e: + await run_activity( + registry=registry, + settings=settings, + activity=activity, + lifecycle=all_at_once, + indices=OperatorIndexers().indices, + memo=Memo(), + ) - assert sleep.call_count >= 3 # 3 retries, 1 sleep each - assert sleep.call_count <= 4 # 3 retries, 1 final success (delay=None), not more - if sleep.call_count > 3: - sleep.call_args_list[-1][0][0] is None + assert looptime == 123 * 2 diff --git a/tests/handling/test_cause_handling.py b/tests/handling/test_cause_handling.py index 5ccf31150..4e275d2e6 100644 --- a/tests/handling/test_cause_handling.py +++ b/tests/handling/test_cause_handling.py @@ -39,7 +39,6 @@ async def test_create(registry, settings, handlers, resource, cause_mock, event_ assert not handlers.update_mock.called assert not handlers.delete_mock.called - assert k8s_mocked.sleep.call_count == 0 assert k8s_mocked.patch.call_count == 1 assert not event_queue.empty() @@ -80,7 +79,6 @@ async def test_update(registry, settings, handlers, resource, cause_mock, event_ assert handlers.update_mock.call_count == 1 assert not handlers.delete_mock.called - assert k8s_mocked.sleep.call_count == 0 assert k8s_mocked.patch.call_count == 1 assert not event_queue.empty() @@ -123,7 +121,6 @@ async def test_delete(registry, settings, handlers, resource, cause_mock, event_ assert not handlers.update_mock.called assert handlers.delete_mock.call_count == 1 - assert k8s_mocked.sleep.call_count == 0 assert k8s_mocked.patch.call_count == 1 assert not event_queue.empty() @@ -194,8 +191,6 @@ async def test_free(registry, settings, handlers, resource, cause_mock, event_ty assert not handlers.create_mock.called assert not handlers.update_mock.called assert not handlers.delete_mock.called - - assert not k8s_mocked.sleep.called assert not k8s_mocked.patch.called assert event_queue.empty() @@ -226,8 +221,6 @@ async def test_noop(registry, settings, handlers, resource, cause_mock, event_ty assert not handlers.create_mock.called assert not handlers.update_mock.called assert not handlers.delete_mock.called - - assert not k8s_mocked.sleep.called assert not k8s_mocked.patch.called assert event_queue.empty() diff --git a/tests/handling/test_delays.py b/tests/handling/test_delays.py index 91848f337..0d5347289 100644 --- a/tests/handling/test_delays.py +++ b/tests/handling/test_delays.py @@ -52,7 +52,6 @@ async def test_delayed_handlers_progress( assert handlers.delete_mock.call_count == (1 if cause_reason == Reason.DELETE else 0) assert handlers.resume_mock.call_count == (1 if cause_reason == Reason.RESUME else 0) - assert not k8s_mocked.sleep.called assert k8s_mocked.patch.called fname = f'{cause_reason}_fn' @@ -72,7 +71,7 @@ async def test_delayed_handlers_progress( ], ids=['fast', 'slow']) async def test_delayed_handlers_sleep( registry, settings, handlers, resource, cause_mock, cause_reason, - caplog, assert_logs, k8s_mocked, now, delayed_iso, delay): + caplog, assert_logs, k8s_mocked, now, delayed_iso, delay, looptime): caplog.set_level(logging.DEBUG) # Any "future" time works and affects nothing as long as it is the same @@ -117,8 +116,7 @@ async def test_delayed_handlers_sleep( assert 'dummy' in k8s_mocked.patch.call_args_list[-1][1]['payload']['status']['kopf'] # The duration of sleep should be as expected. - assert k8s_mocked.sleep.called - assert k8s_mocked.sleep.call_args_list[0][0][0] == delay + assert looptime == delay assert_logs([ r"Sleeping for ([\d\.]+|[\d\.]+ \(capped [\d\.]+\)) seconds", diff --git a/tests/handling/test_error_handling.py b/tests/handling/test_error_handling.py index c987d18f3..4383d7dd2 100644 --- a/tests/handling/test_error_handling.py +++ b/tests/handling/test_error_handling.py @@ -16,7 +16,7 @@ @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_fatal_error_stops_handler( registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, - caplog, assert_logs, k8s_mocked): + caplog, assert_logs, k8s_mocked, looptime): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -44,7 +44,7 @@ async def test_fatal_error_stops_handler( assert handlers.delete_mock.call_count == (1 if cause_type == Reason.DELETE else 0) assert handlers.resume_mock.call_count == (1 if cause_type == Reason.RESUME else 0) - assert not k8s_mocked.sleep.called + assert looptime == 0 assert k8s_mocked.patch.called patch = k8s_mocked.patch.call_args_list[0][1]['payload'] @@ -61,7 +61,7 @@ async def test_fatal_error_stops_handler( @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_retry_error_delays_handler( registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, - caplog, assert_logs, k8s_mocked): + caplog, assert_logs, k8s_mocked, looptime): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -89,7 +89,7 @@ async def test_retry_error_delays_handler( assert handlers.delete_mock.call_count == (1 if cause_type == Reason.DELETE else 0) assert handlers.resume_mock.call_count == (1 if cause_type == Reason.RESUME else 0) - assert not k8s_mocked.sleep.called + assert looptime == 0 assert k8s_mocked.patch.called patch = k8s_mocked.patch.call_args_list[0][1]['payload'] @@ -107,7 +107,7 @@ async def test_retry_error_delays_handler( @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_arbitrary_error_delays_handler( registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, - caplog, assert_logs, k8s_mocked): + caplog, assert_logs, k8s_mocked, looptime): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -135,7 +135,7 @@ async def test_arbitrary_error_delays_handler( assert handlers.delete_mock.call_count == (1 if cause_type == Reason.DELETE else 0) assert handlers.resume_mock.call_count == (1 if cause_type == Reason.RESUME else 0) - assert not k8s_mocked.sleep.called + assert looptime == 0 assert k8s_mocked.patch.called patch = k8s_mocked.patch.call_args_list[0][1]['payload'] diff --git a/tests/handling/test_multistep.py b/tests/handling/test_multistep.py index 5a146666e..5f0bf45ee 100644 --- a/tests/handling/test_multistep.py +++ b/tests/handling/test_multistep.py @@ -18,7 +18,7 @@ @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_1st_step_stores_progress_by_patching( registry, settings, handlers, extrahandlers, - resource, cause_mock, cause_type, k8s_mocked, deletion_ts): + resource, cause_mock, cause_type, k8s_mocked, looptime, deletion_ts): name1 = f'{cause_type}_fn' name2 = f'{cause_type}_fn2' @@ -46,7 +46,7 @@ async def test_1st_step_stores_progress_by_patching( assert handlers.delete_mock.call_count == (1 if cause_type == Reason.DELETE else 0) assert handlers.resume_mock.call_count == (1 if cause_type == Reason.RESUME else 0) - assert not k8s_mocked.sleep.called + assert looptime == 0 assert k8s_mocked.patch.called patch = k8s_mocked.patch.call_args_list[0][1]['payload'] @@ -74,7 +74,7 @@ async def test_1st_step_stores_progress_by_patching( @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_2nd_step_finishes_the_handlers(caplog, registry, settings, handlers, extrahandlers, - resource, cause_mock, cause_type, k8s_mocked, deletion_ts): + resource, cause_mock, cause_type, k8s_mocked, looptime, deletion_ts): name1 = f'{cause_type}_fn' name2 = f'{cause_type}_fn2' @@ -106,7 +106,7 @@ async def test_2nd_step_finishes_the_handlers(caplog, assert extrahandlers.delete_mock.call_count == (1 if cause_type == Reason.DELETE else 0) assert extrahandlers.resume_mock.call_count == (1 if cause_type == Reason.RESUME else 0) - assert not k8s_mocked.sleep.called + assert looptime == 0 assert k8s_mocked.patch.called patch = k8s_mocked.patch.call_args_list[0][1]['payload'] diff --git a/tests/handling/test_no_handlers.py b/tests/handling/test_no_handlers.py index c62d9dadf..50c12f31c 100644 --- a/tests/handling/test_no_handlers.py +++ b/tests/handling/test_no_handlers.py @@ -46,7 +46,6 @@ async def test_skipped_with_no_handlers( event_queue=asyncio.Queue(), ) - assert not k8s_mocked.sleep.called assert k8s_mocked.patch.called # The patch must contain ONLY the last-seen update, and nothing else. @@ -102,6 +101,5 @@ async def test_stealth_mode_with_mismatching_handlers( event_queue=asyncio.Queue(), ) - assert not k8s_mocked.sleep.called assert not k8s_mocked.patch.called assert not caplog.messages # total stealth mode! diff --git a/tests/handling/test_retrying_limits.py b/tests/handling/test_retrying_limits.py index bff463e27..b2185b565 100644 --- a/tests/handling/test_retrying_limits.py +++ b/tests/handling/test_retrying_limits.py @@ -20,7 +20,7 @@ ], ids=['slow']) async def test_timed_out_handler_fails( registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, - caplog, assert_logs, k8s_mocked, now, ts): + caplog, assert_logs, k8s_mocked, looptime, now, ts): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -54,7 +54,7 @@ async def test_timed_out_handler_fails( assert not handlers.resume_mock.called # Progress is reset, as the handler is not going to retry. - assert not k8s_mocked.sleep.called + assert looptime == 0 assert k8s_mocked.patch.called patch = k8s_mocked.patch.call_args_list[0][1]['payload'] @@ -71,7 +71,7 @@ async def test_timed_out_handler_fails( @pytest.mark.parametrize('cause_type', HANDLER_REASONS) async def test_retries_limited_handler_fails( registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, - caplog, assert_logs, k8s_mocked): + caplog, assert_logs, k8s_mocked, looptime): caplog.set_level(logging.DEBUG) name1 = f'{cause_type}_fn' @@ -104,7 +104,7 @@ async def test_retries_limited_handler_fails( assert not handlers.resume_mock.called # Progress is reset, as the handler is not going to retry. - assert not k8s_mocked.sleep.called + assert looptime == 0 assert k8s_mocked.patch.called patch = k8s_mocked.patch.call_args_list[0][1]['payload'] diff --git a/tests/k8s/test_watching_with_freezes.py b/tests/k8s/test_watching_with_freezes.py index c1512da86..efecbbc1c 100644 --- a/tests/k8s/test_watching_with_freezes.py +++ b/tests/k8s/test_watching_with_freezes.py @@ -8,21 +8,20 @@ async def test_pausing_is_ignored_if_turned_off( - resource, namespace, timer, caplog, assert_logs): + resource, namespace, looptime, caplog, assert_logs): caplog.set_level(logging.DEBUG) operator_paused = ToggleSet(any) await operator_paused.make_toggle(False) - with timer: - async with streaming_block( - resource=resource, - namespace=namespace, - operator_paused=operator_paused, - ): - pass + async with streaming_block( + resource=resource, + namespace=namespace, + operator_paused=operator_paused, + ): + pass - assert timer.seconds < 0.2 # no waits, exits as soon as possible + assert looptime == 0 assert_logs([], prohibited=[ r"Pausing the watch-stream for", r"Resuming the watch-stream for", @@ -30,7 +29,7 @@ async def test_pausing_is_ignored_if_turned_off( async def test_pausing_waits_forever_if_not_resumed( - resource, namespace, timer, caplog, assert_logs): + resource, namespace, looptime, caplog, assert_logs): caplog.set_level(logging.DEBUG) operator_paused = ToggleSet(any) @@ -44,10 +43,10 @@ async def do_it(): ): pass - with pytest.raises(asyncio.TimeoutError), timer: - await asyncio.wait_for(do_it(), timeout=0.5) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(do_it(), timeout=1.23) - assert timer.seconds >= 0.5 + assert looptime == 1.23 assert_logs([ r"Pausing the watch-stream for", ], prohibited=[ @@ -56,7 +55,7 @@ async def do_it(): async def test_pausing_waits_until_resumed( - resource, namespace, timer, caplog, assert_logs): + resource, namespace, looptime, caplog, assert_logs): caplog.set_level(logging.DEBUG) operator_paused = ToggleSet(any) @@ -66,17 +65,15 @@ async def delayed_resuming(delay: float): await asyncio.sleep(delay) await conflicts_found.turn_to(False) - with timer: - asyncio.create_task(delayed_resuming(0.2)) - async with streaming_block( - resource=resource, - namespace=namespace, - operator_paused=operator_paused, - ): - pass + asyncio.create_task(delayed_resuming(1.23)) + async with streaming_block( + resource=resource, + namespace=namespace, + operator_paused=operator_paused, + ): + pass - assert timer.seconds >= 0.2 - assert timer.seconds <= 0.5 + assert looptime == 1.23 assert_logs([ r"Pausing the watch-stream for", r"Resuming the watch-stream for", diff --git a/tests/logging/conftest.py b/tests/logging/conftest.py index da1915de8..c88245c58 100644 --- a/tests/logging/conftest.py +++ b/tests/logging/conftest.py @@ -10,11 +10,17 @@ def _caplog_all_levels(caplog): caplog.set_level(0) +# We can get a properly scoped running loop of the test only in the async fixture. +@pytest.fixture +async def _event_queue_running_loop(): + return asyncio.get_running_loop() + + @pytest.fixture(autouse=True) -def event_queue_loop(loop): # must be sync-def - token = event_queue_loop_var.set(loop) +def event_queue_loop(_event_queue_running_loop): # must be sync-def + token = event_queue_loop_var.set(_event_queue_running_loop) try: - yield loop + yield finally: event_queue_loop_var.reset(token) diff --git a/tests/observation/test_processing_of_namespaces.py b/tests/observation/test_processing_of_namespaces.py index 3d2bbd957..2fbe1a5f1 100644 --- a/tests/observation/test_processing_of_namespaces.py +++ b/tests/observation/test_processing_of_namespaces.py @@ -7,7 +7,7 @@ from kopf._core.reactor.observation import process_discovered_namespace_event -async def test_initial_listing_is_ignored(): +async def test_initial_listing_is_ignored(looptime): insights = Insights() e1 = RawEvent(type=None, object=RawBody(metadata={'name': 'ns1'})) @@ -19,13 +19,15 @@ async def delayed_injection(delay: float): task = asyncio.create_task(delayed_injection(0)) with pytest.raises(asyncio.TimeoutError): async with insights.revised: - await asyncio.wait_for(insights.revised.wait(), timeout=0.1) + await asyncio.wait_for(insights.revised.wait(), timeout=1.23) await task + + assert looptime == 1.23 assert not insights.namespaces @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED']) -async def test_followups_for_addition(timer, etype): +async def test_followups_for_addition(looptime, etype): insights = Insights() e1 = RawEvent(type=etype, object=RawBody(metadata={'name': 'ns1'})) @@ -34,17 +36,16 @@ async def delayed_injection(delay: float): await process_discovered_namespace_event( insights=insights, raw_event=e1, namespaces=['ns*']) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(9)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 0.13 + assert looptime == 9 assert insights.namespaces == {'ns1'} @pytest.mark.parametrize('etype', ['DELETED']) -async def test_followups_for_deletion(timer, etype): +async def test_followups_for_deletion(looptime, etype): insights = Insights() insights.namespaces.add('ns1') e1 = RawEvent(type=etype, object=RawBody(metadata={'name': 'ns1'})) @@ -54,10 +55,9 @@ async def delayed_injection(delay: float): await process_discovered_namespace_event( insights=insights, raw_event=e1, namespaces=['ns*']) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(9)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 0.11 + assert looptime == 9 assert not insights.namespaces diff --git a/tests/observation/test_processing_of_resources.py b/tests/observation/test_processing_of_resources.py index b3b67ed6b..857fbeef6 100644 --- a/tests/observation/test_processing_of_resources.py +++ b/tests/observation/test_processing_of_resources.py @@ -112,7 +112,7 @@ def fn(**_): pass @pytest.mark.parametrize('decorator', [kopf.on.validate, kopf.on.mutate]) @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED']) async def test_nonwatchable_resources_are_ignored( - settings, registry, apis_mock, group1_mock, timer, etype, decorator, insights): + settings, registry, apis_mock, group1_mock, looptime, etype, decorator, insights): @decorator('group1', 'version1', 'plural1') def fn(**_): pass @@ -124,19 +124,18 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(1.23)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 1.0 + assert looptime == 1.23 assert not insights.watched_resources assert apis_mock.called assert group1_mock.called async def test_initial_listing_is_ignored( - settings, registry, apis_mock, group1_mock, insights): + settings, registry, apis_mock, group1_mock, looptime, insights): e1 = RawEvent(type=None, object=RawBody(spec={'group': 'group1'})) @@ -148,8 +147,10 @@ async def delayed_injection(delay: float): task = asyncio.create_task(delayed_injection(0)) with pytest.raises(asyncio.TimeoutError): async with insights.revised: - await asyncio.wait_for(insights.revised.wait(), timeout=0.1) + await asyncio.wait_for(insights.revised.wait(), timeout=1.23) await task + + assert looptime == 1.23 assert not insights.indexed_resources assert not insights.watched_resources assert not insights.webhook_resources @@ -159,7 +160,7 @@ async def delayed_injection(delay: float): @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED']) async def test_followups_for_addition( - settings, registry, apis_mock, group1_mock, timer, etype, insights, insights_resources): + settings, registry, apis_mock, group1_mock, looptime, etype, insights, insights_resources): e1 = RawEvent(type=etype, object=RawBody(spec={'group': 'group1'})) r1 = Resource(group='group1', version='version1', plural='plural1') @@ -169,12 +170,12 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(1.23)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 1.0 + + assert looptime == 1.23 assert insights_resources == {r1} assert apis_mock.called assert group1_mock.called @@ -182,7 +183,7 @@ async def delayed_injection(delay: float): @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED', 'DELETED']) async def test_followups_for_deletion_of_resource( - settings, registry, apis_mock, group1_empty_mock, timer, etype, + settings, registry, apis_mock, group1_empty_mock, looptime, etype, insights, insights_resources): e1 = RawEvent(type=etype, object=RawBody(spec={'group': 'group1'})) @@ -194,12 +195,12 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(1.23)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 1.0 + + assert looptime == 1.23 assert not insights_resources assert apis_mock.called assert group1_empty_mock.called @@ -207,7 +208,7 @@ async def delayed_injection(delay: float): @pytest.mark.parametrize('etype', ['ADDED', 'MODIFIED', 'DELETED']) async def test_followups_for_deletion_of_group( - settings, registry, apis_mock, group1_404mock, timer, etype, insights, insights_resources): + settings, registry, apis_mock, group1_404mock, looptime, etype, insights, insights_resources): e1 = RawEvent(type=etype, object=RawBody(spec={'group': 'group1'})) r1 = Resource(group='group1', version='version1', plural='plural1') @@ -218,12 +219,12 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - async with insights.revised: - await insights.revised.wait() + task = asyncio.create_task(delayed_injection(1.23)) + async with insights.revised: + await insights.revised.wait() await task - assert 0.1 < timer.seconds < 1.0 + + assert looptime == 1.23 assert not insights_resources assert apis_mock.called assert group1_404mock.called @@ -231,7 +232,7 @@ async def delayed_injection(delay: float): @pytest.mark.parametrize('etype', ['DELETED']) async def test_backbone_is_filled( - settings, registry, core_mock, corev1_mock, timer, etype, insights): + settings, registry, core_mock, corev1_mock, looptime, etype, insights): e1 = RawEvent(type=etype, object=RawBody(spec={'group': ''})) @@ -240,11 +241,11 @@ async def delayed_injection(delay: float): await process_discovered_resource_event( insights=insights, raw_event=e1, registry=registry, settings=settings) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - await insights.backbone.wait_for(NAMESPACES) + task = asyncio.create_task(delayed_injection(1.23)) + await insights.backbone.wait_for(NAMESPACES) await task - assert 0.1 < timer.seconds < 1.0 + + assert looptime == 1.23 assert NAMESPACES in insights.backbone assert core_mock.called assert corev1_mock.called diff --git a/tests/peering/test_freeze_mode.py b/tests/peering/test_freeze_mode.py index 3e42b59ce..734770e59 100644 --- a/tests/peering/test_freeze_mode.py +++ b/tests/peering/test_freeze_mode.py @@ -1,3 +1,5 @@ +import asyncio + import freezegun import pytest @@ -7,7 +9,7 @@ async def test_other_peering_objects_are_ignored( - mocker, k8s_mocked, settings, + mocker, k8s_mocked, settings, looptime, peering_resource, peering_namespace): status = mocker.Mock() @@ -30,12 +32,12 @@ async def test_other_peering_objects_are_ignored( ) assert not status.items.called assert not k8s_mocked.patch.called - assert k8s_mocked.sleep.call_count == 0 + assert looptime == 0 @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_toggled_on_for_higher_priority_peer_when_initially_off( - k8s_mocked, caplog, assert_logs, settings, + k8s_mocked, caplog, assert_logs, settings, looptime, peering_resource, peering_namespace): event = bodies.RawEvent( @@ -54,13 +56,16 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off( settings.peering.priority = 100 conflicts_found = aiotoggles.Toggle(False) - k8s_mocked.sleep.return_value = 1 # as if interrupted by stream pressure + stream_pressure = asyncio.Event() + loop = asyncio.get_running_loop() + loop.call_later(1.23, stream_pressure.set) caplog.set_level(0) assert conflicts_found.is_off() await process_peering_event( raw_event=event, conflicts_found=conflicts_found, + stream_pressure=stream_pressure, autoclean=False, namespace=peering_namespace, resource=peering_resource, @@ -68,8 +73,7 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off( settings=settings, ) assert conflicts_found.is_on() - assert k8s_mocked.sleep.call_count == 1 - assert 9 < k8s_mocked.sleep.call_args[0][0][0] < 10 + assert looptime == 1.23 assert not k8s_mocked.patch.called assert_logs(["Pausing operations in favour of"], prohibited=[ "Possibly conflicting operators", @@ -80,7 +84,7 @@ async def test_toggled_on_for_higher_priority_peer_when_initially_off( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_ignored_for_higher_priority_peer_when_already_on( - k8s_mocked, caplog, assert_logs, settings, + k8s_mocked, caplog, assert_logs, settings, looptime, peering_resource, peering_namespace): event = bodies.RawEvent( @@ -99,13 +103,16 @@ async def test_ignored_for_higher_priority_peer_when_already_on( settings.peering.priority = 100 conflicts_found = aiotoggles.Toggle(True) - k8s_mocked.sleep.return_value = 1 # as if interrupted by stream pressure + stream_pressure = asyncio.Event() + loop = asyncio.get_running_loop() + loop.call_later(1.23, stream_pressure.set) caplog.set_level(0) assert conflicts_found.is_on() await process_peering_event( raw_event=event, conflicts_found=conflicts_found, + stream_pressure=stream_pressure, autoclean=False, namespace=peering_namespace, resource=peering_resource, @@ -113,8 +120,7 @@ async def test_ignored_for_higher_priority_peer_when_already_on( settings=settings, ) assert conflicts_found.is_on() - assert k8s_mocked.sleep.call_count == 1 - assert 9 < k8s_mocked.sleep.call_args[0][0][0] < 10 + assert looptime == 1.23 assert not k8s_mocked.patch.called assert_logs([], prohibited=[ "Possibly conflicting operators", @@ -126,7 +132,7 @@ async def test_ignored_for_higher_priority_peer_when_already_on( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_toggled_off_for_lower_priority_peer_when_initially_on( - k8s_mocked, caplog, assert_logs, settings, + k8s_mocked, caplog, assert_logs, settings, looptime, peering_resource, peering_namespace): event = bodies.RawEvent( @@ -145,13 +151,16 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on( settings.peering.priority = 100 conflicts_found = aiotoggles.Toggle(True) - k8s_mocked.sleep.return_value = 1 # as if interrupted by stream pressure + stream_pressure = asyncio.Event() + loop = asyncio.get_running_loop() + loop.call_later(1.23, stream_pressure.set) caplog.set_level(0) assert conflicts_found.is_on() await process_peering_event( raw_event=event, conflicts_found=conflicts_found, + stream_pressure=stream_pressure, autoclean=False, namespace=peering_namespace, resource=peering_resource, @@ -159,8 +168,7 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on( settings=settings, ) assert conflicts_found.is_off() - assert k8s_mocked.sleep.call_count == 1 - assert k8s_mocked.sleep.call_args[0][0] == [] + assert looptime == 0 assert not k8s_mocked.patch.called assert_logs(["Resuming operations after the pause"], prohibited=[ "Possibly conflicting operators", @@ -171,7 +179,7 @@ async def test_toggled_off_for_lower_priority_peer_when_initially_on( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_ignored_for_lower_priority_peer_when_already_off( - k8s_mocked, caplog, assert_logs, settings, + k8s_mocked, caplog, assert_logs, settings, looptime, peering_resource, peering_namespace): event = bodies.RawEvent( @@ -190,13 +198,16 @@ async def test_ignored_for_lower_priority_peer_when_already_off( settings.peering.priority = 100 conflicts_found = aiotoggles.Toggle(False) - k8s_mocked.sleep.return_value = 1 # as if interrupted by stream pressure + stream_pressure = asyncio.Event() + loop = asyncio.get_running_loop() + loop.call_later(1.23, stream_pressure.set) caplog.set_level(0) assert conflicts_found.is_off() await process_peering_event( raw_event=event, conflicts_found=conflicts_found, + stream_pressure=stream_pressure, autoclean=False, namespace=peering_namespace, resource=peering_resource, @@ -204,8 +215,7 @@ async def test_ignored_for_lower_priority_peer_when_already_off( settings=settings, ) assert conflicts_found.is_off() - assert k8s_mocked.sleep.call_count == 1 - assert k8s_mocked.sleep.call_args[0][0] == [] + assert looptime == 0 assert not k8s_mocked.patch.called assert_logs([], prohibited=[ "Possibly conflicting operators", @@ -217,7 +227,7 @@ async def test_ignored_for_lower_priority_peer_when_already_off( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_toggled_on_for_same_priority_peer_when_initially_off( - k8s_mocked, caplog, assert_logs, settings, + k8s_mocked, caplog, assert_logs, settings, looptime, peering_resource, peering_namespace): event = bodies.RawEvent( @@ -236,13 +246,16 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off( settings.peering.priority = 100 conflicts_found = aiotoggles.Toggle(False) - k8s_mocked.sleep.return_value = 1 # as if interrupted by stream pressure + stream_pressure = asyncio.Event() + loop = asyncio.get_running_loop() + loop.call_later(1.23, stream_pressure.set) caplog.set_level(0) assert conflicts_found.is_off() await process_peering_event( raw_event=event, conflicts_found=conflicts_found, + stream_pressure=stream_pressure, autoclean=False, namespace=peering_namespace, resource=peering_resource, @@ -250,8 +263,7 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off( settings=settings, ) assert conflicts_found.is_on() - assert k8s_mocked.sleep.call_count == 1 - assert 9 < k8s_mocked.sleep.call_args[0][0][0] < 10 + assert looptime == 1.23 assert not k8s_mocked.patch.called assert_logs([ "Possibly conflicting operators", @@ -264,7 +276,7 @@ async def test_toggled_on_for_same_priority_peer_when_initially_off( @freezegun.freeze_time('2020-12-31T23:59:59.123456') async def test_ignored_for_same_priority_peer_when_already_on( - k8s_mocked, caplog, assert_logs, settings, + k8s_mocked, caplog, assert_logs, settings, looptime, peering_resource, peering_namespace): event = bodies.RawEvent( @@ -283,13 +295,16 @@ async def test_ignored_for_same_priority_peer_when_already_on( settings.peering.priority = 100 conflicts_found = aiotoggles.Toggle(True) - k8s_mocked.sleep.return_value = 1 # as if interrupted by stream pressure + stream_pressure = asyncio.Event() + loop = asyncio.get_running_loop() + loop.call_later(1.23, stream_pressure.set) caplog.set_level(0) assert conflicts_found.is_on() await process_peering_event( raw_event=event, conflicts_found=conflicts_found, + stream_pressure=stream_pressure, autoclean=False, namespace=peering_namespace, resource=peering_resource, @@ -297,8 +312,7 @@ async def test_ignored_for_same_priority_peer_when_already_on( settings=settings, ) assert conflicts_found.is_on() - assert k8s_mocked.sleep.call_count == 1 - assert 9 < k8s_mocked.sleep.call_args[0][0][0] < 10 + assert looptime == 1.23 assert not k8s_mocked.patch.called assert_logs([ "Possibly conflicting operators", @@ -312,7 +326,7 @@ async def test_ignored_for_same_priority_peer_when_already_on( @freezegun.freeze_time('2020-12-31T23:59:59.123456') @pytest.mark.parametrize('priority', [100, 101]) async def test_resumes_immediately_on_expiration_of_blocking_peers( - k8s_mocked, caplog, assert_logs, settings, priority, + k8s_mocked, caplog, assert_logs, settings, priority, looptime, peering_resource, peering_namespace): event = bodies.RawEvent( @@ -331,13 +345,14 @@ async def test_resumes_immediately_on_expiration_of_blocking_peers( settings.peering.priority = 100 conflicts_found = aiotoggles.Toggle(True) - k8s_mocked.sleep.return_value = None # as if finished sleeping uninterrupted + stream_pressure = asyncio.Event() caplog.set_level(0) assert conflicts_found.is_on() await process_peering_event( raw_event=event, conflicts_found=conflicts_found, + stream_pressure=stream_pressure, autoclean=False, namespace=peering_namespace, resource=peering_resource, @@ -345,6 +360,5 @@ async def test_resumes_immediately_on_expiration_of_blocking_peers( settings=settings, ) assert conflicts_found.is_on() - assert k8s_mocked.sleep.call_count == 1 - assert 9 < k8s_mocked.sleep.call_args[0][0][0] < 10 + assert looptime == 9.876544 assert k8s_mocked.patch.called diff --git a/tests/persistence/test_states.py b/tests/persistence/test_states.py index 04c05078a..54e0a3700 100644 --- a/tests/persistence/test_states.py +++ b/tests/persistence/test_states.py @@ -1,3 +1,4 @@ +import asyncio import datetime from unittest.mock import Mock @@ -380,12 +381,17 @@ async def test_started_from_storage(storage, handler, body, expected): (TSB_ISO, {'status': {'kopf': {'progress': {'some-id': {'started': TSB_ISO}}}}}), (TSA_ISO, {'status': {'kopf': {'progress': {'some-id': {'started': TSA_ISO}}}}}), ]) -async def test_started_from_storage_is_preferred_over_from_scratch(storage, handler, body, expected): - with freezegun.freeze_time(TS0): - state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) - with freezegun.freeze_time(TS1): - state = state.with_handlers([handler]) +@freezegun.freeze_time(TS0) +async def test_started_from_storage_is_preferred_over_from_scratch(storage, handler, body, expected, looptime): patch = Patch() + state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) + + # The state uses both the loop time and the wall-clock "base time", so we move them in sync. + await asyncio.sleep(123) + with freezegun.freeze_time() as ft: + ft.tick(123) + + state = state.with_handlers([handler]) state.store(body=Body({}), patch=patch, storage=storage) assert patch['status']['kopf']['progress']['some-id']['started'] == expected @@ -507,7 +513,11 @@ async def test_awakened_flag(storage, handler, expected, body): (None, {'status': {'kopf': {'progress': {'some-id': {}}}}}), (None, {'status': {'kopf': {'progress': {'some-id': {'delayed': None}}}}}), (TS0, {'status': {'kopf': {'progress': {'some-id': {'delayed': TS0_ISO}}}}}), + (TS1, {'status': {'kopf': {'progress': {'some-id': {'delayed': TS1_ISO}}}}}), + (TSB, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSB_ISO}}}}}), + (TSA, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO}}}}}), ]) +@freezegun.freeze_time(TS0) async def test_awakening_time(storage, handler, expected, body): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) diff --git a/tests/posting/conftest.py b/tests/posting/conftest.py index 92f8fecd8..b70affdab 100644 --- a/tests/posting/conftest.py +++ b/tests/posting/conftest.py @@ -5,11 +5,18 @@ from kopf._core.engines.posting import event_queue_loop_var, event_queue_var +# We can get a properly scoped running loop of the test only in the async fixture. +@pytest.fixture +async def _event_queue_running_loop(): + return asyncio.get_running_loop() + + + @pytest.fixture() -def event_queue_loop(loop): # must be sync-def - token = event_queue_loop_var.set(loop) +def event_queue_loop(_event_queue_running_loop): # must be sync-def + token = event_queue_loop_var.set(_event_queue_running_loop) try: - yield loop + yield finally: event_queue_loop_var.reset(token) diff --git a/tests/posting/test_threadsafety.py b/tests/posting/test_threadsafety.py index 47137145f..7a796a7cb 100644 --- a/tests/posting/test_threadsafety.py +++ b/tests/posting/test_threadsafety.py @@ -33,13 +33,13 @@ If thread safety is not ensured, the operators get sporadic errors regarding thread-unsafe calls, which are difficult to catch and reproduce. """ - import asyncio import contextvars import functools import threading import time +import looptime import pytest from kopf import event @@ -48,26 +48,16 @@ 'metadata': {'uid': 'uid1', 'name': 'name1', 'namespace': 'ns1'}} -@pytest.fixture() -def awakener(): - handles = [] - - def noop(): - pass - - def awaken_fn(delay, fn=noop): - handle = asyncio.get_running_loop().call_later(delay, fn) - handles.append(handle) - - try: - yield awaken_fn - finally: - for handle in handles: - handle.cancel() - - @pytest.fixture() def threader(): + """ + Call a sync function after a delay. Finalize the thread afterwards. + + Always put the threader setup **under** the chronometer or timer. + Otherwise, the code can seem to be executed faster than the sleep — + because there will be time spent on sleeping between the thread has started + and the chronometer/timer made its initial measurement of time: 0.01s or so. + """ threads = [] def start_fn(delay, fn): @@ -87,46 +77,63 @@ def thread_fn(): thread.join() -async def test_nonthreadsafe_indeed_fails(timer, awakener, threader, event_queue, event_queue_loop): +# This test relies on an assumption that nothing is happening in the event loop +# without new `await` statements or scheduled events/timers, i.e. nothing breaks +# the event loop's sleep the same way as a threadsafe injection does. Because of +# this, the 0.2s sync activity does not wake up the loop, only the 0.5s does. +@pytest.mark.looptime(False) +async def test_nonthreadsafe_indeed_fails(chronometer, threader, event_queue): + loop = asyncio.get_running_loop() + thread_was_called = threading.Event() def thread_fn(): + thread_was_called.set() event_queue.put_nowait(object()) - awakener(0.7) - threader(0.3, thread_fn) - - with timer: + with chronometer, looptime.Chronometer(loop.time) as loopometer: + threader(0.5, lambda: loop.call_soon_threadsafe(lambda: None)) + threader(0.2, thread_fn) await event_queue.get() - assert 0.6 <= timer.seconds <= 0.8 + assert 0.5 <= chronometer.seconds < 0.6 + assert 0.5 <= loopometer.seconds < 0.6 + assert thread_was_called.is_set() -async def test_threadsafe_indeed_works(timer, awakener, threader, event_queue, event_queue_loop): +@pytest.mark.looptime(False) +async def test_threadsafe_indeed_works(chronometer, threader, event_queue): + loop = asyncio.get_running_loop() + thread_was_called = threading.Event() def thread_fn(): - asyncio.run_coroutine_threadsafe(event_queue.put(object()), loop=event_queue_loop) - - awakener(0.7) - threader(0.3, thread_fn) + thread_was_called.set() + asyncio.run_coroutine_threadsafe(event_queue.put(object()), loop=loop) - with timer: + with chronometer, looptime.Chronometer(loop.time) as loopometer: + threader(0.5, lambda: loop.call_soon_threadsafe(lambda: None)) + threader(0.2, thread_fn) await event_queue.get() - assert 0.2 <= timer.seconds <= 0.4 + assert 0.2 <= chronometer.seconds < 0.3 + assert 0.2 <= loopometer.seconds < 0.3 + assert thread_was_called.is_set() -async def test_queueing_is_threadsafe(timer, awakener, threader, event_queue, event_queue_loop, - settings_via_contextvar): +@pytest.mark.looptime(False) +@pytest.mark.usefixtures('event_queue_loop', 'settings_via_contextvar') +async def test_queueing_is_threadsafe(chronometer, threader, event_queue): + loop = asyncio.get_running_loop() + thread_was_called = threading.Event() def thread_fn(): + thread_was_called.set() event(OBJ1, type='type1', reason='reason1', message='message1') - awakener(0.7) - threader(0.3, thread_fn) - - with timer: + with chronometer, looptime.Chronometer(loop.time) as loopometer: + threader(0.5, lambda: loop.call_soon_threadsafe(lambda: None)) + threader(0.2, thread_fn) await event_queue.get() - # TODO revert back 0.6 -> 0.4? (why is it 0.43-0.52s in GHA python 3.12 full-auth only?) - # probably caused by a bad vm/container randomly selected with "noisy neighbours". - assert 0.2 <= timer.seconds <= 0.6 + assert 0.2 <= chronometer.seconds < 0.3 + assert 0.2 <= loopometer.seconds < 0.3 + assert thread_was_called.is_set() diff --git a/tests/primitives/test_conditions.py b/tests/primitives/test_conditions.py index cf2be557f..97caa0bfd 100644 --- a/tests/primitives/test_conditions.py +++ b/tests/primitives/test_conditions.py @@ -5,20 +5,21 @@ from kopf._cogs.aiokits.aiobindings import condition_chain -async def test_no_triggering(): +async def test_no_triggering(looptime): source = asyncio.Condition() target = asyncio.Condition() task = asyncio.create_task(condition_chain(source, target)) try: with pytest.raises(asyncio.TimeoutError): async with target: - await asyncio.wait_for(target.wait(), timeout=0.1) + await asyncio.wait_for(target.wait(), timeout=1.23) + assert looptime == 1.23 finally: task.cancel() await asyncio.wait([task]) -async def test_triggering(timer): +async def test_triggering(looptime): source = asyncio.Condition() target = asyncio.Condition() task = asyncio.create_task(condition_chain(source, target)) @@ -29,13 +30,12 @@ async def delayed_trigger(): source.notify_all() loop = asyncio.get_running_loop() - loop.call_later(0.1, asyncio.create_task, delayed_trigger()) + loop.call_later(1.23, asyncio.create_task, delayed_trigger()) - with timer: - async with target: - await target.wait() + async with target: + await target.wait() - assert 0.1 <= timer.seconds <= 0.2 + assert looptime == 1.23 finally: task.cancel() diff --git a/tests/primitives/test_containers.py b/tests/primitives/test_containers.py index b158b4859..983fca51d 100644 --- a/tests/primitives/test_containers.py +++ b/tests/primitives/test_containers.py @@ -5,82 +5,78 @@ from kopf._cogs.aiokits.aiovalues import Container -async def test_empty_by_default(): +async def test_empty_by_default(looptime): container = Container() with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(container.wait(), timeout=0.1) + await asyncio.wait_for(container.wait(), timeout=9) + assert looptime == 9 -async def test_does_not_wake_up_when_reset(timer): +async def test_does_not_wake_up_when_reset(looptime): container = Container() async def reset_it(): await container.reset() loop = asyncio.get_running_loop() - loop.call_later(0.05, asyncio.create_task, reset_it()) + loop.call_later(1, asyncio.create_task, reset_it()) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(container.wait(), timeout=0.1) + await asyncio.wait_for(container.wait(), timeout=9) + assert looptime == 9 -async def test_wakes_up_when_preset(timer): +async def test_wakes_up_when_preset(looptime): container = Container() await container.set(123) - with timer: - result = await container.wait() - - assert timer.seconds <= 0.1 + result = await container.wait() + assert looptime == 0 assert result == 123 -async def test_wakes_up_when_set(timer): +async def test_wakes_up_when_set(looptime): container = Container() async def set_it(): await container.set(123) loop = asyncio.get_running_loop() - loop.call_later(0.1, asyncio.create_task, set_it()) - - with timer: - result = await container.wait() + loop.call_later(9, asyncio.create_task, set_it()) - assert 0.1 <= timer.seconds <= 0.2 + result = await container.wait() + assert looptime == 9 assert result == 123 -async def test_iterates_when_set(timer): +async def test_iterates_when_set(looptime): container = Container() async def set_it(v): await container.set(v) loop = asyncio.get_running_loop() - loop.call_later(0.1, asyncio.create_task, set_it(123)) - loop.call_later(0.2, asyncio.create_task, set_it(234)) + loop.call_later(6, asyncio.create_task, set_it(123)) + loop.call_later(9, asyncio.create_task, set_it(234)) values = [] - with timer: - async for value in container.as_changed(): - values.append(value) - if value == 234: - break + async for value in container.as_changed(): + values.append(value) + if value == 234: + break - assert 0.2 <= timer.seconds <= 0.3 + assert looptime == 9 assert values == [123, 234] -async def test_iterates_when_preset(timer): +async def test_iterates_when_preset(looptime): container = Container() await container.set(123) values = [] - with timer: - async for value in container.as_changed(): - values.append(value) - break + async for value in container.as_changed(): + values.append(value) + break - assert timer.seconds <= 0.1 + assert looptime == 0 assert values == [123] diff --git a/tests/primitives/test_flags.py b/tests/primitives/test_flags.py index f9db49ff2..008c5d14d 100644 --- a/tests/primitives/test_flags.py +++ b/tests/primitives/test_flags.py @@ -133,26 +133,26 @@ async def test_waiting_of_none_does_nothing(): await wait_flag(None) -async def test_waiting_for_unraised_times_out(flag, timer): - with pytest.raises(asyncio.TimeoutError), timer: - await asyncio.wait_for(wait_flag(flag), timeout=0.1) - assert timer.seconds >= 0.099 # uvloop finishes it earlier than 0.1 +async def test_waiting_for_unraised_times_out(flag, looptime): + # Beware: sync primitives consume the real time. + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(wait_flag(flag), timeout=0.123) + assert looptime == 0.123 -async def test_waiting_for_preraised_is_instant(flag, timer): +async def test_waiting_for_preraised_is_instant(flag, looptime): await raise_flag(flag) # tested separately above - with timer: - await wait_flag(flag) - assert timer.seconds < 0.5 # near-instant, plus code overhead + await wait_flag(flag) + assert looptime == 0 -async def test_waiting_for_raised_during_the_wait(flag, timer): +async def test_waiting_for_raised_during_the_wait(flag, looptime): async def raise_delayed(delay: float) -> None: await asyncio.sleep(delay) await raise_flag(flag) # tested separately above - asyncio.create_task(raise_delayed(0.2)) - with timer: - await wait_flag(flag) - assert 0.2 <= timer.seconds < 0.5 # near-instant once raised + # Beware: sync primitives consume the real time. + asyncio.create_task(raise_delayed(0.123)) + await wait_flag(flag) + assert looptime == 0.123 diff --git a/tests/primitives/test_toggles.py b/tests/primitives/test_toggles.py index d670bbd96..ca852904a 100644 --- a/tests/primitives/test_toggles.py +++ b/tests/primitives/test_toggles.py @@ -37,50 +37,48 @@ async def test_turning_off(): assert toggle.is_off() -async def test_waiting_until_on_fails_when_not_turned_on(): +async def test_waiting_until_on_fails_when_not_turned_on(looptime): toggle = Toggle(False) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(toggle.wait_for(True), timeout=0.1) - + await asyncio.wait_for(toggle.wait_for(True), timeout=1.23) assert toggle.is_off() + assert looptime == 1.23 -async def test_waiting_until_off_fails_when_not_turned_off(): +async def test_waiting_until_off_fails_when_not_turned_off(looptime): toggle = Toggle(True) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(toggle.wait_for(False), timeout=0.1) - + await asyncio.wait_for(toggle.wait_for(False), timeout=1.23) assert toggle.is_on() + assert looptime == 1.23 -async def test_waiting_until_on_wakes_when_turned_on(timer): +async def test_waiting_until_on_wakes_when_turned_on(looptime): toggle = Toggle(False) async def delayed_turning_on(delay: float): await asyncio.sleep(delay) await toggle.turn_to(True) - with timer: - asyncio.create_task(delayed_turning_on(0.05)) - await toggle.wait_for(True) + asyncio.create_task(delayed_turning_on(9)) + await toggle.wait_for(True) assert toggle.is_on() - assert timer.seconds < 0.5 # approx. 0.05 plus some code overhead + assert looptime == 9 -async def test_waiting_until_off_wakes_when_turned_off(timer): +async def test_waiting_until_off_wakes_when_turned_off(looptime): toggle = Toggle(True) async def delayed_turning_off(delay: float): await asyncio.sleep(delay) await toggle.turn_to(False) - with timer: - asyncio.create_task(delayed_turning_off(0.05)) - await toggle.wait_for(False) + asyncio.create_task(delayed_turning_off(9)) + await toggle.wait_for(False) assert toggle.is_off() - assert timer.seconds < 0.5 # approx. 0.05 plus some code overhead + assert looptime == 9 async def test_secures_against_usage_as_a_boolean(): diff --git a/tests/primitives/test_togglesets.py b/tests/primitives/test_togglesets.py index 8cb71324b..6b8ea6b32 100644 --- a/tests/primitives/test_togglesets.py +++ b/tests/primitives/test_togglesets.py @@ -195,25 +195,27 @@ async def test_all_toggles_must_be_off_for_anytoggleset_to_be_off(fn): @pytest.mark.parametrize('fn', [all, any]) -async def test_waiting_until_on_fails_when_not_turned_on(fn): +async def test_waiting_until_on_fails_when_not_turned_on(fn, looptime): toggleset = ToggleSet(fn) await toggleset.make_toggle(False) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(toggleset.wait_for(True), timeout=0.1) + await asyncio.wait_for(toggleset.wait_for(True), timeout=1.23) assert toggleset.is_off() + assert looptime == 1.23 @pytest.mark.parametrize('fn', [all, any]) -async def test_waiting_until_off_fails_when_not_turned_off(fn): +async def test_waiting_until_off_fails_when_not_turned_off(fn, looptime): toggleset = ToggleSet(fn) await toggleset.make_toggle(True) with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(toggleset.wait_for(False), timeout=0.1) + await asyncio.wait_for(toggleset.wait_for(False), timeout=1.23) assert toggleset.is_on() + assert looptime == 1.23 @pytest.mark.parametrize('fn', [all, any]) -async def test_waiting_until_on_wakes_when_turned_on(fn, timer): +async def test_waiting_until_on_wakes_when_turned_on(fn, looptime): toggleset = ToggleSet(fn) toggle = await toggleset.make_toggle(False) @@ -221,16 +223,15 @@ async def delayed_turning_on(delay: float): await asyncio.sleep(delay) await toggle.turn_to(True) - with timer: - asyncio.create_task(delayed_turning_on(0.05)) - await asyncio.wait_for(toggleset.wait_for(True), timeout=1.0) + asyncio.create_task(delayed_turning_on(9)) + await toggleset.wait_for(True) assert toggleset.is_on() - assert timer.seconds < 0.5 # approx. 0.05 plus some code overhead + assert looptime == 9 @pytest.mark.parametrize('fn', [all, any]) -async def test_waiting_until_off_wakes_when_turned_off(fn, timer): +async def test_waiting_until_off_wakes_when_turned_off(fn, looptime): toggleset = ToggleSet(fn) toggle = await toggleset.make_toggle(True) @@ -238,12 +239,11 @@ async def delayed_turning_off(delay: float): await asyncio.sleep(delay) await toggle.turn_to(False) - with timer: - asyncio.create_task(delayed_turning_off(0.05)) - await asyncio.wait_for(toggleset.wait_for(False), timeout=1.0) + asyncio.create_task(delayed_turning_off(9)) + await toggleset.wait_for(False) assert toggleset.is_off() - assert timer.seconds < 0.5 # approx. 0.05 plus some code overhead + assert looptime == 9 @pytest.mark.parametrize('fn', [all, any]) diff --git a/tests/reactor/test_queueing.py b/tests/reactor/test_queueing.py index f5d59e496..608865f72 100644 --- a/tests/reactor/test_queueing.py +++ b/tests/reactor/test_queueing.py @@ -46,7 +46,7 @@ ]) @pytest.mark.usefixtures('watcher_limited') -async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor, +async def test_watchevent_demultiplexing(worker_mock, looptime, resource, processor, settings, stream, events, uids, cnts): """ Verify that every unique uid goes into its own queue+worker, which are never shared. """ @@ -60,17 +60,16 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor stream.close(namespace=None) # Run the watcher (near-instantly and test-blocking). - with timer: - await watcher( - namespace=None, - resource=resource, - settings=settings, - processor=processor, - ) + await watcher( + namespace=None, + resource=resource, + settings=settings, + processor=processor, + ) # Extra-check: verify that the real workers were not involved: # they would do batching, which is absent in the mocked workers. - assert timer.seconds < settings.batching.batch_window + assert looptime == 0 # The processor must not be called by the watcher, only by the worker. # But the worker (even if mocked) must be called & awaited by the watcher. @@ -122,32 +121,30 @@ async def test_watchevent_demultiplexing(worker_mock, timer, resource, processor ]) @pytest.mark.usefixtures('watcher_limited') -async def test_watchevent_batching(settings, resource, processor, timer, - stream, events, uids, vals): +async def test_watchevent_batching(settings, resource, processor, + stream, events, uids, vals, looptime): """ Verify that only the last event per uid is actually handled. """ # Override the default timeouts to make the tests faster. - settings.batching.idle_timeout = 100 # should not be involved, fail if it is - settings.batching.exit_timeout = 100 # should exit instantly, fail if it didn't - settings.batching.batch_window = 0.3 # the time period being tested (make bigger than overhead) + settings.batching.idle_timeout = 999 # should not be involved, fail if it is + settings.batching.exit_timeout = 999 # should exit instantly, fail if it didn't + settings.batching.batch_window = 123 # the time period being tested # Inject the events of unique objects - to produce a few streams/workers. stream.feed(events, namespace=None) stream.close(namespace=None) # Run the watcher (near-instantly and test-blocking). - with timer: - await watcher( - namespace=None, - resource=resource, - settings=settings, - processor=processor, - ) + await watcher( + namespace=None, + resource=resource, + settings=settings, + processor=processor, + ) # Should be batched strictly once (never twice). Note: multiple uids run concurrently, # so they all are batched in parallel, and the timing remains the same. - assert timer.seconds > settings.batching.batch_window * 1 - assert timer.seconds < settings.batching.batch_window * 2 + assert looptime == 123 # Was the processor called at all? Awaited as needed for async fns? assert processor.await_count > 0 @@ -184,10 +181,10 @@ async def test_garbage_collection_of_streams( ): # Override the default timeouts to make the tests faster. - settings.batching.exit_timeout = 100 # should exit instantly, fail if it didn't - settings.batching.idle_timeout = .05 # finish workers faster, but not as fast as batching - settings.batching.batch_window = .01 # minimize the effects of batching (not our interest) - settings.watching.reconnect_backoff = 1.0 # to prevent src depletion + settings.batching.exit_timeout = 999 # should exit instantly, fail if it didn't + settings.batching.idle_timeout = 5 # finish workers faster, but not as fast as batching + settings.batching.batch_window = 1 # minimize the effects of batching (not our interest) + settings.watching.reconnect_backoff = 100 # to prevent src depletion # Inject the events of unique objects - to produce a few streams/workers. stream.feed(events, namespace=None) @@ -196,7 +193,7 @@ async def test_garbage_collection_of_streams( # Give it a moment to populate the streams and spawn all the workers. # Intercept and remember _any_ seen dict of streams for further checks. while worker_spy.call_count < unique: - await asyncio.sleep(0.001) # give control to the loop + await asyncio.sleep(0) # give control to the loop streams = worker_spy.call_args_list[-1][1]['streams'] signaller: asyncio.Condition = worker_spy.call_args_list[0][1]['signaller'] @@ -223,7 +220,7 @@ async def test_garbage_collection_of_streams( # Let the workers to actually exit and gc their local scopes with variables. # The jobs can take a tiny moment more, but this is noticeable in the tests. - await asyncio.sleep(0.1) + await asyncio.sleep(0) # For PyPy: force the gc! (GC can be delayed in PyPy, unlike in CPython.) # https://doc.pypy.org/en/latest/cpython_differences.html#differences-related-to-garbage-collection-strategies diff --git a/tests/references/test_backbone.py b/tests/references/test_backbone.py index 767c352db..2060a3b9a 100644 --- a/tests/references/test_backbone.py +++ b/tests/references/test_backbone.py @@ -47,23 +47,23 @@ async def test_refill_is_cumulative_ie_does_not_reset(): assert set(backbone) == {NAMESPACES, EVENTS} -async def test_waiting_for_absent_resources_never_ends(timer): +async def test_waiting_for_absent_resources_never_ends(looptime): backbone = Backbone() with pytest.raises(asyncio.TimeoutError): - await asyncio.wait_for(backbone.wait_for(NAMESPACES), timeout=0.1) + await asyncio.wait_for(backbone.wait_for(NAMESPACES), timeout=1.23) + assert looptime == 1.23 -async def test_waiting_for_preexisting_resources_ends_instantly(timer): +async def test_waiting_for_preexisting_resources_ends_instantly(looptime): resource = Resource('', 'v1', 'namespaces') backbone = Backbone() await backbone.fill(resources=[resource]) - with timer: - found_resource = await backbone.wait_for(NAMESPACES) - assert timer.seconds < 0.1 + found_resource = await backbone.wait_for(NAMESPACES) + assert looptime == 0 assert found_resource == resource -async def test_waiting_for_delayed_resources_ends_once_delivered(timer): +async def test_waiting_for_delayed_resources_ends_once_delivered(looptime): resource = Resource('', 'v1', 'namespaces') backbone = Backbone() @@ -71,9 +71,8 @@ async def delayed_injection(delay: float): await asyncio.sleep(delay) await backbone.fill(resources=[resource]) - task = asyncio.create_task(delayed_injection(0.1)) - with timer: - found_resource = await backbone.wait_for(NAMESPACES) + task = asyncio.create_task(delayed_injection(123)) + found_resource = await backbone.wait_for(NAMESPACES) await task - assert 0.1 < timer.seconds < 0.11 + assert looptime == 123 assert found_resource == resource diff --git a/tests/test_async.py b/tests/test_async.py index 14792cd7b..904962235 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -6,24 +6,18 @@ _async_was_executed = False -async def test_async_tests_are_enabled(timer): +async def test_async_tests_are_enabled(): global _async_was_executed _async_was_executed = True # asserted in a sync-test below. - with timer as t: - await asyncio.sleep(0.5) - assert t.seconds > 0.5 # real sleep - - -async def test_async_mocks_are_enabled(timer, mocker): +async def test_async_mocks_are_enabled(mocker, looptime): p = mocker.patch('asyncio.sleep') - with timer as t: - await asyncio.sleep(1.0) + await asyncio.sleep(1.0) assert p.call_count > 0 assert p.await_count > 0 - assert t.seconds < 0.01 # mocked sleep + assert looptime == 0 def test_async_test_was_executed_and_awaited(): diff --git a/tests/timing/test_sleeping.py b/tests/timing/test_sleeping.py index dfc8d77e7..4bfc8fcb3 100644 --- a/tests/timing/test_sleeping.py +++ b/tests/timing/test_sleeping.py @@ -5,24 +5,21 @@ from kopf._cogs.aiokits.aiotime import sleep -async def test_the_only_delay_is_awaited(timer): - with timer: - unslept = await sleep(0.10) - assert 0.10 <= timer.seconds < 0.11 +async def test_the_only_delay_is_awaited(looptime): + unslept = await sleep(123) + assert looptime == 123 assert unslept is None -async def test_the_shortest_delay_is_awaited(timer): - with timer: - unslept = await sleep([0.10, 0.20]) - assert 0.10 <= timer.seconds < 0.11 +async def test_the_shortest_delay_is_awaited(looptime): + unslept = await sleep([123, 456]) + assert looptime == 123 assert unslept is None -async def test_specific_delays_only_are_awaited(timer): - with timer: - unslept = await sleep([0.10, None]) - assert 0.10 <= timer.seconds < 0.11 +async def test_specific_delays_only_are_awaited(looptime): + unslept = await sleep([123, None]) + assert looptime == 123 assert unslept is None @@ -31,10 +28,9 @@ async def test_specific_delays_only_are_awaited(timer): pytest.param([-100, -10], id='all-negative'), pytest.param(-10, id='alone'), ]) -async def test_negative_delays_skip_sleeping(timer, delays): - with timer: - unslept = await sleep(delays) - assert timer.seconds < 0.01 +async def test_negative_delays_skip_sleeping(looptime, delays): + unslept = await sleep(delays) + assert looptime == 0 assert unslept is None @@ -42,36 +38,32 @@ async def test_negative_delays_skip_sleeping(timer, delays): pytest.param([], id='empty-list'), pytest.param([None], id='list-of-none'), ]) -async def test_no_delays_skip_sleeping(timer, delays): - with timer: - unslept = await sleep(delays) - assert timer.seconds < 0.01 +async def test_no_delays_skip_sleeping(looptime, delays): + unslept = await sleep(delays) + assert looptime == 0 assert unslept is None -async def test_by_event_set_before_time_comes(timer): +async def test_by_event_set_before_time_comes(looptime): event = asyncio.Event() - asyncio.get_running_loop().call_later(0.07, event.set) - with timer: - unslept = await sleep(0.10, event) + asyncio.get_running_loop().call_later(7, event.set) + unslept = await sleep(10, event) assert unslept is not None - assert 0.02 <= unslept <= 0.04 - assert 0.06 <= timer.seconds <= 0.08 + assert unslept == 3 + assert looptime == 7 -async def test_with_zero_time_and_event_initially_cleared(timer): +async def test_with_zero_time_and_event_initially_cleared(looptime): event = asyncio.Event() event.clear() - with timer: - unslept = await sleep(0, event) - assert timer.seconds <= 0.01 + unslept = await sleep(0, event) + assert looptime == 0 assert unslept is None -async def test_with_zero_time_and_event_initially_set(timer): +async def test_with_zero_time_and_event_initially_set(looptime): event = asyncio.Event() event.set() - with timer: - unslept = await sleep(0, event) - assert timer.seconds <= 0.01 + unslept = await sleep(0, event) + assert looptime == 0 assert not unslept # 0/None; undefined for such case: both goals reached. diff --git a/tests/timing/test_throttling.py b/tests/timing/test_throttling.py index d8a2a3588..8a8a6e9e3 100644 --- a/tests/timing/test_throttling.py +++ b/tests/timing/test_throttling.py @@ -6,10 +6,7 @@ from kopf._core.actions.throttlers import Throttler, throttled - -@pytest.fixture(autouse=True) -async def clock(mocker): - return mocker.patch.object(asyncio.get_running_loop(), 'time', return_value=0) +pytestmark = pytest.mark.looptime(start=1000) @pytest.fixture(autouse=True) @@ -164,12 +161,11 @@ async def test_renews_on_repeated_failure(sleep): assert sleep.mock_calls == [call(234, wakeup=None)] -async def test_interruption(clock, sleep): +async def test_interruption(sleep): wakeup = asyncio.Event() logger = logging.getLogger() throttler = Throttler() - clock.return_value = 1000 # simulated "now" sleep.return_value = 55 # simulated sleep time left async with throttled(throttler=throttler, logger=logger, delays=[123, 234], wakeup=wakeup): raise Exception() @@ -180,18 +176,17 @@ async def test_interruption(clock, sleep): assert sleep.mock_calls == [call(123, wakeup=wakeup)] -async def test_continuation_with_success(clock, sleep): +async def test_continuation_with_success(sleep): wakeup = asyncio.Event() logger = logging.getLogger() throttler = Throttler() - clock.return_value = 1000 # simulated "now" sleep.return_value = 55 # simulated sleep time left async with throttled(throttler=throttler, logger=logger, delays=[123, 234], wakeup=wakeup): raise Exception() + await asyncio.sleep(77) sleep.reset_mock() - clock.return_value = 1077 # simulated "now" sleep.return_value = None # simulated sleep time left async with throttled(throttler=throttler, logger=logger, delays=[...], wakeup=wakeup): pass @@ -202,18 +197,17 @@ async def test_continuation_with_success(clock, sleep): assert sleep.mock_calls == [call(123 - 77, wakeup=wakeup)] -async def test_continuation_with_error(clock, sleep): +async def test_continuation_with_error(sleep): wakeup = asyncio.Event() logger = logging.getLogger() throttler = Throttler() - clock.return_value = 1000 # simulated "now" sleep.return_value = 55 # simulated sleep time left async with throttled(throttler=throttler, logger=logger, delays=[123, 234], wakeup=wakeup): raise Exception() + await asyncio.sleep(77) sleep.reset_mock() - clock.return_value = 1077 # simulated "now" sleep.return_value = None # simulated sleep time left async with throttled(throttler=throttler, logger=logger, delays=[...], wakeup=wakeup): raise Exception() @@ -224,18 +218,17 @@ async def test_continuation_with_error(clock, sleep): assert sleep.mock_calls == [call(123 - 77, wakeup=wakeup), call(234, wakeup=wakeup)] -async def test_continuation_when_overdue(clock, sleep): +async def test_continuation_when_overdue(sleep): wakeup = asyncio.Event() logger = logging.getLogger() throttler = Throttler() - clock.return_value = 1000 # simulated "now" sleep.return_value = 55 # simulated sleep time left async with throttled(throttler=throttler, logger=logger, delays=[123, 234], wakeup=wakeup): raise Exception() + await asyncio.sleep(1000) sleep.reset_mock() - clock.return_value = 2000 # simulated "now" sleep.return_value = None # simulated sleep time left async with throttled(throttler=throttler, logger=logger, delays=[...], wakeup=wakeup): raise Exception() diff --git a/tests/utilities/aiotasks/test_scheduler.py b/tests/utilities/aiotasks/test_scheduler.py index 430d26d82..3b809ea74 100644 --- a/tests/utilities/aiotasks/test_scheduler.py +++ b/tests/utilities/aiotasks/test_scheduler.py @@ -5,8 +5,6 @@ from kopf._cogs.aiokits.aiotasks import Scheduler -CODE_OVERHEAD = 0.01 - async def f(mock, *args): try: @@ -14,7 +12,7 @@ async def f(mock, *args): for arg in args: if isinstance(arg, asyncio.Event): arg.set() - elif isinstance(arg, float): + elif isinstance(arg, (int, float)): await asyncio.sleep(arg) elif callable(arg): arg() @@ -24,70 +22,62 @@ async def f(mock, *args): mock('finished') -async def test_empty_scheduler_lifecycle(timer): - with timer: - scheduler = Scheduler() - assert scheduler.empty() - await scheduler.wait() - assert scheduler.empty() - await scheduler.close() - assert scheduler.empty() - assert timer.seconds < CODE_OVERHEAD +async def test_empty_scheduler_lifecycle(looptime): + scheduler = Scheduler() + assert scheduler.empty() + await scheduler.wait() + assert scheduler.empty() + await scheduler.close() + assert scheduler.empty() + assert looptime == 0 -async def test_task_spawning_and_graceful_finishing(timer): +async def test_task_spawning_and_graceful_finishing(looptime): mock = Mock() flag1 = asyncio.Event() flag2 = asyncio.Event() scheduler = Scheduler() - result = await scheduler.spawn(f(mock, flag1, 0.1, flag2)) + result = await scheduler.spawn(f(mock, flag1, 123, flag2)) assert result is None - with timer: - await flag1.wait() - assert timer.seconds < CODE_OVERHEAD - assert mock.call_args[0][0] == 'started' + await flag1.wait() + assert looptime == 0 + assert mock.call_args_list[0][0][0] == 'started' - with timer: - await flag2.wait() - assert timer.seconds > 0.1 - assert timer.seconds < 0.1 + CODE_OVERHEAD - assert mock.call_args[0][0] == 'finished' + await flag2.wait() + assert looptime == 123 + assert mock.call_args_list[1][0][0] == 'finished' await scheduler.close() -async def test_task_spawning_and_cancellation(timer): +async def test_task_spawning_and_cancellation(looptime): mock = Mock() flag1 = asyncio.Event() flag2 = asyncio.Event() scheduler = Scheduler() - result = await scheduler.spawn(f(mock, flag1, 1.0, flag2)) + result = await scheduler.spawn(f(mock, flag1, 123, flag2)) assert result is None - with timer: - await flag1.wait() - assert timer.seconds < CODE_OVERHEAD - assert mock.call_args[0][0] == 'started' + await flag1.wait() + assert looptime == 0 + assert mock.call_args_list[0][0][0] == 'started' - with timer: - await scheduler.close() - assert timer.seconds < CODE_OVERHEAD # near-instant - assert mock.call_args[0][0] == 'cancelled' + await scheduler.close() + assert looptime == 0 + assert mock.call_args_list[1][0][0] == 'cancelled' async def test_no_tasks_are_accepted_after_closing(): scheduler = Scheduler() await scheduler.close() - assert scheduler._closed assert scheduler._spawning_task.done() assert scheduler._cleaning_task.done() - with pytest.raises(RuntimeError, match=r"Cannot add new coroutines"): - await scheduler.spawn(f(Mock(), 1.0)) + await scheduler.spawn(f(Mock(), 123)) async def test_successes_are_not_reported(): @@ -121,7 +111,7 @@ async def test_exceptions_are_reported(): assert exception_handler.call_args[0][0] is exception -async def test_tasks_are_parallel_if_limit_is_not_reached(timer): +async def test_tasks_are_parallel_if_limit_is_not_reached(looptime): """ time: ////////----------------------0.1s------------------0.2s--/// task1: ->spawn->start->sleep->finish->| @@ -133,24 +123,19 @@ async def test_tasks_are_parallel_if_limit_is_not_reached(timer): task2_finished = asyncio.Event() scheduler = Scheduler(limit=2) - with timer: - await scheduler.spawn(f(Mock(), task1_started, 0.1, task1_finished)) - await scheduler.spawn(f(Mock(), task2_started, 0.1, task2_finished)) - assert timer.seconds < CODE_OVERHEAD # i.e. spawning is not not blocking + await scheduler.spawn(f(Mock(), task1_started, 9, task1_finished)) + await scheduler.spawn(f(Mock(), task2_started, 9, task2_finished)) + assert looptime == 0 # i.e. spawning is not not blocking - with timer: - await task1_finished.wait() - assert task2_started.is_set() - await task2_finished.wait() - - # TODO: LATER: code coverage takes even more code overhead. Redesign when switch to looptime. - assert timer.seconds > 0.1 - assert timer.seconds < 0.1 + CODE_OVERHEAD * 8 + await task1_finished.wait() + assert task2_started.is_set() + await task2_finished.wait() + assert looptime == 9 await scheduler.close() -async def test_tasks_are_pending_if_limit_is_reached(timer): +async def test_tasks_are_pending_if_limit_is_reached(looptime): """ time: ////////----------------------0.1s------------------0.2s--/// task1: ->spawn->start->sleep->finish->| @@ -162,17 +147,13 @@ async def test_tasks_are_pending_if_limit_is_reached(timer): task2_finished = asyncio.Event() scheduler = Scheduler(limit=1) - with timer: - await scheduler.spawn(f(Mock(), task1_started, 0.1, task1_finished)) - await scheduler.spawn(f(Mock(), task2_started, 0.1, task2_finished)) - assert timer.seconds < CODE_OVERHEAD # i.e. spawning is not not blocking - - with timer: - await task1_finished.wait() - assert not task2_started.is_set() - await task2_finished.wait() + await scheduler.spawn(f(Mock(), task1_started, 9, task1_finished)) + await scheduler.spawn(f(Mock(), task2_started, 9, task2_finished)) + assert looptime == 0 # i.e. spawning is not not blocking - assert timer.seconds > 0.2 - assert timer.seconds < 0.2 + CODE_OVERHEAD * 2 + await task1_finished.wait() + assert not task2_started.is_set() + await task2_finished.wait() + assert looptime == 18 await scheduler.close() diff --git a/tests/utilities/aiotasks/test_task_selection.py b/tests/utilities/aiotasks/test_task_selection.py index 8de7d9e1c..c5449f077 100644 --- a/tests/utilities/aiotasks/test_task_selection.py +++ b/tests/utilities/aiotasks/test_task_selection.py @@ -7,7 +7,7 @@ async def test_alltasks_exclusion(): flag = asyncio.Event() task1 = asyncio.create_task(flag.wait()) task2 = asyncio.create_task(flag.wait()) - done, pending = await asyncio.wait([task1, task2], timeout=0.01) + done, pending = await asyncio.wait([task1, task2], timeout=0.01) # let them start assert not done tasks = await all_tasks(ignored=[task2]) diff --git a/tests/utilities/aiotasks/test_task_stopping.py b/tests/utilities/aiotasks/test_task_stopping.py index c61bfe9c9..a2adb6857 100644 --- a/tests/utilities/aiotasks/test_task_stopping.py +++ b/tests/utilities/aiotasks/test_task_stopping.py @@ -17,25 +17,27 @@ async def stuck() -> None: await asyncio.Event().wait() -async def test_stop_with_no_tasks(assert_logs, caplog): +async def test_stop_with_no_tasks(assert_logs, caplog, looptime): logger = logging.getLogger() caplog.set_level(0) done, pending = await stop([], title='sample', logger=logger) assert not done assert not pending assert_logs(["Sample tasks stopping is skipped: no tasks given."]) + assert looptime == 0 -async def test_stop_with_no_tasks_when_quiet(assert_logs, caplog): +async def test_stop_with_no_tasks_when_quiet(assert_logs, caplog, looptime): logger = logging.getLogger() caplog.set_level(0) done, pending = await stop([], title='sample', logger=logger, quiet=True) assert not done assert not pending assert not caplog.messages + assert looptime == 0 -async def test_stop_immediately_with_finishing(assert_logs, caplog): +async def test_stop_immediately_with_finishing(assert_logs, caplog, looptime): logger = logging.getLogger() caplog.set_level(0) task1 = asyncio.create_task(simple()) @@ -46,9 +48,10 @@ async def test_stop_immediately_with_finishing(assert_logs, caplog): assert_logs(["Sample tasks are stopped: finishing normally"]) assert task1.cancelled() assert task2.cancelled() + assert looptime == 0 -async def test_stop_immediately_with_cancelling(assert_logs, caplog): +async def test_stop_immediately_with_cancelling(assert_logs, caplog, looptime): logger = logging.getLogger() caplog.set_level(0) task1 = asyncio.create_task(simple()) @@ -59,27 +62,31 @@ async def test_stop_immediately_with_cancelling(assert_logs, caplog): assert_logs(["Sample tasks are stopped: cancelling normally"]) assert task1.cancelled() assert task2.cancelled() + assert looptime == 0 @pytest.mark.parametrize('cancelled', [False, True]) -async def test_stop_iteratively(assert_logs, caplog, cancelled): +async def test_stop_iteratively(assert_logs, caplog, cancelled, looptime): logger = logging.getLogger() caplog.set_level(0) task1 = asyncio.create_task(simple()) task2 = asyncio.create_task(stuck()) - stask = asyncio.create_task(stop([task1, task2], title='sample', logger=logger, interval=0.01, cancelled=cancelled)) + stask = asyncio.create_task(stop([task1, task2], title='sample', logger=logger, interval=1, cancelled=cancelled)) + assert looptime == 0 - done, pending = await asyncio.wait({stask}, timeout=0.011) + done, pending = await asyncio.wait({stask}, timeout=10) assert not done assert task1.done() assert not task2.done() + assert looptime == 10 task2.cancel() - done, pending = await asyncio.wait({stask}, timeout=0.011) + done, pending = await asyncio.wait({stask}, timeout=10) assert done assert task1.done() assert task2.done() + assert looptime == 10 # not 20! assert_logs([ r"Sample tasks are not stopped: (finishing|cancelling) normally; tasks left: \{