Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 47 additions & 17 deletions piker/brokers/ib/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ async def update_ledger_from_api_trades(
trade_entries: list[dict[str, Any]],
client: Union[Client, MethodProxy],

) -> dict[str, pp.Transaction]:
) -> tuple[
dict[str, pp.Transaction],
dict[str, dict],
]:

conf = get_config()

Expand Down Expand Up @@ -326,14 +329,12 @@ async def update_ledger_from_api_trades(

# write recent session's trades to the user's (local) ledger file.
records: dict[str, pp.Transactions] = {}
for acctid, trades_by_id in entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)

for acctid, trades_by_id in entries.items():
# normalize to transaction form
records[acctid] = norm_trade_records(trades_by_id)

return records
return records, entries


async def update_and_audit_msgs(
Expand Down Expand Up @@ -518,10 +519,14 @@ async def open_stream(
new_trades = {}
for account, proxy in proxies.items():
trades = await proxy.trades()
new_trades.update(await update_ledger_from_api_trades(
(
records_by_acct,
ledger_entries,
) = await update_ledger_from_api_trades(
trades,
proxy,
))
)
new_trades.update(records_by_acct)

for acctid, trans in new_trades.items():
for t in trans:
Expand Down Expand Up @@ -573,6 +578,16 @@ async def open_stream(
tuple(name for name in accounts_def if name in accounts),
))

# TODO: maybe just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?

# write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ledger_entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)

async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
Expand Down Expand Up @@ -609,10 +624,11 @@ async def emit_pp_update(
proxy = proxies[acctid]

acctname = acctid.strip('ib.')
records = (await update_ledger_from_api_trades(
records_by_acct, ledger_entries = await update_ledger_from_api_trades(
[trade_entry],
proxy,
))[acctname]
)
records = records_by_acct[acctname]
r = records[0]

# update and load all positions from `pps.toml`, cross check with
Expand All @@ -627,6 +643,12 @@ async def emit_pp_update(
ledger_reload={r.bsuid: r.fqsn},
)

# NOTE: write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ledger_entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)

for pos in filter(
bool,
[active.get(r.bsuid), closed.get(r.bsuid)]
Expand Down Expand Up @@ -760,7 +782,7 @@ async def deliver_trade_events(
{
'contract': asdict(fill.contract),
'execution': asdict(fill.execution),
'commissionReport': asdict(fill.commissionReport),
# 'commissionReport': asdict(fill.commissionReport),
# supposedly server fill time?
'broker_time': execu.time,
'name': 'ib',
Expand Down Expand Up @@ -813,14 +835,22 @@ async def deliver_trade_events(
trade_entry = ids2fills.setdefault(execid, {})
fill_already_rx = bool(trade_entry)

# no fill msg has arrived yet so just fill out the
# cost report for now and when the fill arrives a pp
# msg can be emitted.
trade_entry.update(
{'commissionReport': asdict(cr)}
)
# only fire a pp msg update if,
# - we haven't already
# - the fill event has already arrived
# but it didn't yet have a commision report
# which we fill in now.
if (
fill_already_rx
and 'commissionReport' not in trade_entry
):
# no fill msg has arrived yet so just fill out the
# cost report for now and when the fill arrives a pp
# msg can be emitted.
trade_entry.update(
{'commissionReport': asdict(cr)}
)

if fill_already_rx:
await emit_pp_update(
ems_stream,
trade_entry,
Expand Down