Skip to content

Commit 3765c61

Browse files
committed
Update ledger *after* pps updates from new trades
Addressing same issue as in #350 where we need to compute position updates using the *first read* from the ledger **before** we update it to make sure `Position.lifo_update()` gets called and **not skipped** because new trades were read as clears entries but haven't actually been included in update calcs yet.. aka we call `Position.lifo_update()`. Main change here is to convert `update_ledger()` into a context mngr so that the ledger write is committed after pps updates using `pp.update_pps_conf()`.. This is basically a hotfix to #346 as well.
1 parent cb7a9b9 commit 3765c61

1 file changed

Lines changed: 27 additions & 21 deletions

File tree

piker/brokers/kraken/broker.py

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
Order api and machinery
1919
2020
'''
21-
from contextlib import asynccontextmanager as acm
21+
from contextlib import (
22+
asynccontextmanager as acm,
23+
contextmanager as cm,
24+
)
2225
from functools import partial
2326
from itertools import chain, count
2427
from pprint import pformat
@@ -260,13 +263,13 @@ async def trades_dialogue(
260263
log.info(
261264
f'Loaded {len(trades)} trades from account `{acc_name}`'
262265
)
263-
trans = await update_ledger(acctid, trades)
264-
active, closed = pp.update_pps_conf(
265-
'kraken',
266-
acctid,
267-
trade_records=trans,
268-
ledger_reload={}.fromkeys(t.bsuid for t in trans),
269-
)
266+
with open_ledger(acctid, trades) as trans:
267+
active, closed = pp.update_pps_conf(
268+
'kraken',
269+
acctid,
270+
trade_records=trans,
271+
ledger_reload={}.fromkeys(t.bsuid for t in trans),
272+
)
270273

271274
position_msgs: list[dict] = []
272275
pps: dict[int, pp.Position]
@@ -426,14 +429,14 @@ async def handle_order_updates(
426429
await ems_stream.send(filled_msg.dict())
427430

428431
# update ledger and position tracking
429-
trans = await update_ledger(acctid, trades)
430-
active, closed = pp.update_pps_conf(
431-
'kraken',
432-
acctid,
433-
trade_records=trans,
434-
ledger_reload={}.fromkeys(
435-
t.bsuid for t in trans),
436-
)
432+
with open_ledger(acctid, trades) as trans:
433+
active, closed = pp.update_pps_conf(
434+
'kraken',
435+
acctid,
436+
trade_records=trans,
437+
ledger_reload={}.fromkeys(
438+
t.bsuid for t in trans),
439+
)
437440

438441
# emit any new pp msgs to ems
439442
for pos in filter(
@@ -743,7 +746,8 @@ def norm_trade_records(
743746
return records
744747

745748

746-
async def update_ledger(
749+
@cm
750+
def open_ledger(
747751
acctid: str,
748752
trade_entries: list[dict[str, Any]],
749753

@@ -756,8 +760,10 @@ async def update_ledger(
756760
'kraken',
757761
acctid,
758762
) as ledger:
759-
ledger.update(trade_entries)
760763

761-
# normalize to transaction form
762-
records = norm_trade_records(trade_entries)
763-
return records
764+
# normalize to transaction form
765+
records = norm_trade_records(trade_entries)
766+
yield records
767+
768+
# update on exit
769+
ledger.update(trade_entries)

0 commit comments

Comments
 (0)