diff --git a/kopf/_cogs/aiokits/aioenums.py b/kopf/_cogs/aiokits/aioenums.py index 11747eb86..6bcaf4ecb 100644 --- a/kopf/_cogs/aiokits/aioenums.py +++ b/kopf/_cogs/aiokits/aioenums.py @@ -1,7 +1,6 @@ import asyncio import enum import threading -import time from collections.abc import Awaitable, Generator from typing import Generic, TypeVar @@ -52,7 +51,7 @@ def is_set(self, reason: FlagReasonT | None = None) -> bool: def set(self, reason: FlagReasonT | None = None) -> None: reason = reason if reason is not None else self.reason # to keep existing values - self.when = self.when if self.when is not None else time.monotonic() + self.when = self.when if self.when is not None else asyncio.get_running_loop().time() self.reason = reason if self.reason is None or reason is None else self.reason | reason self.sync_event.set() self.async_event.set() # it is thread-safe: always called in operator's event loop. diff --git a/kopf/_core/actions/progression.py b/kopf/_core/actions/progression.py index f2271498f..2e4a40d36 100644 --- a/kopf/_core/actions/progression.py +++ b/kopf/_core/actions/progression.py @@ -9,8 +9,69 @@ and the overall handling routine should persist the handler status somewhere. The states are persisted in a state storage: see `kopf._cogs.configs.progress`. -""" +MOCKED LOOP TIME: + +**For testability,** we use ``basetime + timedelta(seconds=loop.time())`` +to calculate the "now" moment instead of ``datetime.utcnow()``. + +The "basetime" is an imaginary UTC time when the loop clock was zero (``0.0``) +and is calculated as ``datetime.utcnow() - timedelta(seconds=loop.time())`` +(assuming these two calls are almost instant and the precision loss is low). + +In normal run mode, the "basetime" remains constant for the entire life time +of an event loop, since both loop time and wall-clock time move forward with +the same speed: the calculation of "basetime" always produces the same result. + +In test mode, the loop time is mocked and moves as events (e.g. sleeps) happen: +it can move (much) faster than the wall-clock time, e.g. 100s of loop seconds +in 1/100th of a wall-clock second; or it can freeze and not move at all. + +PROBLEMATIC INACCURACY: + +Because of a highly unprecise and everchanging component in the formula +of the "basetime" — the non-mockable UTC clock — the "basetime" calculation +can give different results at different times even if executed fast enough. + +To reduce the inaccuracy introduced by sequential UTC time measurements, +we calculate the "basetime" once per every global state object created +and push it down to owned state objects of the individual handlers +in this halding cycle of this resource object in this unit-test. + +That gives us sufficient accuracy while remaining simple enough, assuming that +there are no multiple concurrent global state objects per single unit-test. +_(An alternative would be to calculate the "basetime" on event loop creation +or to cache it per event loop in a global WeakDict, but that is an overkill.)_ + +SUFFICIENT ACCURACY: + +With this approach and ``looptime``__, we can detach from the wall-clock time +in tests and simulate the time's rapid movement into the future by "recovering" +the "now" moment as ``basetime + timedelta(seconds=loop.time())`` (see above) — +without wall-clock delays or hitting the issues with code execution overhead. + +Note that there is no UTC clock involved now, only the controled loop clock, +so multiple sequential calculation will lead to predictable abd precise results, +especially when the loop clock is frozen (i.e. constant for a short duration). + +__ https://github.com/nolar/looptime + +USER PERSPECTIVE: + +This time math is never exposed to users and never persisted in storages. +It is used only internally to decouple the operator routines from the system +clock and strictly couple it to the time of the loop. + +IMPLEMENTATION DETAILS: + +Q: Why do we store UTC time in the fields instead of the floats with loop time? +A: If we store floats in the fields, we need to do the math on every +fetching/storing operation, which introduces minor divergence in supposedly +constant data as stored in the external storages. Instead, we only calculate +the "now" moment. As a result, the precision loss is seen only at runtime checks +and is indistinguishanle from the loop clock sensitivity. +""" +import asyncio import collections.abc import copy import dataclasses @@ -40,6 +101,7 @@ class HandlerState(execution.HandlerState): """ active: bool # whether it is used in done/delays [T] or only in counters/purges [F]. + basetime: datetime.datetime # a moment when the loop time was zero started: datetime.datetime stopped: datetime.datetime | None = None # None means it is still running (e.g. delayed). delayed: datetime.datetime | None = None # None means it is finished (succeeded/failed). @@ -57,9 +119,8 @@ def finished(self) -> bool: @property def sleeping(self) -> bool: - ts = self.delayed - now = datetime.datetime.now(tz=datetime.timezone.utc) - return not self.finished and ts is not None and ts > now + now = self.basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time()) + return not self.finished and self.delayed is not None and self.delayed > now @property def awakened(self) -> bool: @@ -67,23 +128,33 @@ def awakened(self) -> bool: @property def runtime(self) -> datetime.timedelta: - return datetime.datetime.now(tz=datetime.timezone.utc) - self.started + now = self.basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time()) + return now - self.started @classmethod - def from_scratch(cls, *, purpose: str | None = None) -> "HandlerState": - return cls( - active=True, - started=datetime.datetime.now(datetime.timezone.utc), - purpose=purpose, - ) + def from_scratch( + cls, + *, + basetime: datetime.datetime, + purpose: str | None = None, + ) -> "HandlerState": + now = basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time()) + return cls(active=True, basetime=basetime, started=now, purpose=purpose) @classmethod - def from_storage(cls, __d: progress.ProgressRecord) -> "HandlerState": + def from_storage( + cls, + __d: progress.ProgressRecord, + *, + basetime: datetime.datetime, + ) -> "HandlerState": + now = basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time()) return cls( active=False, - started=_parse_iso8601(__d.get('started')) or datetime.datetime.now(datetime.timezone.utc), - stopped=_parse_iso8601(__d.get('stopped')), - delayed=_parse_iso8601(__d.get('delayed')), + basetime=basetime, + started=parse_iso8601(__d.get('started')) or now, + stopped=parse_iso8601(__d.get('stopped')), + delayed=parse_iso8601(__d.get('delayed')), purpose=__d.get('purpose') if __d.get('purpose') else None, retries=__d.get('retries') or 0, success=__d.get('success') or False, @@ -95,9 +166,9 @@ def from_storage(cls, __d: progress.ProgressRecord) -> "HandlerState": def for_storage(self) -> progress.ProgressRecord: return progress.ProgressRecord( - started=None if self.started is None else _format_iso8601(self.started), - stopped=None if self.stopped is None else _format_iso8601(self.stopped), - delayed=None if self.delayed is None else _format_iso8601(self.delayed), + started=None if self.started is None else format_iso8601(self.started), + stopped=None if self.stopped is None else format_iso8601(self.stopped), + delayed=None if self.delayed is None else format_iso8601(self.delayed), purpose=None if self.purpose is None else str(self.purpose), retries=None if self.retries is None else int(self.retries), success=None if self.success is None else bool(self.success), @@ -123,10 +194,11 @@ def with_outcome( self, outcome: execution.Outcome, ) -> "HandlerState": - now = datetime.datetime.now(datetime.timezone.utc) + now = self.basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time()) cls = type(self) return cls( active=self.active, + basetime=self.basetime, purpose=self.purpose, started=self.started, stopped=self.stopped if self.stopped is not None else now if outcome.final else None, @@ -159,19 +231,25 @@ class State(execution.State): """ _states: Mapping[ids.HandlerId, HandlerState] + # Eliminate even the smallest microsecond-scale deviations by using the shared base time. + # The deviations can come from UTC wall-clock time slowly moving during the run (CPU overhead). + basetime: datetime.datetime + def __init__( self, __src: Mapping[ids.HandlerId, HandlerState], *, + basetime: datetime.datetime, purpose: str | None = None, ): super().__init__() self._states = dict(__src) self.purpose = purpose + self.basetime = basetime @classmethod def from_scratch(cls) -> "State": - return cls({}) + return cls({}, basetime=_get_basetime()) @classmethod def from_storage( @@ -181,13 +259,14 @@ def from_storage( storage: progress.ProgressStorage, handlers: Iterable[execution.Handler], ) -> "State": + basetime = _get_basetime() handler_ids = {handler.id for handler in handlers} handler_states: dict[ids.HandlerId, HandlerState] = {} for handler_id in handler_ids: content = storage.fetch(key=handler_id, body=body) if content is not None: - handler_states[handler_id] = HandlerState.from_storage(content) - return cls(handler_states) + handler_states[handler_id] = HandlerState.from_storage(content, basetime=basetime) + return cls(handler_states, basetime=basetime) def with_purpose( self, @@ -198,7 +277,7 @@ def with_purpose( for handler in handlers: handler_states[handler.id] = handler_states[handler.id].with_purpose(purpose) cls = type(self) - return cls(handler_states, purpose=purpose) + return cls(handler_states, basetime=self.basetime, purpose=purpose) def with_handlers( self, @@ -207,11 +286,12 @@ def with_handlers( handler_states: dict[ids.HandlerId, HandlerState] = dict(self) for handler in handlers: if handler.id not in handler_states: - handler_states[handler.id] = HandlerState.from_scratch(purpose=self.purpose) + handler_states[handler.id] = HandlerState.from_scratch( + basetime=self.basetime, purpose=self.purpose) else: handler_states[handler.id] = handler_states[handler.id].as_active() cls = type(self) - return cls(handler_states, purpose=self.purpose) + return cls(handler_states, basetime=self.basetime, purpose=self.purpose) def with_outcomes( self, @@ -226,7 +306,7 @@ def with_outcomes( handler_id: (handler_state if handler_id not in outcomes else handler_state.with_outcome(outcomes[handler_id])) for handler_id, handler_state in self._states.items() - }, purpose=self.purpose) + }, basetime=self.basetime, purpose=self.purpose) def without_successes(self) -> "State": cls = type(self) @@ -234,7 +314,7 @@ def without_successes(self) -> "State": handler_id: handler_state for handler_id, handler_state in self._states.items() if not handler_state.success # i.e. failures & in-progress/retrying - }) + }, basetime=self.basetime) def store( self, @@ -332,9 +412,9 @@ def delays(self) -> Collection[float]: processing routine, based on all delays of different origin: e.g. postponed daemons, stopping daemons, temporarily failed handlers. """ - now = datetime.datetime.now(datetime.timezone.utc) + now = self.basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time()) return [ - max(0, (handler_state.delayed - now).total_seconds()) if handler_state.delayed else 0 + max(0.0, (handler_state.delayed - now).total_seconds()) if handler_state.delayed else 0 for handler_state in self._states.values() if handler_state.active and not handler_state.finished ] @@ -374,24 +454,29 @@ def deliver_results( @overload -def _format_iso8601(val: None) -> None: ... +def format_iso8601(val: None) -> None: ... @overload -def _format_iso8601(val: datetime.datetime) -> str: ... +def format_iso8601(val: datetime.datetime) -> str: ... -def _format_iso8601(val: datetime.datetime | None) -> str | None: +def format_iso8601(val: datetime.datetime | None) -> str | None: return None if val is None else val.isoformat(timespec='microseconds') @overload -def _parse_iso8601(val: None) -> None: ... +def parse_iso8601(val: None) -> None: ... @overload -def _parse_iso8601(val: str) -> datetime.datetime: ... +def parse_iso8601(val: str) -> datetime.datetime: ... + + +def parse_iso8601(val: str | None) -> datetime.datetime | None: + return None if val is None else iso8601.parse_date(val, default_timezone=None) -def _parse_iso8601(val: str | None) -> datetime.datetime | None: - return None if val is None else iso8601.parse_date(val) # always TZ-aware +def _get_basetime() -> datetime.datetime: + loop = asyncio.get_running_loop() + return datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(seconds=loop.time()) diff --git a/kopf/_core/actions/throttlers.py b/kopf/_core/actions/throttlers.py index fc945b8f9..7bbccf422 100644 --- a/kopf/_core/actions/throttlers.py +++ b/kopf/_core/actions/throttlers.py @@ -1,7 +1,6 @@ import asyncio import contextlib import dataclasses -import time from collections.abc import AsyncGenerator, Iterable, Iterator from kopf._cogs.aiokits import aiotime @@ -28,11 +27,12 @@ async def throttled( """ A helper to throttle any arbitrary operation. """ + clock = asyncio.get_running_loop().time # The 1st sleep: if throttling is already active, but was interrupted by a queue replenishment. # It is needed to properly process the latest known event after the successful sleep. if throttler.active_until is not None: - remaining_time = throttler.active_until - time.monotonic() + remaining_time = throttler.active_until - clock() unslept_time = await aiotime.sleep(remaining_time, wakeup=wakeup) if unslept_time is None: logger.info("Throttling is over. Switching back to normal operations.") @@ -61,7 +61,7 @@ async def throttled( delay = next(throttler.source_of_delays, throttler.last_used_delay) if delay is not None: throttler.last_used_delay = delay - throttler.active_until = time.monotonic() + delay + throttler.active_until = clock() + delay logger.exception(f"Throttling for {delay} seconds due to an unexpected error: {e!r}") else: @@ -72,7 +72,7 @@ async def throttled( # The 2nd sleep: if throttling has been just activated (i.e. there was a fresh error). # It is needed to have better logging/sleeping without workers exiting for "no events". if throttler.active_until is not None and should_run: - remaining_time = throttler.active_until - time.monotonic() + remaining_time = throttler.active_until - clock() unslept_time = await aiotime.sleep(remaining_time, wakeup=wakeup) if unslept_time is None: throttler.active_until = None diff --git a/kopf/_core/engines/daemons.py b/kopf/_core/engines/daemons.py index 308fdaaee..1c0e9be5c 100644 --- a/kopf/_core/engines/daemons.py +++ b/kopf/_core/engines/daemons.py @@ -23,7 +23,6 @@ import abc import asyncio import dataclasses -import time import warnings from collections.abc import Collection, Iterable, Mapping, MutableMapping, Sequence @@ -35,6 +34,10 @@ from kopf._core.intents import causes, handlers as handlers_, stoppers +def _loop_time() -> float: + return asyncio.get_running_loop().time() + + @dataclasses.dataclass(frozen=True) class Daemon: task: aiotasks.Task # a guarding task of the daemon. @@ -47,7 +50,7 @@ class Daemon: class DaemonsMemory: # For background and timed threads/tasks (invoked with the kwargs of the last-seen body). live_fresh_body: bodies.Body | None = None - idle_reset_time: float = dataclasses.field(default_factory=time.monotonic) + idle_reset_time: float = dataclasses.field(default_factory=_loop_time) forever_stopped: set[ids.HandlerId] = dataclasses.field(default_factory=set) running_daemons: dict[ids.HandlerId, Daemon] = dataclasses.field(default_factory=dict) @@ -182,7 +185,7 @@ async def stop_daemons( (as by their surrounding circumstances: deletion handlers and finalizers). """ delays: list[float] = [] - now = time.monotonic() + now = asyncio.get_running_loop().time() for daemon in list(daemons.values()): logger = daemon.logger stopper = daemon.stopper @@ -537,6 +540,7 @@ async def _timer( await aiotime.sleep(handler.initial_delay, wakeup=stopper.async_event) # Similar to activities (in-memory execution), but applies patches on every attempt. + clock = asyncio.get_running_loop().time state = progression.State.from_scratch().with_handlers([handler]) while not stopper.is_set(): # NB: ignore state.done! it is checked below explicitly. @@ -548,14 +552,14 @@ async def _timer( # Both `now` and `last_seen_time` are moving targets: the last seen time is updated # on every watch-event received, and prolongs the sleep. The sleep is never shortened. if handler.idle is not None: - while not stopper.is_set() and time.monotonic() - memory.idle_reset_time < handler.idle: - delay = memory.idle_reset_time + handler.idle - time.monotonic() + while not stopper.is_set() and clock() - memory.idle_reset_time < handler.idle: + delay = memory.idle_reset_time + handler.idle - clock() await aiotime.sleep(delay, wakeup=stopper.async_event) if stopper.is_set(): continue # Remember the start time for the sharp timing and idle-time-waster below. - started = time.monotonic() + started = clock() # Execute the handler as usually, in-memory, but handle its outcome on every attempt. outcomes = await execution.execute_handlers_once( @@ -585,7 +589,7 @@ async def _timer( # |-----|-----|-----|-----|-----|-----|---> (interval=5, sharp=True) # [slow_handler]....[slow_handler]....[slow... elif handler.interval is not None and handler.sharp: - passed_duration = time.monotonic() - started + passed_duration = clock() - started remaining_delay = handler.interval - (passed_duration % handler.interval) await aiotime.sleep(remaining_delay, wakeup=stopper.async_event) diff --git a/kopf/_core/reactor/processing.py b/kopf/_core/reactor/processing.py index 5ad9b390e..5ffdf04b0 100644 --- a/kopf/_core/reactor/processing.py +++ b/kopf/_core/reactor/processing.py @@ -14,7 +14,6 @@ and therefore do not trigger the user-defined handlers. """ import asyncio -import time from collections.abc import Collection from kopf._cogs.aiokits import aiotoggles @@ -329,7 +328,7 @@ async def process_spawning_cause( if memory.daemons_memory.live_fresh_body is None: memory.daemons_memory.live_fresh_body = cause.body if cause.reset: - memory.daemons_memory.idle_reset_time = time.monotonic() + memory.daemons_memory.idle_reset_time = asyncio.get_running_loop().time() if finalizers.is_deletion_ongoing(cause.body): stopping_delays = await daemons.stop_daemons( diff --git a/tests/basic-structs/test_memories.py b/tests/basic-structs/test_memories.py index 9f0c3f459..9ce87ae93 100644 --- a/tests/basic-structs/test_memories.py +++ b/tests/basic-structs/test_memories.py @@ -11,7 +11,7 @@ } -def test_creation_with_defaults(): +async def test_creation_with_defaults(): ResourceMemory() diff --git a/tests/handling/daemons/conftest.py b/tests/handling/daemons/conftest.py index fc6f2ae51..a6f8e9220 100644 --- a/tests/handling/daemons/conftest.py +++ b/tests/handling/daemons/conftest.py @@ -99,15 +99,16 @@ async def background_daemon_killer(settings, memories, operator_paused): @pytest.fixture() -def frozen_time(): +async def frozen_time(): """ A helper to simulate time movements to step over long sleeps/timeouts. """ - # TODO LATER: Either freezegun should support the system clock, or find something else. - with freezegun.freeze_time("2020-01-01T00:00:00") as frozen: + with freezegun.freeze_time("2020-01-01 00:00:00") as frozen: # Use freezegun-supported time instead of system clocks -- for testing purposes only. # NB: Patch strictly after the time is frozen -- to use fake_time(), not real time(). - with patch('time.monotonic', time.time), patch('time.perf_counter', time.time): + # NB: StdLib's event loops use time.monotonic(), but uvloop uses its own C-level time, + # so patch the loop object directly instead of its implied underlying implementation. + with patch.object(asyncio.get_running_loop(), 'time', time.time): yield frozen diff --git a/tests/handling/indexing/test_index_exclusion.py b/tests/handling/indexing/test_index_exclusion.py index 87c3cb431..ecc8c6c64 100644 --- a/tests/handling/indexing/test_index_exclusion.py +++ b/tests/handling/indexing/test_index_exclusion.py @@ -1,4 +1,5 @@ import asyncio +import datetime import logging import freezegun @@ -30,9 +31,12 @@ async def test_successes_are_removed_from_the_indexing_state( resource, namespace, settings, registry, memories, indexers, caplog, event_type, handlers): caplog.set_level(logging.DEBUG) + + # Any "future" time works and affects nothing as long as it is the same + basetime = datetime.datetime.now(tz=datetime.timezone.utc) body = {'metadata': {'namespace': namespace, 'name': 'name1'}} record = ProgressRecord(success=True) - state = State({HandlerId('unrelated'): HandlerState.from_storage(record)}) + state = State({HandlerId('unrelated'): HandlerState.from_storage(record, basetime=basetime)}, basetime=basetime) memory = await memories.recall(raw_body=body) memory.indexing_memory.indexing_state = state handlers.index_mock.side_effect = 123 @@ -56,9 +60,12 @@ async def test_successes_are_removed_from_the_indexing_state( async def test_temporary_failures_with_no_delays_are_reindexed( resource, namespace, settings, registry, memories, indexers, index, caplog, event_type, handlers): caplog.set_level(logging.DEBUG) + + # Any "future" time works and affects nothing as long as it is the same + basetime = datetime.datetime.now(tz=datetime.timezone.utc) body = {'metadata': {'namespace': namespace, 'name': 'name1'}} record = ProgressRecord(delayed=None) - state = State({HandlerId('index_fn'): HandlerState.from_storage(record)}) + state = State({HandlerId('index_fn'): HandlerState.from_storage(record, basetime=basetime)}, basetime=basetime) memory = await memories.recall(raw_body=body) memory.indexing_memory.indexing_state = state await process_resource_event( @@ -81,9 +88,12 @@ async def test_temporary_failures_with_no_delays_are_reindexed( async def test_temporary_failures_with_expired_delays_are_reindexed( resource, namespace, settings, registry, memories, indexers, index, caplog, event_type, handlers): caplog.set_level(logging.DEBUG) + + # Any "future" time works and affects nothing as long as it is the same + basetime = datetime.datetime.now(tz=datetime.timezone.utc) body = {'metadata': {'namespace': namespace, 'name': 'name1'}} - record = ProgressRecord(delayed='2020-12-31T23:59:59.000000') - state = State({HandlerId('index_fn'): HandlerState.from_storage(record)}) + record = ProgressRecord(delayed='2020-12-31T23:59:59.000000Z') + state = State({HandlerId('index_fn'): HandlerState.from_storage(record, basetime=basetime)}, basetime=basetime) memory = await memories.recall(raw_body=body) memory.indexing_memory.indexing_state = state await process_resource_event( @@ -105,9 +115,12 @@ async def test_temporary_failures_with_expired_delays_are_reindexed( async def test_permanent_failures_are_not_reindexed( resource, namespace, settings, registry, memories, indexers, index, caplog, event_type, handlers): caplog.set_level(logging.DEBUG) + + # Any "future" time works and affects nothing as long as it is the same + basetime = datetime.datetime.now(tz=datetime.timezone.utc) body = {'metadata': {'namespace': namespace, 'name': 'name1'}} record = ProgressRecord(failure=True) - state = State({HandlerId('index_fn'): HandlerState.from_storage(record)}) + state = State({HandlerId('index_fn'): HandlerState.from_storage(record, basetime=basetime)}, basetime=basetime) memory = await memories.recall(raw_body=body) memory.indexing_memory.indexing_state = state await process_resource_event( diff --git a/tests/handling/test_delays.py b/tests/handling/test_delays.py index 7ab019b81..91848f337 100644 --- a/tests/handling/test_delays.py +++ b/tests/handling/test_delays.py @@ -1,8 +1,8 @@ import asyncio +import datetime import logging import freezegun -import iso8601 import pytest import kopf @@ -75,10 +75,13 @@ async def test_delayed_handlers_sleep( caplog, assert_logs, k8s_mocked, now, delayed_iso, delay): caplog.set_level(logging.DEBUG) + # Any "future" time works and affects nothing as long as it is the same + basetime = datetime.datetime.now(tz=datetime.timezone.utc) + # Simulate the original persisted state of the resource. # Make sure the finalizer is added since there are mandatory deletion handlers. record = ProgressRecord(started='2000-01-01T00:00:00', delayed=delayed_iso) # a long time ago - state_dict = HandlerState.from_storage(record).as_in_storage() + state_dict = HandlerState.from_storage(record, basetime=basetime).as_in_storage() event_type = None if cause_reason == Reason.RESUME else 'irrelevant' event_body = { 'metadata': {'finalizers': [settings.persistence.finalizer]}, diff --git a/tests/handling/test_multistep.py b/tests/handling/test_multistep.py index 16545b239..5a146666e 100644 --- a/tests/handling/test_multistep.py +++ b/tests/handling/test_multistep.py @@ -82,8 +82,8 @@ async def test_2nd_step_finishes_the_handlers(caplog, event_body = { 'metadata': {'finalizers': [settings.persistence.finalizer]}, 'status': {'kopf': {'progress': { - name1: {'started': '1979-01-01T00:00:00', 'success': True}, - name2: {'started': '1979-01-01T00:00:00'}, + name1: {'started': '1979-01-01T00:00:00Z', 'success': True}, + name2: {'started': '1979-01-01T00:00:00Z'}, }}} } event_body['metadata'] |= deletion_ts diff --git a/tests/handling/test_retrying_limits.py b/tests/handling/test_retrying_limits.py index 6bc4e7bba..bff463e27 100644 --- a/tests/handling/test_retrying_limits.py +++ b/tests/handling/test_retrying_limits.py @@ -16,7 +16,7 @@ # The extrahandlers are needed to prevent the cycle ending and status purging. @pytest.mark.parametrize('cause_type', HANDLER_REASONS) @pytest.mark.parametrize('now, ts', [ - ['2099-12-31T23:59:59', '2020-01-01T00:00:00'], + ['2099-12-31T23:59:59Z', '2020-01-01T00:00:00Z'], ], ids=['slow']) async def test_timed_out_handler_fails( registry, settings, handlers, extrahandlers, resource, cause_mock, cause_type, diff --git a/tests/handling/test_timing_consistency.py b/tests/handling/test_timing_consistency.py index 81d001ab8..4ceb9a7b0 100644 --- a/tests/handling/test_timing_consistency.py +++ b/tests/handling/test_timing_consistency.py @@ -35,10 +35,10 @@ async def test_consistent_awakening(registry, settings, resource, k8s_mocked, mo """ # Simulate that the object is scheduled to be awakened between the watch-event and sleep. - ts0 = iso8601.parse_date('2019-12-30T10:56:43') - tsA_triggered = "2019-12-30T10:56:42.999999" - ts0_scheduled = "2019-12-30T10:56:43.000000" - tsB_delivered = "2019-12-30T10:56:43.000001" + ts0 = iso8601.parse_date('2019-12-30T10:56:43Z') + tsA_triggered = "2019-12-30T10:56:42.999999Z" + ts0_scheduled = "2019-12-30T10:56:43.000000Z" + tsB_delivered = "2019-12-30T10:56:43.000001Z" # A dummy handler: it will not be selected for execution anyway, we just need to have it. @kopf.on.create(*resource, id='some-id') diff --git a/tests/lifecycles/test_handler_selection.py b/tests/lifecycles/test_handler_selection.py index aca6bdacd..6b0a2e79f 100644 --- a/tests/lifecycles/test_handler_selection.py +++ b/tests/lifecycles/test_handler_selection.py @@ -12,7 +12,7 @@ kopf.lifecycles.shuffled, kopf.lifecycles.asap, ]) -def test_with_empty_input(lifecycle): +async def test_with_empty_input(lifecycle): state = State.from_scratch() handlers = [] selected = lifecycle(handlers, state=state) @@ -79,7 +79,7 @@ def test_shuffled_takes_them_all(): assert set(selected) == {handler1, handler2, handler3} -def test_asap_takes_the_first_one_when_no_retries(mocker): +async def test_asap_takes_the_first_one_when_no_retries(mocker): handler1 = mocker.Mock(id='id1', spec_set=['id']) handler2 = mocker.Mock(id='id2', spec_set=['id']) handler3 = mocker.Mock(id='id3', spec_set=['id']) @@ -92,7 +92,7 @@ def test_asap_takes_the_first_one_when_no_retries(mocker): assert selected[0] is handler1 -def test_asap_takes_the_least_retried(mocker): +async def test_asap_takes_the_least_retried(mocker): handler1 = mocker.Mock(id='id1', spec_set=['id']) handler2 = mocker.Mock(id='id2', spec_set=['id']) handler3 = mocker.Mock(id='id3', spec_set=['id']) diff --git a/tests/persistence/test_iso8601_times.py b/tests/persistence/test_iso8601_times.py new file mode 100644 index 000000000..73f17c505 --- /dev/null +++ b/tests/persistence/test_iso8601_times.py @@ -0,0 +1,43 @@ +import datetime + +import pytest + +from kopf._core.actions.progression import format_iso8601, parse_iso8601 + +UTC = datetime.timezone.utc +WEST11 = datetime.timezone(datetime.timedelta(hours=-11)) +EAST11 = datetime.timezone(datetime.timedelta(hours=11)) + + +@pytest.mark.parametrize('timestamp, expected', [ + (None, None), + (datetime.datetime(2000, 1, 1), '2000-01-01T00:00:00.000000'), + (datetime.datetime(2000, 1, 1, 9, 8, 7, 654321), '2000-01-01T09:08:07.654321'), + (datetime.datetime(2000, 1, 1, tzinfo=UTC), '2000-01-01T00:00:00.000000+00:00'), + (datetime.datetime(2000, 1, 1, 9, 8, 7, 654321, tzinfo=UTC), '2000-01-01T09:08:07.654321+00:00'), + (datetime.datetime(2000, 1, 1, 0, 0, 0, tzinfo=WEST11), '2000-01-01T00:00:00.000000-11:00'), + (datetime.datetime(2000, 1, 1, 9, 8, 7, 654321, tzinfo=WEST11), '2000-01-01T09:08:07.654321-11:00'), + (datetime.datetime(2000, 1, 1, 0, 0, 0, tzinfo=EAST11), '2000-01-01T00:00:00.000000+11:00'), + (datetime.datetime(2000, 1, 1, 9, 8, 7, 654321, tzinfo=EAST11), '2000-01-01T09:08:07.654321+11:00'), +]) +def test_iso8601_formatting(timestamp, expected): + result = format_iso8601(timestamp) + assert result == expected + + +@pytest.mark.parametrize('timestamp, expected', [ + (None, None), + ('2000-01-01T00:00:00.000000', datetime.datetime(2000, 1, 1)), + ('2000-01-01T09:08:07.654321', datetime.datetime(2000, 1, 1, 9, 8, 7, 654321)), + ('2000-01-01T00:00:00.000000Z', datetime.datetime(2000, 1, 1, tzinfo=UTC)), + ('2000-01-01T09:08:07.654321Z', datetime.datetime(2000, 1, 1, 9, 8, 7, 654321, tzinfo=UTC)), + ('2000-01-01T00:00:00.000000+00:00', datetime.datetime(2000, 1, 1, tzinfo=UTC)), + ('2000-01-01T09:08:07.654321+00:00', datetime.datetime(2000, 1, 1, 9, 8, 7, 654321, tzinfo=UTC)), + ('2000-01-01T00:00:00.000000-11:00', datetime.datetime(2000, 1, 1, tzinfo=WEST11)), + ('2000-01-01T09:08:07.654321-11:00', datetime.datetime(2000, 1, 1, 9, 8, 7, 654321, tzinfo=WEST11)), + ('2000-01-01T00:00:00.000000+11:00', datetime.datetime(2000, 1, 1, tzinfo=EAST11)), + ('2000-01-01T09:08:07.654321+11:00', datetime.datetime(2000, 1, 1, 9, 8, 7, 654321, tzinfo=EAST11)), +]) +def test_iso8601_parsing(timestamp, expected): + result = parse_iso8601(timestamp) + assert result == expected diff --git a/tests/persistence/test_states.py b/tests/persistence/test_states.py index d0b0b1b12..04c05078a 100644 --- a/tests/persistence/test_states.py +++ b/tests/persistence/test_states.py @@ -41,7 +41,7 @@ def handler(): # -def test_created_empty_from_scratch(): +async def test_created_empty_from_scratch(): state = State.from_scratch() assert len(state) == 0 assert state.purpose is None @@ -58,7 +58,7 @@ def test_created_empty_from_scratch(): ({'status': {'kopf': {}}}), ({'status': {'kopf': {'progress': {}}}}), ]) -def test_created_empty_from_empty_storage_with_handlers(storage, handler, body): +async def test_created_empty_from_empty_storage_with_handlers(storage, handler, body): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) assert len(state) == 0 assert state.purpose is None @@ -74,7 +74,7 @@ def test_created_empty_from_empty_storage_with_handlers(storage, handler, body): ({'status': {'kopf': {'progress': {'some-id': {'success': True}}}}}), ({'status': {'kopf': {'progress': {'some-id': {'failure': True}}}}}), ]) -def test_created_empty_from_filled_storage_without_handlers(storage, handler, body): +async def test_created_empty_from_filled_storage_without_handlers(storage, handler, body): state = State.from_storage(body=Body(body), handlers=[], storage=storage) assert len(state) == 0 assert state.purpose is None @@ -90,21 +90,21 @@ def test_created_empty_from_filled_storage_without_handlers(storage, handler, bo # -def test_created_from_storage_as_passive(storage, handler): +async def test_created_from_storage_as_passive(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) assert len(state) == 1 assert state['some-id'].active is False -def test_created_from_handlers_as_active(storage, handler): +async def test_created_from_handlers_as_active(storage, handler): state = State.from_scratch() state = state.with_handlers([handler]) assert len(state) == 1 assert state['some-id'].active is True -def test_switched_from_passive_to_active(storage, handler): +async def test_switched_from_passive_to_active(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'purpose': None}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) @@ -112,7 +112,7 @@ def test_switched_from_passive_to_active(storage, handler): assert state['some-id'].active is True -def test_passed_through_with_outcomes_when_passive(storage, handler): +async def test_passed_through_with_outcomes_when_passive(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'purpose': None}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_outcomes({'some-id': Outcome(final=True)}) @@ -120,7 +120,7 @@ def test_passed_through_with_outcomes_when_passive(storage, handler): assert state['some-id'].active is False -def test_passed_through_with_outcomes_when_active(storage, handler): +async def test_passed_through_with_outcomes_when_active(storage, handler): state = State.from_scratch() state = state.with_handlers([handler]) state = state.with_outcomes({'some-id': Outcome(final=True)}) @@ -128,14 +128,14 @@ def test_passed_through_with_outcomes_when_active(storage, handler): assert state['some-id'].active is True -def test_passive_states_are_not_used_in_done_calculation(storage, handler): +async def test_passive_states_are_not_used_in_done_calculation(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) assert len(state) == 1 assert state.done is True # because the unfinished handler state is ignored -def test_active_states_are_used_in_done_calculation(storage, handler): +async def test_active_states_are_used_in_done_calculation(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) @@ -144,7 +144,7 @@ def test_active_states_are_used_in_done_calculation(storage, handler): @freezegun.freeze_time(TS0) -def test_passive_states_are_not_used_in_delays_calculation(storage, handler): +async def test_passive_states_are_not_used_in_delays_calculation(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'delayed': TS1_ISO}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) assert len(state) == 1 @@ -152,7 +152,7 @@ def test_passive_states_are_not_used_in_delays_calculation(storage, handler): @freezegun.freeze_time(TS0) -def test_active_states_are_used_in_delays_calculation(storage, handler): +async def test_active_states_are_used_in_delays_calculation(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'delayed': TS1_ISO}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) @@ -165,7 +165,7 @@ def test_active_states_are_used_in_delays_calculation(storage, handler): # -def test_created_from_purposeless_storage(storage, handler): +async def test_created_from_purposeless_storage(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'purpose': None}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) assert len(state) == 1 @@ -174,7 +174,7 @@ def test_created_from_purposeless_storage(storage, handler): @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_created_from_purposeful_storage(storage, handler, reason): +async def test_created_from_purposeful_storage(storage, handler, reason): body = {'status': {'kopf': {'progress': {'some-id': {'purpose': reason.value}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) assert len(state) == 1 @@ -183,7 +183,7 @@ def test_created_from_purposeful_storage(storage, handler, reason): @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_enriched_with_handlers_keeps_the_original_purpose(handler, reason): +async def test_enriched_with_handlers_keeps_the_original_purpose(handler, reason): state = State.from_scratch() state = state.with_purpose(reason) state = state.with_handlers([handler]) @@ -191,7 +191,7 @@ def test_enriched_with_handlers_keeps_the_original_purpose(handler, reason): @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_enriched_with_outcomes_keeps_the_original_purpose(reason): +async def test_enriched_with_outcomes_keeps_the_original_purpose(reason): state = State.from_scratch() state = state.with_purpose(reason) state = state.with_outcomes({}) @@ -199,7 +199,7 @@ def test_enriched_with_outcomes_keeps_the_original_purpose(reason): @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_repurposed_before_handlers(handler, reason): +async def test_repurposed_before_handlers(handler, reason): state = State.from_scratch() state = state.with_purpose(reason).with_handlers([handler]) assert len(state) == 1 @@ -208,7 +208,7 @@ def test_repurposed_before_handlers(handler, reason): @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_repurposed_after_handlers(handler, reason): +async def test_repurposed_after_handlers(handler, reason): state = State.from_scratch() state = state.with_handlers([handler]).with_purpose(reason) assert len(state) == 1 @@ -217,7 +217,7 @@ def test_repurposed_after_handlers(handler, reason): @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_repurposed_with_handlers(handler, reason): +async def test_repurposed_with_handlers(handler, reason): state = State.from_scratch() state = state.with_handlers([handler]).with_purpose(reason, handlers=[handler]) assert len(state) == 1 @@ -226,7 +226,7 @@ def test_repurposed_with_handlers(handler, reason): @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_repurposed_not_affecting_the_existing_handlers_from_scratch(handler, reason): +async def test_repurposed_not_affecting_the_existing_handlers_from_scratch(handler, reason): state = State.from_scratch() state = state.with_handlers([handler]).with_purpose(reason).with_handlers([handler]) assert len(state) == 1 @@ -235,7 +235,7 @@ def test_repurposed_not_affecting_the_existing_handlers_from_scratch(handler, re @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_repurposed_not_affecting_the_existing_handlers_from_storage(storage, handler, reason): +async def test_repurposed_not_affecting_the_existing_handlers_from_storage(storage, handler, reason): body = {'status': {'kopf': {'progress': {'some-id': {'purpose': None}}}}} state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]).with_purpose(reason).with_handlers([handler]) @@ -259,7 +259,7 @@ def test_repurposed_not_affecting_the_existing_handlers_from_storage(storage, ha # All combinations except for same-to-same (it is not an "extra" then). (a, b) for a in HANDLER_REASONS for b in HANDLER_REASONS if a != b ]) -def test_with_handlers_irrelevant_to_the_purpose( +async def test_with_handlers_irrelevant_to_the_purpose( storage, handler, body, expected_extras, stored_reason, processed_reason): body['status']['kopf']['progress']['some-id']['purpose'] = stored_reason.value state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) @@ -278,7 +278,7 @@ def test_with_handlers_irrelevant_to_the_purpose( (StateCounters(1, 0, 0), True, [], {'status': {'kopf': {'progress': {'some-id': {'success': True}}}}}), ]) @pytest.mark.parametrize('reason', HANDLER_REASONS) -def test_with_handlers_relevant_to_the_purpose( +async def test_with_handlers_relevant_to_the_purpose( storage, handler, body, expected_counts, expected_done, expected_delays, reason): body['status']['kopf']['progress']['some-id']['purpose'] = reason.value state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) @@ -297,7 +297,7 @@ def test_with_handlers_relevant_to_the_purpose( ]) @pytest.mark.parametrize('reason', HANDLER_REASONS) @freezegun.freeze_time(TS0) -def test_with_handlers_relevant_to_the_purpose_and_delayed( +async def test_with_handlers_relevant_to_the_purpose_and_delayed( storage, handler, body, expected_counts, expected_done, expected_delays, reason): body['status']['kopf']['progress']['some-id']['delayed'] = TS1_ISO body['status']['kopf']['progress']['some-id']['purpose'] = reason.value @@ -312,7 +312,7 @@ def test_with_handlers_relevant_to_the_purpose_and_delayed( @pytest.mark.parametrize('reason', [Reason.CREATE, Reason.UPDATE, Reason.RESUME]) @freezegun.freeze_time(TS0) -def test_issue_601_deletion_supersedes_other_processing(storage, reason): +async def test_issue_601_deletion_supersedes_other_processing(storage, reason): body = {'status': {'kopf': {'progress': { 'fn1': {'purpose': reason.value, 'failure': True}, @@ -350,7 +350,7 @@ def test_issue_601_deletion_supersedes_other_processing(storage, reason): @freezegun.freeze_time(TS0) -def test_started_from_scratch(storage, handler): +async def test_started_from_scratch(storage, handler): patch = Patch() state = State.from_scratch() state = state.with_handlers([handler]) @@ -366,7 +366,7 @@ def test_started_from_scratch(storage, handler): (TSA_ISO, {'status': {'kopf': {'progress': {'some-id': {'started': TSA_ISO}}}}}), ]) @freezegun.freeze_time(TS0) -def test_started_from_storage(storage, handler, body, expected): +async def test_started_from_storage(storage, handler, body, expected): patch = Patch() state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state.store(body=Body({}), patch=patch, storage=storage) @@ -380,7 +380,7 @@ def test_started_from_storage(storage, handler, body, expected): (TSB_ISO, {'status': {'kopf': {'progress': {'some-id': {'started': TSB_ISO}}}}}), (TSA_ISO, {'status': {'kopf': {'progress': {'some-id': {'started': TSA_ISO}}}}}), ]) -def test_started_from_storage_is_preferred_over_from_scratch(storage, handler, body, expected): +async def test_started_from_storage_is_preferred_over_from_scratch(storage, handler, body, expected): with freezegun.freeze_time(TS0): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) with freezegun.freeze_time(TS1): @@ -402,7 +402,7 @@ def test_started_from_storage_is_preferred_over_from_scratch(storage, handler, b (TS0 - TSA, {'status': {'kopf': {'progress': {'some-id': {'started': TSA_ISO}}}}}), ]) @freezegun.freeze_time(TS0) -def test_runtime(storage, handler, expected, body): +async def test_runtime(storage, handler, expected, body): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) result = state[handler.id].runtime @@ -422,7 +422,7 @@ def test_runtime(storage, handler, expected, body): (True , {'status': {'kopf': {'progress': {'some-id': {'success': True}}}}}), (True , {'status': {'kopf': {'progress': {'some-id': {'failure': True}}}}}), ]) -def test_finished_flag(storage, handler, expected, body): +async def test_finished_flag(storage, handler, expected, body): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) result = state[handler.id].finished @@ -457,7 +457,7 @@ def test_finished_flag(storage, handler, expected, body): (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO, 'failure': None}}}}}), ]) @freezegun.freeze_time(TS0) -def test_sleeping_flag(storage, handler, expected, body): +async def test_sleeping_flag(storage, handler, expected, body): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) result = state[handler.id].sleeping @@ -492,7 +492,7 @@ def test_sleeping_flag(storage, handler, expected, body): (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO, 'failure': None}}}}}), ]) @freezegun.freeze_time(TS0) -def test_awakened_flag(storage, handler, expected, body): +async def test_awakened_flag(storage, handler, expected, body): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) result = state[handler.id].awakened @@ -508,7 +508,7 @@ def test_awakened_flag(storage, handler, expected, body): (None, {'status': {'kopf': {'progress': {'some-id': {'delayed': None}}}}}), (TS0, {'status': {'kopf': {'progress': {'some-id': {'delayed': TS0_ISO}}}}}), ]) -def test_awakening_time(storage, handler, expected, body): +async def test_awakening_time(storage, handler, expected, body): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) result = state[handler.id].delayed @@ -523,7 +523,7 @@ def test_awakening_time(storage, handler, expected, body): (0, {'status': {'kopf': {'progress': {'some-id': {'retries': None}}}}}), (6, {'status': {'kopf': {'progress': {'some-id': {'retries': 6}}}}}), ]) -def test_get_retry_count(storage, handler, expected, body): +async def test_get_retry_count(storage, handler, expected, body): state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) result = state[handler.id].retries @@ -536,7 +536,7 @@ def test_get_retry_count(storage, handler, expected, body): ({}, 1, TS1_ISO), ]) @freezegun.freeze_time(TS0) -def test_set_awake_time(storage, handler, expected, body, delay): +async def test_set_awake_time(storage, handler, expected, body, delay): patch = Patch() state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) @@ -559,7 +559,7 @@ def test_set_awake_time(storage, handler, expected, body, delay): (6, TS1_ISO, 1, {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}}), ]) @freezegun.freeze_time(TS0) -def test_set_retry_time(storage, handler, expected_retries, expected_delayed, body, delay): +async def test_set_retry_time(storage, handler, expected_retries, expected_delayed, body, delay): patch = Patch() state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) @@ -574,7 +574,7 @@ def test_set_retry_time(storage, handler, expected_retries, expected_delayed, bo # -def test_subrefs_added_to_empty_state(storage, handler): +async def test_subrefs_added_to_empty_state(storage, handler): body = {} patch = Patch() outcome_subrefs = ['sub2/b', 'sub2/a', 'sub2', 'sub1', 'sub3'] @@ -587,7 +587,7 @@ def test_subrefs_added_to_empty_state(storage, handler): assert patch['status']['kopf']['progress']['some-id']['subrefs'] == expected_subrefs -def test_subrefs_added_to_preexisting_subrefs(storage, handler): +async def test_subrefs_added_to_preexisting_subrefs(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'subrefs': ['sub9/2', 'sub9/1']}}}}} patch = Patch() outcome_subrefs = ['sub2/b', 'sub2/a', 'sub2', 'sub1', 'sub3'] @@ -600,7 +600,7 @@ def test_subrefs_added_to_preexisting_subrefs(storage, handler): assert patch['status']['kopf']['progress']['some-id']['subrefs'] == expected_subrefs -def test_subrefs_ignored_when_not_specified(storage, handler): +async def test_subrefs_ignored_when_not_specified(storage, handler): body = {} patch = Patch() outcome = Outcome(final=True, subrefs=[]) @@ -621,7 +621,7 @@ def test_subrefs_ignored_when_not_specified(storage, handler): (6, TS0_ISO, {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}}), ]) @freezegun.freeze_time(TS0) -def test_store_failure(storage, handler, expected_retries, expected_stopped, body): +async def test_store_failure(storage, handler, expected_retries, expected_stopped, body): error = Exception('some-error') patch = Patch() state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) @@ -640,7 +640,7 @@ def test_store_failure(storage, handler, expected_retries, expected_stopped, bod (6, TS0_ISO, {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}}), ]) @freezegun.freeze_time(TS0) -def test_store_success(storage, handler, expected_retries, expected_stopped, body): +async def test_store_success(storage, handler, expected_retries, expected_stopped, body): patch = Patch() state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) state = state.with_handlers([handler]) @@ -658,7 +658,7 @@ def test_store_success(storage, handler, expected_retries, expected_stopped, bod ('string', {'status': {'some-id': 'string'}}), ({'field': 'value'}, {'status': {'some-id': {'field': 'value'}}}), ]) -def test_store_result(handler, expected_patch, result): +async def test_store_result(handler, expected_patch, result): patch = Patch() outcomes = {handler.id: Outcome(final=True, result=result)} deliver_results(outcomes=outcomes, patch=patch) @@ -670,7 +670,7 @@ def test_store_result(handler, expected_patch, result): # -def test_purge_progress_when_exists_in_body(storage, handler): +async def test_purge_progress_when_exists_in_body(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}} patch = Patch() state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) @@ -678,7 +678,7 @@ def test_purge_progress_when_exists_in_body(storage, handler): assert patch == {'status': {'kopf': {'progress': {'some-id': None}}}} -def test_purge_progress_when_already_empty_in_body_and_patch(storage, handler): +async def test_purge_progress_when_already_empty_in_body_and_patch(storage, handler): body = {} patch = Patch() state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) @@ -686,7 +686,7 @@ def test_purge_progress_when_already_empty_in_body_and_patch(storage, handler): assert not patch -def test_purge_progress_when_already_empty_in_body_but_not_in_patch(storage, handler): +async def test_purge_progress_when_already_empty_in_body_but_not_in_patch(storage, handler): body = {} patch = Patch({'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}}) state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) @@ -694,7 +694,7 @@ def test_purge_progress_when_already_empty_in_body_but_not_in_patch(storage, han assert not patch -def test_purge_progress_when_known_at_restoration_only(storage, handler): +async def test_purge_progress_when_known_at_restoration_only(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}} patch = Patch() state = State.from_storage(body=Body(body), handlers=[handler], storage=storage) @@ -702,7 +702,7 @@ def test_purge_progress_when_known_at_restoration_only(storage, handler): assert patch == {'status': {'kopf': {'progress': {'some-id': None}}}} -def test_purge_progress_when_known_at_purge_only(storage, handler): +async def test_purge_progress_when_known_at_purge_only(storage, handler): body = {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}} patch = Patch() state = State.from_storage(body=Body(body), handlers=[], storage=storage) @@ -710,7 +710,7 @@ def test_purge_progress_when_known_at_purge_only(storage, handler): assert patch == {'status': {'kopf': {'progress': {'some-id': None}}}} -def test_purge_progress_cascades_to_subrefs(storage, handler): +async def test_purge_progress_cascades_to_subrefs(storage, handler): body = {'status': {'kopf': {'progress': { 'some-id': {'subrefs': ['sub1', 'sub2', 'sub3']}, 'sub1': {}, @@ -728,7 +728,7 @@ def test_purge_progress_cascades_to_subrefs(storage, handler): }}}} -def test_original_body_is_not_modified_by_storing(storage, handler): +async def test_original_body_is_not_modified_by_storing(storage, handler): body = Body({}) patch = Patch() state = State.from_storage(body=body, handlers=[handler], storage=storage) diff --git a/tests/timing/test_throttling.py b/tests/timing/test_throttling.py index e6dc85378..d8a2a3588 100644 --- a/tests/timing/test_throttling.py +++ b/tests/timing/test_throttling.py @@ -8,8 +8,8 @@ @pytest.fixture(autouse=True) -def clock(mocker): - return mocker.patch('time.monotonic', return_value=0) +async def clock(mocker): + return mocker.patch.object(asyncio.get_running_loop(), 'time', return_value=0) @pytest.fixture(autouse=True)