Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions kopf/_cogs/aiokits/aioenums.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import enum
import threading
import time
from collections.abc import Awaitable, Generator
from typing import Generic, TypeVar

Expand Down Expand Up @@ -52,7 +51,7 @@ def is_set(self, reason: FlagReasonT | None = None) -> bool:

def set(self, reason: FlagReasonT | None = None) -> None:
reason = reason if reason is not None else self.reason # to keep existing values
self.when = self.when if self.when is not None else time.monotonic()
self.when = self.when if self.when is not None else asyncio.get_running_loop().time()
self.reason = reason if self.reason is None or reason is None else self.reason | reason
self.sync_event.set()
self.async_event.set() # it is thread-safe: always called in operator's event loop.
Expand Down
157 changes: 121 additions & 36 deletions kopf/_core/actions/progression.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,69 @@
and the overall handling routine should persist the handler status somewhere.

The states are persisted in a state storage: see `kopf._cogs.configs.progress`.
"""

MOCKED LOOP TIME:

**For testability,** we use ``basetime + timedelta(seconds=loop.time())``
to calculate the "now" moment instead of ``datetime.utcnow()``.

The "basetime" is an imaginary UTC time when the loop clock was zero (``0.0``)
and is calculated as ``datetime.utcnow() - timedelta(seconds=loop.time())``
(assuming these two calls are almost instant and the precision loss is low).

In normal run mode, the "basetime" remains constant for the entire life time
of an event loop, since both loop time and wall-clock time move forward with
the same speed: the calculation of "basetime" always produces the same result.

In test mode, the loop time is mocked and moves as events (e.g. sleeps) happen:
it can move (much) faster than the wall-clock time, e.g. 100s of loop seconds
in 1/100th of a wall-clock second; or it can freeze and not move at all.

PROBLEMATIC INACCURACY:

Because of a highly unprecise and everchanging component in the formula
of the "basetime" — the non-mockable UTC clock — the "basetime" calculation
can give different results at different times even if executed fast enough.

To reduce the inaccuracy introduced by sequential UTC time measurements,
we calculate the "basetime" once per every global state object created
and push it down to owned state objects of the individual handlers
in this halding cycle of this resource object in this unit-test.

That gives us sufficient accuracy while remaining simple enough, assuming that
there are no multiple concurrent global state objects per single unit-test.
_(An alternative would be to calculate the "basetime" on event loop creation
or to cache it per event loop in a global WeakDict, but that is an overkill.)_

SUFFICIENT ACCURACY:

With this approach and ``looptime``__, we can detach from the wall-clock time
in tests and simulate the time's rapid movement into the future by "recovering"
the "now" moment as ``basetime + timedelta(seconds=loop.time())`` (see above) —
without wall-clock delays or hitting the issues with code execution overhead.

Note that there is no UTC clock involved now, only the controled loop clock,
so multiple sequential calculation will lead to predictable abd precise results,
especially when the loop clock is frozen (i.e. constant for a short duration).

__ https://github.com/nolar/looptime

USER PERSPECTIVE:

This time math is never exposed to users and never persisted in storages.
It is used only internally to decouple the operator routines from the system
clock and strictly couple it to the time of the loop.

IMPLEMENTATION DETAILS:

Q: Why do we store UTC time in the fields instead of the floats with loop time?
A: If we store floats in the fields, we need to do the math on every
fetching/storing operation, which introduces minor divergence in supposedly
constant data as stored in the external storages. Instead, we only calculate
the "now" moment. As a result, the precision loss is seen only at runtime checks
and is indistinguishanle from the loop clock sensitivity.
"""
import asyncio
import collections.abc
import copy
import dataclasses
Expand Down Expand Up @@ -40,6 +101,7 @@
"""

active: bool # whether it is used in done/delays [T] or only in counters/purges [F].
basetime: datetime.datetime # a moment when the loop time was zero
started: datetime.datetime
stopped: datetime.datetime | None = None # None means it is still running (e.g. delayed).
delayed: datetime.datetime | None = None # None means it is finished (succeeded/failed).
Expand All @@ -57,33 +119,42 @@

@property
def sleeping(self) -> bool:
ts = self.delayed
now = datetime.datetime.now(tz=datetime.timezone.utc)
return not self.finished and ts is not None and ts > now
now = self.basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time())
return not self.finished and self.delayed is not None and self.delayed > now

@property
def awakened(self) -> bool:
return bool(not self.finished and not self.sleeping)

@property
def runtime(self) -> datetime.timedelta:
return datetime.datetime.now(tz=datetime.timezone.utc) - self.started
now = self.basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time())
return now - self.started

@classmethod
def from_scratch(cls, *, purpose: str | None = None) -> "HandlerState":
return cls(
active=True,
started=datetime.datetime.now(datetime.timezone.utc),
purpose=purpose,
)
def from_scratch(
cls,
*,
basetime: datetime.datetime,
purpose: str | None = None,
) -> "HandlerState":
now = basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time())
return cls(active=True, basetime=basetime, started=now, purpose=purpose)

@classmethod
def from_storage(cls, __d: progress.ProgressRecord) -> "HandlerState":
def from_storage(
cls,
__d: progress.ProgressRecord,
*,
basetime: datetime.datetime,
) -> "HandlerState":
now = basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time())
return cls(
active=False,
started=_parse_iso8601(__d.get('started')) or datetime.datetime.now(datetime.timezone.utc),
stopped=_parse_iso8601(__d.get('stopped')),
delayed=_parse_iso8601(__d.get('delayed')),
basetime=basetime,
started=parse_iso8601(__d.get('started')) or now,
stopped=parse_iso8601(__d.get('stopped')),
delayed=parse_iso8601(__d.get('delayed')),
purpose=__d.get('purpose') if __d.get('purpose') else None,
retries=__d.get('retries') or 0,
success=__d.get('success') or False,
Expand All @@ -95,9 +166,9 @@

def for_storage(self) -> progress.ProgressRecord:
return progress.ProgressRecord(
started=None if self.started is None else _format_iso8601(self.started),
stopped=None if self.stopped is None else _format_iso8601(self.stopped),
delayed=None if self.delayed is None else _format_iso8601(self.delayed),
started=None if self.started is None else format_iso8601(self.started),
stopped=None if self.stopped is None else format_iso8601(self.stopped),
delayed=None if self.delayed is None else format_iso8601(self.delayed),
purpose=None if self.purpose is None else str(self.purpose),
retries=None if self.retries is None else int(self.retries),
success=None if self.success is None else bool(self.success),
Expand All @@ -123,10 +194,11 @@
self,
outcome: execution.Outcome,
) -> "HandlerState":
now = datetime.datetime.now(datetime.timezone.utc)
now = self.basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time())
cls = type(self)
return cls(
active=self.active,
basetime=self.basetime,
purpose=self.purpose,
started=self.started,
stopped=self.stopped if self.stopped is not None else now if outcome.final else None,
Expand Down Expand Up @@ -159,19 +231,25 @@
"""
_states: Mapping[ids.HandlerId, HandlerState]

