Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
220981e
Add 1m ohlc sample rate support to `Client.bars()`; frame query is 1 day
goodboy Sep 15, 2022
2a866dd
Make history routines `timeframe` aware
goodboy Sep 15, 2022
bf7d5e9
Make `marketstore` storage api timeframe aware
goodboy Sep 15, 2022
fce7055
Make `binance` history api accept a timeframe
goodboy Sep 15, 2022
e7ec01b
Pass in default history time of 1 min
goodboy Sep 15, 2022
6b34c9e
Temporarily disable error on pos size mismatch
goodboy Sep 17, 2022
72dfeb2
Pass back interal cancel scope from data reset task
goodboy Sep 17, 2022
25b90af
Add `timeframe` input to `kraken` history api
goodboy Sep 17, 2022
7396624
Rework history frame request concurrency
goodboy Sep 28, 2022
61ca5f7
Drop `trimeter`-ized concurrent history querying
goodboy Sep 28, 2022
54567d3
More correct no-data output handling
goodboy Sep 28, 2022
811d21e
Explicit fast chart naming, auto-yrange the fast chart on increment
goodboy Sep 28, 2022
ede67ed
Return history-frame duration from `.bars()`
goodboy Sep 29, 2022
23d0353
Drop duplicate frame request
goodboy Sep 29, 2022
90a395a
Support no-disconnect on `open_aio_clients()` exit
goodboy Sep 29, 2022
daebb78
Re-request quote feed on data reset events
goodboy Sep 29, 2022
a11f20f
Fix `piker services`; `tractor.run()` is done..
goodboy Sep 29, 2022
55dc27a
Subtract duration instead of passing to `.subtract()` (facepalm)
goodboy Sep 30, 2022
27bd3c0
Comment format tweak
goodboy Oct 3, 2022
c7f57b9
Add back adhoc symbol lookup support, some exchs info is off
goodboy Oct 16, 2022
330d162
Add data-reset-task global state var
goodboy Oct 19, 2022
956c7d3
Add concurrent multi-time-frame history loading
goodboy Oct 21, 2022
143e86a
Handle super annoying mkts query bug..
goodboy Oct 26, 2022
b7ef059
Drop remaining timeframe scanning from `.read_ohlcv()`
goodboy Oct 26, 2022
f7ec663
Only get dbus user on sudo-user-present
goodboy Oct 26, 2022
0000d9a
Handle backends with no 1s OHLC history
goodboy Oct 26, 2022
5b63585
Pack multi-chart region linking into helper
goodboy Oct 26, 2022
4ca7817
Use feed-shm offsets in fill-arrow indexing arithmetic
goodboy Oct 26, 2022
dc1edee
Do tsdb backloading to shm concurrently
goodboy Oct 26, 2022
2f7b272
Make `ib` client's `.get_head_time()` (only) expect an fqsn
goodboy Oct 26, 2022
553d055
Raise `DataUnavailable` when a contract's 'earliest time' is hit
goodboy Oct 26, 2022
a1a24da
Make `binance` reject 1s OHLC history requests
goodboy Oct 26, 2022
286228c
Only wait on backfill if provider supports timeframe
goodboy Oct 26, 2022
2b231ba
Lul, fix timeframe key when writing history
goodboy Oct 26, 2022
610fb5f
Drop `NoData` handler, just let it bubble
goodboy Oct 26, 2022
d5b357b
Raise `DataUnavailable` on >= 6 no data error events
goodboy Oct 27, 2022
fb4f173
Drop key error again
goodboy Oct 27, 2022
df16726
Just wipe wrong timeframe filled tsdb colseries for now
goodboy Oct 28, 2022
ceca0d9
Order ledger entries by processed datetime
goodboy Oct 7, 2022
1fadf58
Add todo for order duration setting `goodTillDuration`
goodboy Oct 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions piker/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,8 @@ async def open_piker_runtime(

) -> Optional[tractor._portal.Portal]:
'''
Start a piker actor who's runtime will automatically
sync with existing piker actors in local network
based on configuration.
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.

'''
global _services
Expand Down
9 changes: 8 additions & 1 deletion piker/brokers/binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
import wsproto

