Conversation
piker/_cacheables.py
Outdated
| # maybe_open_ctx() below except it uses an async exit statck. | ||
| # ideally wer pick one or the other. | ||
| @asynccontextmanager | ||
| async def open_cached_client( |
There was a problem hiding this comment.
this is old code that i think should be ported to the maybe_open_ctx() below but for broker backend client instances.
There was a problem hiding this comment.
boo yeah. worked like a charm 🏄🏼
piker/_cacheables.py
Outdated
| def get_and_use() -> AsyncIterable[T]: | ||
| # key error must bubble here | ||
| feed = cache.ctxs[key] | ||
| log.info(f'Reusing cached feed for {key}') |
There was a problem hiding this comment.
i guess this should be some representation format instead of "feed" 😂
| if cache_hit: | ||
| # add a new broadcast subscription for the quote stream | ||
| # if this feed is likely already in use | ||
| async with feed.stream.subscribe() as bstream: |
There was a problem hiding this comment.
This is the part that requires goodboy/tractor#229 tokio style broadcasting.
|
Just updated |
|
Probably also keeping 👀 on down the road is the cachetools project. |
|
Added |
| uid = ctx.chan.uid | ||
| fqsn = f'{symbol}.{brokername}' | ||
|
|
||
| async for msg in stream: |
There was a problem hiding this comment.
This implements "feed pausing" - the beauty of 2 way streamzz 🏄🏼♀️
Try out he new broadcast channels from `tractor` for data feeds we already have cached. Any time there's a cache hit we load the cached feed and just slap a broadcast receiver on it for the local consumer task.
Maybe i've finally learned my lesson that exit stacks and per task ctx manager caching is just not trionic.. Use the approach we've taken for the daemon service manager as well: create a process global nursery for each unique ctx manager we wish to cache and simply tear it down when the number of consumers goes to zero. This seems to resolve all prior issues and gets us error-free cached feeds!
…ed..." Think this was fixed by passing through `**kwargs` in `maybe_open_feed()`, the shielding for fsp respawns wasn't being properly passed through.. This reverts commit 2f1455d.
piker/_cacheables.py
Outdated
| ctx_key = id(mngr) | ||
|
|
||
| # TODO: does this need to be a tractor "root nursery"? | ||
| async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n: |
There was a problem hiding this comment.
I'm still slightly unclear how the teardown part of lifetime works here; pretty sure it's going to tear down when the first consumer task is complete instead of when the last arriving consumer does.
In order to get the latter behavior we might need to have an actor global nursery that's brought up with the runtime / the consumer process?
Not sure this absolutely must be addressed right now since usually the creator task stays up as long as the app / daemon which is using the feed.
|
|
||
|
|
||
| @asynccontextmanager | ||
| async def open_sample_step_stream( |
There was a problem hiding this comment.
Allows us to actor-cache the OHLC step event stream per delay (since you likely will want the same event for all local consumers.
| brokername, | ||
| [sym], | ||
| loglevel=loglevel, | ||
| **kwargs, |
There was a problem hiding this comment.
This `kwargs* pass through was critical...
guilledk
left a comment
There was a problem hiding this comment.
Love to see the new _broadcast machinery. Also _cacheables looking sexy!
In order to ensure the lifetime of the feed can in fact be kept open until the last consumer task has completed we need to maintain a lifetime which is hierarchically greater then all consumer tasks. This solution is somewhat hacky but seems to work well: we just use the `tractor` actor's "service nursery" (the one normally used to invoke rpc tasks) to launch the task which will start and keep open the target cached async context manager. To make this more "proper" we may want to offer a "root nursery" in all piker actors that is exposed through some singleton api or even introduce a public api for it into `tractor` directly.
iamzoltan
left a comment
There was a problem hiding this comment.
Looks kosher to me. Digging the improvements!
|
|
||
|
|
||
| @asynccontextmanager | ||
| async def open_cached_client( |
piker/_cacheables.py
Outdated
| ''' | ||
| lock = trio.Lock() | ||
| users: int = 0 | ||
| values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {} |
There was a problem hiding this comment.
woops, stack is tossed now.
0ee755d to
4527d4a
Compare
Initial data feed caching over
piker.data.feed.open_feed()using the newmaybe_open_feed().Adds a new
_cacheables.pywhich contains a bunch of helpers for cache-y things.This relies on goodboy/tractor#229 in order to give multiple actor-local task consumers broadcast access to quote streams.
Putting this up to get eyes on it and see if there's any reason not to start building streaming apis under this paradigm.
Still TODO:
Feed.index_stream()per actor, which makes me also wonder if we should provide newFeedinstances on cache hits?or can we just wrap it and overrideended up using the new._index_streamor is that to mutate-ymaybe_cache_ctx()mngr introduced in this PRbrokerd-always-pushes to a tasks always pull model here?uniform_rate_sendas abroadcast_receiver()potentially? in which case i think we can drop all the timing logic and just let sent quotes queue up and then pull on a fixed period? i may be thinking about this wrong..pause/resumemessage to the endpoint to add remove the subscription dynamicallytaking the-> Expose FSP streams asFeedapi to our fsp subsystem (this will likely get delayed to a new PR)Feeds #216