# Eliminate even the smallest microsecond-scale deviations by using the shared base time.
# The deviations can come from UTC wall-clock time slowly moving during the run (CPU overhead).
basetime: datetime.datetime

def __init__(
self,
__src: Mapping[ids.HandlerId, HandlerState],
*,
basetime: datetime.datetime,
purpose: str | None = None,
):
super().__init__()
self._states = dict(__src)
self.purpose = purpose
self.basetime = basetime

@classmethod
def from_scratch(cls) -> "State":
return cls({})
return cls({}, basetime=_get_basetime())

@classmethod
def from_storage(
Expand All @@ -181,13 +259,14 @@
storage: progress.ProgressStorage,
handlers: Iterable[execution.Handler],
) -> "State":
basetime = _get_basetime()
handler_ids = {handler.id for handler in handlers}
handler_states: dict[ids.HandlerId, HandlerState] = {}
for handler_id in handler_ids:
content = storage.fetch(key=handler_id, body=body)
if content is not None:
handler_states[handler_id] = HandlerState.from_storage(content)
return cls(handler_states)
handler_states[handler_id] = HandlerState.from_storage(content, basetime=basetime)
return cls(handler_states, basetime=basetime)

def with_purpose(
self,
Expand All @@ -198,7 +277,7 @@
for handler in handlers:
handler_states[handler.id] = handler_states[handler.id].with_purpose(purpose)
cls = type(self)
return cls(handler_states, purpose=purpose)
return cls(handler_states, basetime=self.basetime, purpose=purpose)

def with_handlers(
self,
Expand All @@ -207,11 +286,12 @@
handler_states: dict[ids.HandlerId, HandlerState] = dict(self)
for handler in handlers:
if handler.id not in handler_states:
handler_states[handler.id] = HandlerState.from_scratch(purpose=self.purpose)
handler_states[handler.id] = HandlerState.from_scratch(
basetime=self.basetime, purpose=self.purpose)
else:
handler_states[handler.id] = handler_states[handler.id].as_active()
cls = type(self)
return cls(handler_states, purpose=self.purpose)
return cls(handler_states, basetime=self.basetime, purpose=self.purpose)

def with_outcomes(
self,
Expand All @@ -226,15 +306,15 @@
handler_id: (handler_state if handler_id not in outcomes else
handler_state.with_outcome(outcomes[handler_id]))
for handler_id, handler_state in self._states.items()
}, purpose=self.purpose)
}, basetime=self.basetime, purpose=self.purpose)

