Skip to content

Commit a4c67ea

Browse files
committed
Re-request quote feed on data reset events
When a network outage or data feed connection is reset often the `ib_insync` task will hang until some kind of (internal?) timeout takes place or, in some (worst) cases it never re-establishes (the event stream) and thus the backend needs to restart or the live feed will never resume.. In order to avoid this issue once and for all this patch implements an additional (extremely simple) task that is started with the real-time feed and simply waits for any market data reset events; when detected restarts the `open_aio_quote_stream()` call in a loop using a surrounding cancel scope. Been meaning to implement this for ages and it's finally working!
1 parent 655f352 commit a4c67ea

File tree

1 file changed

+87
-54
lines changed

1 file changed

+87
-54
lines changed

piker/brokers/ib/feed.py

Lines changed: 87 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,9 @@ async def _setup_quote_stream(
483483

484484
to_trio.send_nowait(None)
485485

486-
async with load_aio_clients() as accts2clients:
486+
async with load_aio_clients(
487+
disconnect_on_exit=False,
488+
) as accts2clients:
487489
caccount_name, client = get_preferred_data_client(accts2clients)
488490
contract = contract or (await client.find_contract(symbol))
489491
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@@ -563,7 +565,8 @@ async def open_aio_quote_stream(
563565
from_aio = _quote_streams.get(symbol)
564566
if from_aio:
565567

566-
# if we already have a cached feed deliver a rx side clone to consumer
568+
# if we already have a cached feed deliver a rx side clone
569+
# to consumer
567570
async with broadcast_receiver(
568571
from_aio,
569572
2**6,
@@ -754,67 +757,97 @@ def mk_init_msgs() -> dict[str, dict]:
754757
await trio.sleep_forever()
755758
return # we never expect feed to come up?
756759

757-
async with open_aio_quote_stream(
758-
symbol=sym,
759-
contract=con,
760-
) as stream:
761-
762-
# ugh, clear ticks since we've consumed them
763-
# (ahem, ib_insync is stateful trash)
764-
first_ticker.ticks = []
760+
cs: Optional[trio.CancelScope] = None
761+
startup: bool = True
762+
while (
763+
startup
764+
or cs.cancel_called
765+
):
766+
with trio.CancelScope() as cs:
767+
async with (
768+
trio.open_nursery() as nurse,
769+
open_aio_quote_stream(
770+
symbol=sym,
771+
contract=con,
772+
) as stream,
773+
):
774+
# ugh, clear ticks since we've consumed them
775+
# (ahem, ib_insync is stateful trash)
776+
first_ticker.ticks = []
765777

766-
task_status.started((init_msgs, first_quote))
778+
# only on first entry at feed boot up
779+
if startup:
780+
startup = False
781+
task_status.started((init_msgs, first_quote))
767782

768-
async with aclosing(stream):
769-
if syminfo.get('no_vlm', False):
783+
# start a stream restarter task which monitors the
784+
# data feed event.
785+
async def reset_on_feed():
770786

771-
# generally speaking these feeds don't
772-
# include vlm data.
773-
atype = syminfo['asset_type']
774-
log.info(
775-
f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
776-
)
787+
# TODO: this seems to be surpressed from the
788+
# traceback in ``tractor``?
789+
# assert 0
777790

778-
else:
779-
# wait for real volume on feed (trading might be closed)
780-
while True:
781-
ticker = await stream.receive()
782-
783-
# for a real volume contract we rait for the first
784-
# "real" trade to take place
785-
if (
786-
# not calc_price
787-
# and not ticker.rtTime
788-
not ticker.rtTime
789-
):
790-
# spin consuming tickers until we get a real
791-
# market datum
792-
log.debug(f"New unsent ticker: {ticker}")
793-
continue
794-
else:
795-
log.debug("Received first real volume tick")
796-
# ugh, clear ticks since we've consumed them
797-
# (ahem, ib_insync is truly stateful trash)
798-
ticker.ticks = []
791+
rt_ev = proxy.status_event(
792+
'Market data farm connection is OK:usfarm'
793+
)
794+
await rt_ev.wait()
795+
cs.cancel() # cancel called should now be set
799796

800-
# XXX: this works because we don't use
801-
# ``aclosing()`` above?
802-
break
797+
nurse.start_soon(reset_on_feed)
803798

804-
quote = normalize(ticker)
805-
log.debug(f"First ticker received {quote}")
799+
async with aclosing(stream):
800+
if syminfo.get('no_vlm', False):
806801

807-
# tell caller quotes are now coming in live
808-
feed_is_live.set()
802+
# generally speaking these feeds don't
803+
# include vlm data.
804+
atype = syminfo['asset_type']
805+
log.info(
806+
f'No-vlm {sym}@{atype}, skipping quote poll'
807+
)
809808

810-
# last = time.time()
811-
async for ticker in stream:
812-
quote = normalize(ticker)
813-
await send_chan.send({quote['fqsn']: quote})
809+
else:
810+
# wait for real volume on feed (trading might be
811+
# closed)
812+
while True:
813+
ticker = await stream.receive()
814+
815+
# for a real volume contract we rait for
816+
# the first "real" trade to take place
817+
if (
818+
# not calc_price
819+
# and not ticker.rtTime
820+
not ticker.rtTime
821+
):
822+
# spin consuming tickers until we
823+
# get a real market datum
824+
log.debug(f"New unsent ticker: {ticker}")
825+
continue
826+
else:
827+
log.debug("Received first volume tick")
828+
# ugh, clear ticks since we've
829+
# consumed them (ahem, ib_insync is
830+
# truly stateful trash)
831+
ticker.ticks = []
832+
833+
# XXX: this works because we don't use
834+
# ``aclosing()`` above?
835+
break
836+
837+
quote = normalize(ticker)
838+
log.debug(f"First ticker received {quote}")
839+
840+
# tell caller quotes are now coming in live
841+
feed_is_live.set()
842+
843+
# last = time.time()
844+
async for ticker in stream:
845+
quote = normalize(ticker)
846+
await send_chan.send({quote['fqsn']: quote})
814847

815-
# ugh, clear ticks since we've consumed them
816-
ticker.ticks = []
817-
# last = time.time()
848+
# ugh, clear ticks since we've consumed them
849+
ticker.ticks = []
850+
# last = time.time()
818851

819852

820853
async def data_reset_hack(

0 commit comments

Comments
 (0)