Skip to content

Commit 2d0f926

Browse files
committed
Update ledger from api immediately, cruft cleaning
1 parent a6aabb9 commit 2d0f926

File tree

1 file changed

+23
-39
lines changed

1 file changed

+23
-39
lines changed

piker/brokers/kraken/broker.py

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
from typing import (
3232
Any,
3333
AsyncIterator,
34-
# Optional,
3534
Union,
3635
)
3736

@@ -46,7 +45,6 @@
4645
Position,
4746
PpTable,
4847
Transaction,
49-
# update_pps_conf,
5048
open_trade_ledger,
5149
open_pps,
5250
)
@@ -391,6 +389,7 @@ async def trades_dialogue(
391389
# most recent 50 trades and assume that by ordering we
392390
# already have those records in the ledger.
393391
tids2trades = await client.get_trades()
392+
ledger_dict.update(tids2trades)
394393
api_trans = norm_trade_records(tids2trades)
395394

396395
# retrieve kraken reported balances
@@ -448,12 +447,13 @@ def has_pp(dst: str) -> Position | bool:
448447
ppmsgs = trades2pps(
449448
table,
450449
acctid,
451-
# new_trans,
452450
)
453451
await ctx.started((ppmsgs, [acc_name]))
454452

455453
# XXX: not fucking clue but putting this finally block
456454
# will suppress errors inside the direct await below!?!
455+
# likely something to do with the exist stack inside
456+
# the nobsws stuff...
457457
# try:
458458

459459
# Get websocket token for authenticated data stream
@@ -494,26 +494,19 @@ def has_pp(dst: str) -> Position | bool:
494494
)
495495

496496
# enter relay loop
497-
# try:
498-
try:
499-
await handle_order_updates(
500-
ws,
501-
stream,
502-
ems_stream,
503-
apiflows,
504-
ids,
505-
reqids2txids,
506-
table,
507-
api_trans,
508-
acctid,
509-
acc_name,
510-
token,
511-
)
512-
# except:
513-
# await tractor.breakpoint()
514-
finally:
515-
# always update ledger on exit
516-
ledger_dict.update(tids2trades)
497+
await handle_order_updates(
498+
ws,
499+
stream,
500+
ems_stream,
501+
apiflows,
502+
ids,
503+
reqids2txids,
504+
table,
505+
api_trans,
506+
acctid,
507+
acc_name,
508+
token,
509+
)
517510

518511

519512
async def handle_order_updates(
@@ -561,9 +554,13 @@ async def handle_order_updates(
561554
f'ownTrades update_{seq}:\n'
562555
f'{pformat(trades_msgs)}'
563556
)
557+
# XXX: a fix / todo
558+
# see the comment in the caller about weird error
559+
# suppression around a commented `try:`
564560
# assert 0
561+
565562
# format as tid -> trade event map
566-
# eg. msg
563+
# eg. received msg format,
567564
# [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047',
568565
# 'fee': '0.24776',
569566
# 'margin': '0.00000',
@@ -579,15 +576,10 @@ async def handle_order_updates(
579576
tid: trade
580577
for entry in trades_msgs
581578
for (tid, trade) in entry.items()
579+
580+
# don't re-process datums we've already seen
582581
if tid not in ledger_trans
583582
}
584-
585-
# if tid in ledger_trans:
586-
# # skip already seen transactions
587-
# log.info(f'Skipping already seen trade {trade}')
588-
# continue
589-
590-
# await tractor.breakpoint()
591583
for tid, trade in trades.items():
592584
txid = trade['ordertxid']
593585

@@ -642,11 +634,6 @@ async def handle_order_updates(
642634
)
643635
await ems_stream.send(filled_msg)
644636

645-
# if not trades:
646-
# # skip pp emissions if we have already
647-
# # processed all trades in this msg.
648-
# continue
649-
650637
new_trans = norm_trade_records(trades)
651638
ppmsgs = trades2pps(
652639
table,
@@ -897,9 +884,6 @@ async def handle_order_updates(
897884
chain = apiflows[reqid]
898885
chain.maps.append(event)
899886

900-
# pretxid = chain['txid']
901-
# print(f'pretxid: {pretxid}')
902-
903887
resps, errored = process_status(
904888
event,
905889
oid,

0 commit comments

Comments
 (0)