def without_successes(self) -> "State":
cls = type(self)
return cls({
handler_id: handler_state
for handler_id, handler_state in self._states.items()
if not handler_state.success # i.e. failures & in-progress/retrying
})
}, basetime=self.basetime)

def store(
self,
Expand Down Expand Up @@ -332,9 +412,9 @@
processing routine, based on all delays of different origin:
e.g. postponed daemons, stopping daemons, temporarily failed handlers.
"""
now = datetime.datetime.now(datetime.timezone.utc)
now = self.basetime + datetime.timedelta(seconds=asyncio.get_running_loop().time())
return [
max(0, (handler_state.delayed - now).total_seconds()) if handler_state.delayed else 0
max(0.0, (handler_state.delayed - now).total_seconds()) if handler_state.delayed else 0
for handler_state in self._states.values()
if handler_state.active and not handler_state.finished
]
Expand Down Expand Up @@ -374,24 +454,29 @@


@overload
def _format_iso8601(val: None) -> None: ...
def format_iso8601(val: None) -> None: ...


@overload
def _format_iso8601(val: datetime.datetime) -> str: ...
def format_iso8601(val: datetime.datetime) -> str: ...


def _format_iso8601(val: datetime.datetime | None) -> str | None:
def format_iso8601(val: datetime.datetime | None) -> str | None:
return None if val is None else val.isoformat(timespec='microseconds')


@overload
def _parse_iso8601(val: None) -> None: ...
def parse_iso8601(val: None) -> None: ...


@overload
def _parse_iso8601(val: str) -> datetime.datetime: ...
def parse_iso8601(val: str) -> datetime.datetime: ...


def parse_iso8601(val: str | None) -> datetime.datetime | None:
return None if val is None else iso8601.parse_date(val, default_timezone=None)


def _parse_iso8601(val: str | None) -> datetime.datetime | None:
return None if val is None else iso8601.parse_date(val) # always TZ-aware
def _get_basetime() -> datetime.datetime:
loop = asyncio.get_running_loop()
return datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta(seconds=loop.time())
8 changes: 4 additions & 4 deletions kopf/_core/actions/throttlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import contextlib
import dataclasses
import time
from collections.abc import AsyncGenerator, Iterable, Iterator

from kopf._cogs.aiokits import aiotime
Expand All @@ -28,11 +27,12 @@ async def throttled(
"""
A helper to throttle any arbitrary operation.
"""
clock = asyncio.get_running_loop().time

# The 1st sleep: if throttling is already active, but was interrupted by a queue replenishment.
# It is needed to properly process the latest known event after the successful sleep.
if throttler.active_until is not None:
remaining_time = throttler.active_until - time.monotonic()
remaining_time = throttler.active_until - clock()
unslept_time = await aiotime.sleep(remaining_time, wakeup=wakeup)
if unslept_time is None:
logger.info("Throttling is over. Switching back to normal operations.")
Expand Down Expand Up @@ -61,7 +61,7 @@ async def throttled(
delay = next(throttler.source_of_delays, throttler.last_used_delay)
if delay is not None:
throttler.last_used_delay = delay
throttler.active_until = time.monotonic() + delay
throttler.active_until = clock() + delay
logger.exception(f"Throttling for {delay} seconds due to an unexpected error: {e!r}")

else:
Expand All @@ -72,7 +72,7 @@ async def throttled(
# The 2nd sleep: if throttling has been just activated (i.e. there was a fresh error).
# It is needed to have better logging/sleeping without workers exiting for "no events".
if throttler.active_until is not None and should_run:
remaining_time = throttler.active_until - time.monotonic()
remaining_time = throttler.active_until - clock()
unslept_time = await aiotime.sleep(remaining_time, wakeup=wakeup)
if unslept_time is None:
throttler.active_until = None
Expand Down
Loading
Loading