from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ._util import (
resproc,
SymbolNotFound,
DataUnavailable,
)
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data.types import Struct
Expand Down Expand Up @@ -388,6 +392,7 @@ async def open_history_client(
async with open_cached_client('binance') as client:

async def get_ohlc(
timeframe: float,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,

Expand All @@ -396,6 +401,8 @@ async def get_ohlc(
datetime, # start
datetime, # end
]:
if timeframe != 60:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is how we indicate that a brokerd can't deliver 1s OHLC.

raise DataUnavailable('Only 1m bars are supported')

array = await client.bars(
symbol,
Expand Down
151 changes: 95 additions & 56 deletions piker/brokers/ib/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import trio
import tractor
from tractor import to_asyncio
import pendulum
import ib_insync as ibis
from ib_insync.contract import (
Contract,
Expand All @@ -52,6 +53,7 @@
from ib_insync.order import Order
from ib_insync.ticker import Ticker
from ib_insync.objects import (
BarDataList,
Position,
Fill,
Execution,
Expand All @@ -78,26 +80,11 @@
'h': ' hours',
}

_time_frames = {
'1s': '1 Sec',
'5s': '5 Sec',
'30s': '30 Sec',
'1m': 'OneMinute',
'2m': 'TwoMinutes',
'3m': 'ThreeMinutes',
'4m': 'FourMinutes',
'5m': 'FiveMinutes',
'10m': 'TenMinutes',
'15m': 'FifteenMinutes',
'20m': 'TwentyMinutes',
'30m': 'HalfHour',
'1h': 'OneHour',
'2h': 'TwoHours',
'4h': 'FourHours',
'D': 'OneDay',
'W': 'OneWeek',
'M': 'OneMonth',
'Y': 'OneYear',
_bar_sizes = {
1: '1 Sec',
60: '1 min',
60*60: '1 hour',
24*60*60: '1 day',
}

_show_wap_in_history: bool = False
Expand Down Expand Up @@ -199,7 +186,8 @@ def __init__(self):
'lb.nymex', # random len lumber

# metals
'xauusd.cmdty', # gold spot
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
'xauusd.cmdty', # london gold spot ^
'gc.nymex',
'mgc.nymex', # micro

Expand Down Expand Up @@ -257,14 +245,12 @@ def __init__(self):
'PSE',
}

# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924

_enters = 0


def bars_to_np(bars: list) -> np.ndarray:
'''
Convert a "bars list thing" (``BarsList`` type from ibis)
Convert a "bars list thing" (``BarDataList`` type from ibis)
into a numpy struct array.

'''
Expand All @@ -284,6 +270,27 @@ def bars_to_np(bars: list) -> np.ndarray:
return nparr


# NOTE: pacing violations exist for higher sample rates:
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
# Also see note on duration limits being lifted on 1m+ periods,
# but they say "use with discretion":
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
_samplings: dict[int, tuple[str, str]] = {
1: (
'1 secs',
f'{int(2e3)} S',
pendulum.duration(seconds=2e3),
),
# TODO: benchmark >1 D duration on query to see if
# throughput can be made faster during backfilling.
60: (
'1 min',
'1 D',
pendulum.duration(days=1),
),
}


class Client:
'''
IB wrapped for our broker backend API.
Expand Down Expand Up @@ -338,19 +345,32 @@ async def bars(
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
end_dt: Union[datetime, str] = "",

sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
# ohlc sample period in seconds
sample_period_s: int = 1,

# optional "duration of time" equal to the
# length of the returned history frame.
duration: Optional[str] = None,

**kwargs,

) -> list[dict[str, Any]]:
) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
'''
Retreive OHLCV bars for a fqsn over a range to the present.

'''
# See API docs here:
# https://interactivebrokers.github.io/tws-api/historical_data.html
bars_kwargs = {'whatToShow': 'TRADES'}
bars_kwargs.update(kwargs)
bar_size, duration, dt_duration = _samplings[sample_period_s]

global _enters
# log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
print(f'REQUESTING BARS {_enters} @ end={end_dt}')
print(
f"REQUESTING {duration}'s worth {bar_size} BARS\n"
f'{_enters} @ end={end_dt}"'
)

if not end_dt:
end_dt = ''
Expand All @@ -360,30 +380,20 @@ async def bars(
contract = (await self.find_contracts(fqsn))[0]
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))

# _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime=end_dt,
formatDate=2,

# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``

# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',

# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',

durationStr='{count} S'.format(count=period_count),
# barSizeSetting='5 secs',
barSizeSetting='1 secs',
barSizeSetting=bar_size,

# barSizeSetting='1 min',
# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
durationStr=duration,

# always use extended hours
useRTH=False,
Expand All @@ -394,11 +404,21 @@ async def bars(
# whatToShow='TRADES',
)
if not bars:
# TODO: raise underlying error here
raise ValueError(f"No bars retreived for {fqsn}?")
# NOTE: there's 2 cases here to handle (and this should be
# read alongside the implementation of
# ``.reqHistoricalDataAsync()``):
# - no data is returned for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no
# way to detect a timeout.

nparr = bars_to_np(bars)
return bars, nparr
return bars, nparr, dt_duration

async def con_deats(
self,
Expand Down Expand Up @@ -463,7 +483,7 @@ async def search_symbols(
self,
pattern: str,
# how many contracts to search "up to"
upto: int = 6,
upto: int = 16,
asdicts: bool = True,

) -> dict[str, ContractDetails]:
Expand Down Expand Up @@ -498,6 +518,16 @@ async def search_symbols(

exch = tract.exchange
if exch not in _exch_skip_list:

# try to lookup any contracts from our adhoc set
# since often the exchange/venue is named slightly
# different (eg. BRR.CMECRYPTO` instead of just
# `.CME`).
info = _adhoc_symbol_map.get(sym)
if info:
con_kwargs, bars_kwargs = info
exch = con_kwargs['exchange']

# try get all possible contracts for symbol as per,
# https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
con = ibis.Future(
Expand Down Expand Up @@ -748,11 +778,14 @@ async def find_contracts(

async def get_head_time(
self,
contract: Contract,
fqsn: str,

) -> datetime:
"""Return the first datetime stamp for ``contract``.
'''
Return the first datetime stamp for ``contract``.

"""
'''
contract = (await self.find_contracts(fqsn))[0]
return await self.ib.reqHeadTimeStampAsync(
contract,
whatToShow='TRADES',
Expand Down Expand Up @@ -822,9 +855,7 @@ async def get_quote(
# async to be consistent for the client proxy, and cuz why not.
def submit_limit(
self,
# ignored since ib doesn't support defining your
# own order id
oid: str,
oid: str, # ignored since doesn't support defining your own
symbol: str,
price: float,
action: str,
Expand All @@ -840,6 +871,9 @@ def submit_limit(
'''
Place an order and return integer request id provided by client.

Relevant docs:
- https://interactivebrokers.github.io/tws-api/order_limitations.html

'''
try:
contract = self._contracts[symbol]
Expand All @@ -865,6 +899,9 @@ def submit_limit(
optOutSmartRouting=True,
routeMarketableToBbo=True,
designatedLocation='SMART',
# TODO: make all orders GTC?
# https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba
# goodTillDate=f"yyyyMMdd-HH:mm:ss",
),
)
except AssertionError: # errrg insync..
Expand Down Expand Up @@ -1066,6 +1103,7 @@ async def load_aio_clients(
# retry a few times to get the client going..
connect_retries: int = 3,
connect_timeout: float = 0.5,
disconnect_on_exit: bool = True,

) -> dict[str, Client]:
'''
Expand Down Expand Up @@ -1207,10 +1245,11 @@ async def load_aio_clients(
finally:
# TODO: for re-scans we'll want to not teardown clients which
# are up and stable right?
for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect()
_client_cache.pop((host, port), None)
if disconnect_on_exit:
for acct, client in _accounts2clients.items():
log.info(f'Disconnecting {acct}@{client}')
client.ib.disconnect()
_client_cache.pop((host, port), None)


async def load_clients_for_trio(
Expand Down
Loading