Commit 5b5fd4a

Add rate limiting for single-order reconciliation queries
1 parent 705b327 commit 5b5fd4a

6 files changed

+261
-8
lines changed

RELEASES.md

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@ This will be the final release with support for Python 3.11.
 ### Enhancements
 - Added support for `OrderBookDepth10` requests (#2955), thanks @faysou
 - Added support for quotes from book depths (#2977), thanks @faysou
+- Added execution engine rate limiting for single-order reconciliation queries
 - Added Renko bar aggregator (#2941), thanks @faysou
 - Added `time_range_generator` for on-the-fly data subscriptions (#2952), thanks @faysou
 - Added `__repr__` to `NewsEvent` (#2958), thanks @MK27MK

docs/concepts/live.md

Lines changed: 6 additions & 2 deletions
@@ -215,6 +215,8 @@ The execution engine reuses a single retry counter (`_recon_check_retries`) for

 When the open-order loop exhausts its retries, the engine issues one targeted `GenerateOrderStatusReport` probe before applying a terminal state. If the venue returns the order, reconciliation proceeds and the retry counter resets automatically.

+**Single-order query protection**: To prevent rate limit exhaustion when many orders need individual queries, the engine limits single-order queries per reconciliation cycle via `max_single_order_queries_per_cycle` (default: 10). When this limit is reached, remaining orders are deferred to the next cycle. Additionally, the engine adds a configurable delay (`single_order_query_delay_ms`, default: 100ms) between single-order queries to further prevent rate limiting. This ensures the system can handle scenarios where bulk queries fail for hundreds of orders without overwhelming the venue API.
+
 Orders that age beyond `open_check_lookback_mins` rely on this targeted probe. Keep the lookback generous for venues with short history windows, and consider increasing `open_check_threshold_ms` if venue timestamps lag the local clock so recently updated orders are not marked missing prematurely.

 This ensures the trading node maintains a consistent execution state even under unreliable conditions.
@@ -228,8 +230,10 @@ This ensures the trading node maintains a consistent execution state even under
 | `open_check_open_only` | True | When enabled, only open orders are requested during checks; if disabled, full order history is fetched (resource-intensive). |
 | `open_check_lookback_mins` | 60 min | Lookback window (minutes) for order status polling during continuous reconciliation. Only orders modified within this window are considered. |
 | `open_check_threshold_ms` | 5,000 ms | Minimum time since the order's last cached event before open-order checks act on venue discrepancies (missing, mismatched status, etc.). |
-| `open_check_missing_retries` | 5 retries | Maximum retries before resolving an order that is open in cache but not found at venue. Prevents false positives from race conditions. |
-| `reconciliation_startup_delay_secs` | 10.0 s | Additional delay (seconds) applied *after* startup reconciliation completes before starting continuous reconciliation loop. Provides time for additional system stabilization. |
+| `open_check_missing_retries` | 5 retries | Maximum retries before resolving an order that is open in cache but not found at venue. Prevents false positives from race conditions. |
+| `max_single_order_queries_per_cycle` | 10 | Maximum number of single-order queries per reconciliation cycle. Prevents rate limit exhaustion when many orders fail bulk query checks. |
+| `single_order_query_delay_ms` | 100 ms | Delay (milliseconds) between single-order queries to prevent rate limit exhaustion. |
+| `reconciliation_startup_delay_secs` | 10.0 s | Additional delay (seconds) applied *after* startup reconciliation completes before starting continuous reconciliation loop. Provides time for additional system stabilization. |
 | `own_books_audit_interval_secs` | None | Sets the interval (in seconds) between audits of own order books against public ones. Verifies synchronization and logs errors for inconsistencies. |

 :::warning
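
As a rough illustration of the two new options documented above, the fragment below sets them when constructing the execution engine config. The import path and field names are taken from this commit; the values are illustrative assumptions only, not recommendations, and should be tuned against the venue's documented rate limits.

```python
from nautilus_trader.live.config import LiveExecEngineConfig

# Illustrative values only (assumptions, not recommended settings).
exec_engine_config = LiveExecEngineConfig(
    open_check_interval_secs=10.0,         # Run the open-order check every 10 seconds
    open_check_missing_retries=5,          # Retries before a single-order query is issued
    max_single_order_queries_per_cycle=5,  # Cap on single-order queries in one cycle
    single_order_query_delay_ms=250,       # Pause between consecutive single-order queries
)
```

A lower cap or a longer delay reduces request pressure on the venue, at the cost of taking more cycles to resolve a large backlog of missing orders.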

docs/integrations/bitmex.md

Lines changed: 1 addition & 1 deletion
@@ -368,7 +368,7 @@ Exceeding BitMEX rate limits returns HTTP 429 and may trigger temporary IP bans;
 All requests automatically consume both the global burst bucket and the rolling minute bucket. Endpoints that have their own minute quota (e.g. `/api/v1/order`) also queue against that per-route key, so repeated calls with different parameters still share a single rate bucket.

 :::info
-For more details on rate limiting, see the official documentation: <https://www.bitmex.com/app/restAPI#Rate-Limits>.
+For more details on rate limiting, see the [BitMEX API documentation on rate limits](https://www.bitmex.com/app/restAPI#Limits).
 :::

 ### Rate-limit headers

nautilus_trader/live/config.py

Lines changed: 8 additions & 2 deletions
@@ -122,8 +122,7 @@ class LiveExecEngineConfig(ExecEngineConfig, frozen=True):
         The interval (seconds) between checks for open orders at the venue.
         If there is a discrepancy then an order status report is generated and reconciled.
         A recommended setting is between 5-10 seconds, consider API rate limits and the additional
-        request weights.
-        If no value is specified then the open order checking task is not started.
+        request weights. If no value is specified then the open order checking task is not started.
     open_check_open_only : bool, default True
         If True, the **check_open_orders** requests only currently open orders from the venue.
         If False, it requests the entire order history, which can be a heavy API call.
@@ -138,6 +137,11 @@ class LiveExecEngineConfig(ExecEngineConfig, frozen=True):
         The maximum number of retries before resolving an order that is open in cache but
         not found at the venue. This prevents race conditions where orders are resolved too
         quickly due to network delays or venue processing time.
+    max_single_order_queries_per_cycle : PositiveInt, default 10
+        The maximum number of single-order queries to perform per reconciliation cycle.
+        Prevents rate limit exhaustion when many orders fail bulk query checks.
+    single_order_query_delay_ms : NonNegativeInt, default 100
+        The delay (milliseconds) between single-order queries to prevent rate limit exhaustion.
     reconciliation_startup_delay_secs : PositiveFloat, default 10.0
         The additional delay (seconds) applied AFTER startup reconciliation
         completes before starting the continuous reconciliation loop. This provides time
@@ -193,6 +197,8 @@ class LiveExecEngineConfig(ExecEngineConfig, frozen=True):
     open_check_lookback_mins: PositiveInt = 60
     open_check_threshold_ms: NonNegativeInt = 5_000
     open_check_missing_retries: NonNegativeInt = 5
+    max_single_order_queries_per_cycle: PositiveInt = 10
+    single_order_query_delay_ms: NonNegativeInt = 100
     reconciliation_startup_delay_secs: PositiveFloat = 10.0
     purge_closed_orders_interval_mins: PositiveInt | None = None
     purge_closed_orders_buffer_mins: NonNegativeInt | None = None
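
To get a feel for how the two new settings interact, the sketch below estimates how many reconciliation cycles a backlog needs and how long the engine spends sleeping between single-order queries. `drain_estimate` is a hypothetical helper, not part of the library. It assumes every order is already at the retry threshold, that each query resolves its order, and that the delay is skipped only after a query that reaches the per-cycle cap (as in the engine change in this commit). Query latency and the wait between cycles are not included.

```python
import math


def drain_estimate(n_orders: int, max_queries: int = 10, delay_ms: int = 100) -> tuple[int, float]:
    """Return (cycles needed, total seconds spent sleeping between queries)."""
    if n_orders <= 0:
        return 0, 0.0
    cycles = math.ceil(n_orders / max_queries)
    full_cycles, remainder = divmod(n_orders, max_queries)
    # A cycle that reaches the cap sleeps after every query except the last one.
    # A partial cycle never reaches the cap, so it sleeps after each of its queries.
    sleeps = full_cycles * (max_queries - 1) + remainder
    return cycles, sleeps * delay_ms / 1000.0


print(drain_estimate(200))                              # defaults: 10 per cycle, 100 ms
print(drain_estimate(10, max_queries=3, delay_ms=50))   # values used in the first new test
```

Under these assumptions, 200 orders with the default settings take 20 cycles and 18 seconds of inter-query sleep in total. Wall-clock time to clear the backlog would additionally depend on `open_check_interval_secs`, since each cycle is a separate run of the open-order check.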

nautilus_trader/live/execution_engine.py

Lines changed: 42 additions & 3 deletions
@@ -180,6 +180,8 @@ def __init__(
         self.open_check_lookback_mins: int = config.open_check_lookback_mins
         self.open_check_threshold_ms: int = config.open_check_threshold_ms
         self.open_check_missing_retries: int = config.open_check_missing_retries
+        self.max_single_order_queries_per_cycle: int = config.max_single_order_queries_per_cycle
+        self.single_order_query_delay_ms: int = config.single_order_query_delay_ms
         self.reconciliation_startup_delay_secs: float = config.reconciliation_startup_delay_secs
         self.purge_closed_orders_interval_mins = config.purge_closed_orders_interval_mins
         self.purge_closed_orders_buffer_mins = config.purge_closed_orders_buffer_mins
@@ -205,6 +207,8 @@ def __init__(
         self._log.info(f"{config.open_check_lookback_mins=}", LogColor.BLUE)
         self._log.info(f"{config.open_check_threshold_ms=}", LogColor.BLUE)
         self._log.info(f"{config.open_check_missing_retries=}", LogColor.BLUE)
+        self._log.info(f"{config.max_single_order_queries_per_cycle=}", LogColor.BLUE)
+        self._log.info(f"{config.single_order_query_delay_ms=}", LogColor.BLUE)
         self._log.info(f"{config.reconciliation_startup_delay_secs=}", LogColor.BLUE)
         self._log.info(f"{config.purge_closed_orders_interval_mins=}", LogColor.BLUE)
         self._log.info(f"{config.purge_closed_orders_buffer_mins=}", LogColor.BLUE)
@@ -633,14 +637,14 @@ async def _resolve_order_not_found_at_venue(self, order: Order) -> None:
         no record of it, which typically means the order was never successfully placed
         or was rejected.

-        Before marking as rejected, performs a targeted query to check if the order
+        Before marking as rejected, performs a single-order query to check if the order
         exists but was missed due to API timing/processing delays.

         """
         ts_now = self._clock.timestamp_ns()

         self._log.debug(
-            f"Performing targeted query for {order.client_order_id!r} before marking as REJECTED",
+            f"Performing single-order query for {order.client_order_id!r} before marking as REJECTED",
             LogColor.BLUE,
         )

@@ -1032,6 +1036,10 @@ async def _check_orders_consistency(self) -> None:
             missing_at_venue: set[ClientOrderId] = open_order_ids - venue_reported_ids
             ts_now = self._clock.timestamp_ns()

+            # Track targeted queries to prevent rate limit exhaustion
+            targeted_queries_count = 0
+            logged_limit_warning = False
+
             for client_order_id in missing_at_venue:
                 order = self._cache.order(client_order_id)
                 if order is None:
@@ -1058,12 +1066,43 @@ async def _check_orders_consistency(self) -> None:

                 retries = self._recon_check_retries.get(client_order_id, 0)
                 if retries >= self.open_check_missing_retries:
+                    if targeted_queries_count >= self.max_single_order_queries_per_cycle:
+                        self._recon_check_retries[client_order_id] = retries + 1
+
+                        if not logged_limit_warning:
+                            # Count how many orders at threshold are being deferred
+                            orders_at_threshold_remaining = (
+                                sum(
+                                    1
+                                    for cid in missing_at_venue
+                                    if self._recon_check_retries.get(cid, 0)
+                                    >= self.open_check_missing_retries
+                                )
+                                - targeted_queries_count
+                            )
+                            self._log.warning(
+                                f"Reached max single-order queries ({self.max_single_order_queries_per_cycle}) "
+                                f"this cycle, deferring {orders_at_threshold_remaining} order(s) at threshold to next cycle",
+                                LogColor.YELLOW,
+                            )
+                            logged_limit_warning = True
+
+                        continue  # Skip query but continue processing other orders
+
                     self._log.warning(
-                        f"Order {client_order_id!r} not found at venue after {retries} retries, performing targeted query",
+                        f"Order {client_order_id!r} not found at venue after {retries} retries, performing single-order query",
                         LogColor.YELLOW,
                     )
                     self._clear_recon_tracking(client_order_id, drop_last_query=False)
                     await self._resolve_order_not_found_at_venue(order)
+                    targeted_queries_count += 1
+
+                    # Add delay between single-order queries (skip after final query)
+                    if (
+                        targeted_queries_count < self.max_single_order_queries_per_cycle
+                        and self.single_order_query_delay_ms > 0
+                    ):
+                        await asyncio.sleep(self.single_order_query_delay_ms / 1000.0)
                 else:
                     self._recon_check_retries[client_order_id] = retries + 1
                     self._log.debug(
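
The per-cycle limiting in the hunk above can be sketched in isolation as follows. This is a simplified standalone model, not the engine's code: `run_cycle` and `simulate` are hypothetical names, orders are plain string IDs, and a "query" simply resolves the order. It only aims to show the control flow of counting retries, capping queries per cycle, and deferring the rest.

```python
import asyncio


async def run_cycle(missing, retries, threshold, max_queries, delay_ms, query):
    """Process one reconciliation cycle and return the IDs queried in it."""
    queried = []
    for order_id in missing:
        count = retries.get(order_id, 0)
        if count < threshold:
            retries[order_id] = count + 1  # Not yet eligible for a single-order query
            continue
        if len(queried) >= max_queries:
            retries[order_id] = count + 1  # Cap reached: defer to the next cycle
            continue
        retries.pop(order_id, None)  # Clear tracking before the single-order query
        await query(order_id)
        queried.append(order_id)
        # Pause between queries, but not after the one that reaches the cap
        if len(queried) < max_queries and delay_ms > 0:
            await asyncio.sleep(delay_ms / 1000.0)
    return queried


async def simulate(n_orders=10, threshold=0, max_queries=3, delay_ms=0):
    """Run cycles until every order is resolved; return queries made per cycle."""
    open_ids = [f"O-{i}" for i in range(n_orders)]
    retries = {}
    per_cycle = []

    async def query(order_id):
        open_ids.remove(order_id)  # Model: the query resolves the order

    while open_ids:
        snapshot = list(open_ids)  # Iterate over a copy, since query() mutates open_ids
        queried = await run_cycle(snapshot, retries, threshold, max_queries, delay_ms, query)
        per_cycle.append(len(queried))
    return per_cycle


print(asyncio.run(simulate()))             # threshold 0: queries start in the first cycle
print(asyncio.run(simulate(threshold=5)))  # threshold 5: five warm-up cycles first
```

With 10 orders and a cap of 3, the model resolves 3, 3, 3 and then 1 order over four querying cycles, which is the same progression the new integration tests assert against the real engine.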

tests/integration_tests/live/test_live_reconciliation.py

Lines changed: 203 additions & 0 deletions
@@ -736,3 +736,206 @@ async def test_concurrent_order_reconciliation(
     assert orders[2].filled_qty == Quantity.from_int(30_000)  # Verify complete fill
     assert orders[3].status == OrderStatus.REJECTED  # Venue reported REJECTED
     assert orders[4].status == OrderStatus.CANCELED  # Venue reported CANCELED
+
+
+@pytest.mark.asyncio()
+async def test_targeted_query_limiting(
+    msgbus,
+    cache,
+    clock,
+    trader_id,
+    account_id,
+    order_factory,
+):
+    """
+    Test that single-order queries are limited per cycle to prevent rate limit
+    exhaustion.
+
+    Simulates a scenario where:
+    1. Many orders fail the bulk query check
+    2. Single-order queries are needed for each order
+    3. System limits queries per cycle to prevent rate limit errors
+
+    """
+    # Arrange - Configure engine with low limits for testing
+    config = LiveExecEngineConfig(
+        open_check_interval_secs=1.0,
+        open_check_open_only=False,  # Full history mode so missing orders are detected
+        max_single_order_queries_per_cycle=3,  # Low limit for testing
+        single_order_query_delay_ms=50,  # Small delay for testing
+        open_check_missing_retries=0,  # Immediately trigger single-order queries
+    )
+
+    exec_engine = LiveExecutionEngine(
+        loop=asyncio.get_running_loop(),
+        msgbus=msgbus,
+        cache=cache,
+        clock=clock,
+        config=config,
+    )
+
+    exec_client = MockLiveExecutionClient(
+        loop=asyncio.get_running_loop(),
+        client_id=ClientId(SIM.value),
+        venue=SIM,
+        account_type=AccountType.CASH,
+        base_currency=USD,
+        instrument_provider=InstrumentProvider(),
+        msgbus=msgbus,
+        cache=cache,
+        clock=clock,
+    )
+
+    exec_engine.register_client(exec_client)
+    exec_engine.start()
+
+    # Create 10 orders and add them to cache as ACCEPTED
+    orders = []
+    for i in range(10):
+        order = order_factory.limit(
+            instrument_id=AUDUSD_SIM.id,
+            order_side=OrderSide.BUY,
+            quantity=AUDUSD_SIM.make_qty(100),
+            price=AUDUSD_SIM.make_price(1.0),
+        )
+        cache.add_order(order)
+        order.apply(TestEventStubs.order_submitted(order))
+        order.apply(TestEventStubs.order_accepted(order))
+        cache.update_order(order)
+        orders.append(order)
+
+    # Mock returns empty reports (all orders "missing at venue")
+    # No reports added to exec_client, so generate_order_status_reports returns []
+
+    # Act - Run check_orders_consistency which should limit single-order queries
+    await exec_engine._check_orders_consistency()
+
+    # Assert - Only 3 single-order queries should have been attempted (max_single_order_queries_per_cycle)
+    # Since single-order queries return None, orders should be resolved as REJECTED
+    await eventually(lambda: len([o for o in orders if o.status == OrderStatus.REJECTED]) == 3)
+
+    # Run another cycle to process more orders
+    await exec_engine._check_orders_consistency()
+    await eventually(lambda: len([o for o in orders if o.status == OrderStatus.REJECTED]) == 6)
+
+    # Run one more cycle
+    await exec_engine._check_orders_consistency()
+    await eventually(lambda: len([o for o in orders if o.status == OrderStatus.REJECTED]) == 9)
+
+    # Final cycle for the last order
+    await exec_engine._check_orders_consistency()
+    await eventually(lambda: len([o for o in orders if o.status == OrderStatus.REJECTED]) == 10)
+
+    # Cleanup
+    exec_engine.stop()
+    await eventually(lambda: exec_engine.is_stopped)
+
+
+@pytest.mark.asyncio()
+async def test_targeted_query_limiting_with_retry_accumulation(
+    msgbus,
+    cache,
+    clock,
+    trader_id,
+    account_id,
+    order_factory,
+):
+    """
+    Test that orders accumulate retries even when max_single_order_queries_per_cycle is
+    reached, ensuring reconciliation progresses over multiple cycles.
+
+    Simulates a scenario where:
+    1. Many orders need reconciliation simultaneously
+    2. Rate limits prevent querying all at once
+    3. Orders continue accumulating retries while waiting
+    4. All orders eventually get reconciled
+
+    """
+    # Arrange - Configure with realistic retry threshold
+    config = LiveExecEngineConfig(
+        open_check_interval_secs=1.0,
+        open_check_open_only=False,  # Full history mode
+        max_single_order_queries_per_cycle=3,  # Low limit for testing
+        single_order_query_delay_ms=10,  # Small delay for testing
+        open_check_missing_retries=5,  # Realistic retry threshold
+    )
+
+    exec_engine = LiveExecutionEngine(
+        loop=asyncio.get_running_loop(),
+        msgbus=msgbus,
+        cache=cache,
+        clock=clock,
+        config=config,
+    )
+
+    exec_client = MockLiveExecutionClient(
+        loop=asyncio.get_running_loop(),
+        client_id=ClientId(SIM.value),
+        venue=SIM,
+        account_type=AccountType.CASH,
+        base_currency=USD,
+        instrument_provider=InstrumentProvider(),
+        msgbus=msgbus,
+        cache=cache,
+        clock=clock,
+    )
+
+    exec_engine.register_client(exec_client)
+    exec_engine.start()
+
+    # Create 10 orders, all ACCEPTED (missing at venue)
+    orders = []
+    for i in range(10):
+        order = order_factory.limit(
+            instrument_id=AUDUSD_SIM.id,
+            order_side=OrderSide.BUY,
+            quantity=AUDUSD_SIM.make_qty(100),
+            price=AUDUSD_SIM.make_price(1.0),
+        )
+        cache.add_order(order)
+        order.apply(TestEventStubs.order_submitted(order))
+        order.apply(TestEventStubs.order_accepted(order))
+        cache.update_order(order)
+        orders.append(order)
+
+    # Cycle 1: All orders get retry count 1 (none ready for query yet)
+    await exec_engine._check_orders_consistency()
+    for order in orders:
+        assert exec_engine._recon_check_retries.get(order.client_order_id, 0) == 1
+    assert all(o.status == OrderStatus.ACCEPTED for o in orders)
+
+    # Cycle 2-5: Retry counts accumulate to 5 (threshold)
+    for cycle in range(2, 6):
+        await exec_engine._check_orders_consistency()
+        for order in orders:
+            assert exec_engine._recon_check_retries.get(order.client_order_id, 0) == cycle
+
+    # Cycle 6: All 10 orders now at threshold (5), but only 3 can be queried
+    # First 3 get queried and resolved, remaining 7 increment to retry count 6
+    await exec_engine._check_orders_consistency()
+    await eventually(lambda: len([o for o in orders if o.status == OrderStatus.REJECTED]) == 3)
+
+    # Check that the remaining 7 orders have retry count incremented
+    for order in orders:
+        if order.status == OrderStatus.ACCEPTED:
+            # These hit the limit, got retries incremented but not queried
+            assert exec_engine._recon_check_retries.get(order.client_order_id, 0) == 6
+
+    # Cycle 7: 3 more get queried (total 6 resolved), remaining 4 at retry 7
+    await exec_engine._check_orders_consistency()
+    await eventually(lambda: len([o for o in orders if o.status == OrderStatus.REJECTED]) == 6)
+
+    # Cycle 8: 3 more (total 9), 1 remaining at retry 8
+    await exec_engine._check_orders_consistency()
+    await eventually(lambda: len([o for o in orders if o.status == OrderStatus.REJECTED]) == 9)
+
+    # Cycle 9: Last order resolved
+    await exec_engine._check_orders_consistency()
+    await eventually(lambda: len([o for o in orders if o.status == OrderStatus.REJECTED]) == 10)
+
+    # All orders eventually processed
+    await eventually(lambda: all(o.status == OrderStatus.REJECTED for o in orders))
+
+    # Cleanup
+    exec_engine.stop()
+    await eventually(lambda: exec_engine.is_stopped)
