Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions piker/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from contextlib import asynccontextmanager as acm
from collections import defaultdict

from pydantic import BaseModel
from msgspec import Struct
import tractor
import trio
from trio_typing import TaskStatus
import tractor

from .log import get_logger, get_console_log
from .brokers import get_brokermod
Expand All @@ -47,16 +47,13 @@
]


class Services(BaseModel):
class Services(Struct):

actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}

class Config:
arbitrary_types_allowed = True

async def start_service_task(
self,
name: str,
Expand Down
18 changes: 11 additions & 7 deletions piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import numpy as np
import tractor
from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto

from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data.types import Struct
from ..data._web_bs import open_autorecon_ws, NoBsWs

log = get_logger(__name__)
Expand Down Expand Up @@ -79,12 +79,14 @@


# https://binance-docs.github.io/apidocs/spot/en/#exchange-information
class Pair(BaseModel):
class Pair(Struct, frozen=True):
symbol: str
status: str

baseAsset: str
baseAssetPrecision: int
cancelReplaceAllowed: bool
allowTrailingStop: bool
quoteAsset: str
quotePrecision: int
quoteAssetPrecision: int
Expand Down Expand Up @@ -287,7 +289,7 @@ async def get_client() -> Client:


# validation type
class AggTrade(BaseModel):
class AggTrade(Struct):
e: str # Event type
E: int # Event time
s: str # Symbol
Expand Down Expand Up @@ -341,7 +343,9 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:

elif msg.get('e') == 'aggTrade':

# validate
# NOTE: this is purely for a definition, ``msgspec.Struct``
# does not runtime-validate until you decode/encode.
# see: https://jcristharif.com/msgspec/structs.html#type-validation
msg = AggTrade(**msg)

# TODO: type out and require this quote format
Expand All @@ -352,8 +356,8 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': msg.p,
'size': msg.q,
'price': float(msg.p),
'size': float(msg.q),
'broker_ts': msg.T,
}],
}
Expand Down Expand Up @@ -448,7 +452,7 @@ async def stream_quotes(
d = cache[sym.upper()]
syminfo = Pair(**d) # validation

si = sym_infos[sym] = syminfo.dict()
si = sym_infos[sym] = syminfo.to_dict()

# XXX: after manually inspecting the response format we
# just directly pick out the info we need
Expand Down
21 changes: 10 additions & 11 deletions piker/brokers/ib/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
).dict())
))
continue

client = _accounts2clients.get(account)
Expand All @@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
).dict())
))
continue

if action in {'buy', 'sell'}:
Expand All @@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason='Order already active?',
).dict())
))

# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
Expand All @@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
account=account,
).dict()
)
)

elif action == 'cancel':
Expand Down Expand Up @@ -559,7 +558,7 @@ async def open_stream(
cids2pps,
validate=True,
)
all_positions.extend(msg.dict() for msg in msgs)
all_positions.extend(msg for msg in msgs)

if not all_positions and cids2pps:
raise RuntimeError(
Expand Down Expand Up @@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0]
break

await ems_stream.send(msg.dict())
await ems_stream.send(msg)


async def deliver_trade_events(
Expand Down Expand Up @@ -743,7 +742,7 @@ async def deliver_trade_events(

broker_details={'name': 'ib'},
)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)

case 'fill':

Expand Down Expand Up @@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'],

)
await ems_stream.send(msg.dict())
await ems_stream.send(msg)

# 2 cases:
# - fill comes first or
Expand Down Expand Up @@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict())
# await ems_stream.send(msg)

case 'event':

Expand All @@ -891,7 +890,7 @@ async def deliver_trade_events(
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
# log.info(f"TWS triggered trade\n{pformat(msg)}")

# msg.reqid = 'tws-' + str(-1 * reqid)

Expand Down
24 changes: 12 additions & 12 deletions piker/brokers/kraken/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
)

import pendulum
from pydantic import BaseModel
import trio
import tractor
import wsproto
Expand All @@ -47,6 +46,7 @@
BrokerdPosition,
BrokerdStatus,
)
from piker.data.types import Struct
from . import log
from .api import (
Client,
Expand All @@ -62,7 +62,7 @@
)


class Trade(BaseModel):
class Trade(Struct):
'''
Trade class that helps parse and validate ownTrades stream

Expand Down Expand Up @@ -110,7 +110,7 @@ async def handle_order_requests(
'https://github.com/pikers/piker/issues/299'
),

).dict())
))
continue

# validate
Expand All @@ -136,7 +136,7 @@ async def handle_order_requests(
symbol=order.symbol,
reason="Failed order submission",
broker_details=resp
).dict()
)
)
else:
# TODO: handle multiple orders (cancels?)
Expand All @@ -161,7 +161,7 @@ async def handle_order_requests(
# account the made the order
account=order.account

).dict()
)
)

elif action == 'cancel':
Expand Down Expand Up @@ -189,7 +189,7 @@ async def handle_order_requests(
symbol=msg.symbol,
reason="Failed order cancel",
broker_details=resp
).dict()
)
)

if not error:
Expand Down Expand Up @@ -217,7 +217,7 @@ async def handle_order_requests(
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
)
)

else: # order cancel success case.
Expand All @@ -230,7 +230,7 @@ async def handle_order_requests(
status='cancelled',
reason='Order cancelled',
broker_details={'name': 'kraken'}
).dict()
)
)
else:
log.error(f'Unknown order command: {request_msg}')
Expand Down Expand Up @@ -330,7 +330,7 @@ async def trades_dialogue(
avg_price=p.be_price,
currency='',
)
position_msgs.append(msg.dict())
position_msgs.append(msg)

await ctx.started(
(position_msgs, [acc_name])
Expand Down Expand Up @@ -408,7 +408,7 @@ async def trades_dialogue(
broker_details={'name': 'kraken'},
broker_time=broker_time
)
await ems_stream.send(fill_msg.dict())
await ems_stream.send(fill_msg)

filled_msg = BrokerdStatus(
reqid=reqid,
Expand All @@ -432,7 +432,7 @@ async def trades_dialogue(
# https://github.com/pikers/piker/issues/296
remaining=0,
)
await ems_stream.send(filled_msg.dict())
await ems_stream.send(filled_msg)

# update ledger and position tracking
trans = await update_ledger(acctid, trades)
Expand Down Expand Up @@ -469,7 +469,7 @@ async def trades_dialogue(
# TODO
# currency=''
)
await ems_stream.send(pp_msg.dict())
await ems_stream.send(pp_msg)

case [
trades_msgs,
Expand Down
6 changes: 3 additions & 3 deletions piker/brokers/kraken/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from pydantic import BaseModel
from trio_typing import TaskStatus
import tractor
import trio
Expand All @@ -45,6 +44,7 @@
)
from piker.log import get_console_log
from piker.data import ShmArray
from piker.data.types import Struct
from piker.data._web_bs import open_autorecon_ws, NoBsWs
from . import log
from .api import (
Expand All @@ -54,7 +54,7 @@


# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(BaseModel):
class Pair(Struct):
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
aclass_base: str # asset class of base component
Expand Down Expand Up @@ -316,7 +316,7 @@ async def stream_quotes(
sym = sym.upper()

si = Pair(**await client.symbol_info(sym)) # validation
syminfo = si.dict()
syminfo = si.to_dict()
syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
syminfo['asset_type'] = 'crypto'
Expand Down
Loading