Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
146c684
Cache `brokerd` feeds for reuse in clearing loop
goodboy Aug 9, 2021
a0660e5
Start top level cacheing apis module
goodboy Aug 9, 2021
0ce8057
Move feed cacheing to cache mod; put entry retreival into ctx mng
goodboy Aug 9, 2021
68ce5b3
Add lifo cache to new module; drop "utils", bleh
goodboy Aug 9, 2021
66f1d91
Let's abstractify: ->
goodboy Aug 9, 2021
7d5add1
Add an njs cache gist link
goodboy Aug 10, 2021
224dbbc
Drop feed refs
goodboy Aug 10, 2021
a7d3afc
Add a `maybe_open_feed()` which uses new broadcast chans
goodboy Aug 10, 2021
7d0f473
Use `maybe_open_feed()` in ems and fsp daemons
goodboy Aug 10, 2021
2202abc
Add (lack of proper) ring buffer note
goodboy Aug 10, 2021
310d8f4
Add disclaimer to old data mod
goodboy Aug 10, 2021
954dc6a
Fix missing cache hit bool element of return
goodboy Aug 12, 2021
71b50fd
Use broadcast chan for order client and avoid chan repacking
goodboy Aug 12, 2021
0c95160
TO SQUASH cached ctx.
goodboy Aug 13, 2021
c8e3208
Add super basic support for data feed "pausing"
goodboy Aug 13, 2021
2f5abaa
Add njs token bucket gist url
goodboy Aug 13, 2021
1e42f58
Add pause/resume feed api, delegate to msg stream for broadcast api
goodboy Aug 15, 2021
fe0d66e
Drop removed module import
goodboy Aug 16, 2021
2a9d24c
Remove dead OHLC index consumers from subs list on error
goodboy Aug 3, 2021
2f1455d
Lol, don't use `maybe_open_feed()` for now, it's totally borked...
goodboy Aug 18, 2021
ff322ae
Re-impl ctx-mng caching using `trio.Nursery.start()`
goodboy Aug 30, 2021
cae7f48
Revert "Lol, don't use `maybe_open_feed()` for now, it's totally bork…
goodboy Aug 30, 2021
bbcce0c
Facepalm^2: pass through kwargs
goodboy Aug 30, 2021
1184a4d
Cache sample step streams per actor
goodboy Aug 31, 2021
c368234
Use the actor's service nursery instead
goodboy Aug 31, 2021
2df16e1
Re-implement client caching using `maybe_open_ctx`
goodboy Sep 1, 2021
26cb7aa
Drop tractor stream shielding use
goodboy Sep 1, 2021
4527d4a
Allocate an event per context
goodboy Sep 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 0 additions & 66 deletions piker/_async_utils.py

This file was deleted.

195 changes: 195 additions & 0 deletions piker/_cacheables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""
Cacheing apis and toolz.

"""
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8

from collections import OrderedDict
from typing import (
Any,
Hashable,
Optional,
TypeVar,
AsyncContextManager,
)
from contextlib import (
asynccontextmanager,
)

import trio
from trio_typing import TaskStatus
import tractor

from .brokers import get_brokermod
from .log import get_logger


T = TypeVar('T')
log = get_logger(__name__)


def async_lifo_cache(maxsize=128):
"""Async ``cache`` with a LIFO policy.

Implemented my own since no one else seems to have
a standard. I'll wait for the smarter people to come
up with one, but until then...
"""
cache = OrderedDict()

def decorator(fn):

async def wrapper(*args):
key = args
try:
return cache[key]
except KeyError:
if len(cache) >= maxsize:
# discard last added new entry
cache.popitem()

# do it
cache[key] = await fn(*args)
return cache[key]

return wrapper

return decorator


_cache: dict[str, 'Client'] = {} # noqa


class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.

'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None

@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,

) -> None:
async with mng as value:

_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))


@asynccontextmanager
async def maybe_open_ctx(

key: Hashable,
mngr: AsyncContextManager[T],

) -> (bool, T):
'''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.

'''

await cache.lock.acquire()

ctx_key = id(mngr)

value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value

except KeyError:
log.info(f'Allocating new feed for {key}')

# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.

# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n

# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln

ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())

value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()

yield False, value

finally:
cache.users -= 1

if cache.lock.locked():
cache.lock.release()

if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating resource for {key}')

# terminate mngr nursery
_, no_more_users = cache.resources[ctx_key]
no_more_users.set()


@asynccontextmanager
async def open_cached_client(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not bad right?

brokername: str,
) -> 'Client': # noqa
'''Get a cached broker client from the current actor's local vars.

If one has not been setup do it and cache it.

'''
brokermod = get_brokermod(brokername)
async with maybe_open_ctx(
key=brokername,
mngr=brokermod.get_client(),
) as (cache_hit, client):
yield client
85 changes: 0 additions & 85 deletions piker/brokers/api.py

This file was deleted.

2 changes: 1 addition & 1 deletion piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from pydantic import BaseModel
import wsproto

from .api import open_cached_client
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
Expand Down
2 changes: 1 addition & 1 deletion piker/brokers/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from ..log import get_logger
from . import get_brokermod
from .._daemon import maybe_spawn_brokerd
from .api import open_cached_client
from .._cacheables import open_cached_client


log = get_logger(__name__)
Expand Down
11 changes: 8 additions & 3 deletions piker/brokers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""
Real-time data feed machinery
"""
'''
NB: this is the old original implementation that was used way way back
when the project started with ``kivy``.

This code is left for reference but will likely be merged in
appropriately and removed.

'''
import time
from functools import partial
from dataclasses import dataclass, field
Expand Down
Loading