Skip to content

Commit 36289fd

Browse files
committed
Add BitMEX SubmitBroadcaster scaffolding
Not yet operational.
1 parent 8096e2b commit 36289fd

File tree

21 files changed

+2159
-147
lines changed

21 files changed

+2159
-147
lines changed

crates/adapters/bitmex/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ pub struct BitmexExecClientConfig {
177177
pub max_requests_per_second: Option<u32>,
178178
/// Maximum number of requests per minute (rolling window).
179179
pub max_requests_per_minute: Option<u32>,
180+
/// Number of HTTP clients in the submit broadcaster pool (defaults to 1).
181+
pub submitter_pool_size: Option<usize>,
182+
/// Number of HTTP clients in the cancel broadcaster pool (defaults to 1).
183+
pub canceller_pool_size: Option<usize>,
180184
}
181185

182186
impl Default for BitmexExecClientConfig {
@@ -197,6 +201,8 @@ impl Default for BitmexExecClientConfig {
197201
account_id: None,
198202
max_requests_per_second: Some(10),
199203
max_requests_per_minute: Some(120),
204+
submitter_pool_size: None,
205+
canceller_pool_size: None,
200206
}
201207
}
202208
}

crates/adapters/bitmex/src/execution/canceller.rs

Lines changed: 34 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
// lands so we can drop the per-call heap allocation
3232

3333
use std::{
34+
future::Future,
35+
pin::Pin,
3436
sync::{
3537
Arc,
3638
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -75,48 +77,38 @@ trait CancelExecutor: Send + Sync {
7577
fn add_instrument(&self, instrument: InstrumentAny);
7678

7779
/// Performs a health check on the executor.
78-
fn health_check(
79-
&self,
80-
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>;
80+
fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
8181

8282
/// Cancels a single order.
8383
fn cancel_order(
8484
&self,
8585
instrument_id: InstrumentId,
8686
client_order_id: Option<ClientOrderId>,
8787
venue_order_id: Option<VenueOrderId>,
88-
) -> std::pin::Pin<
89-
Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
90-
>;
88+
) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
9189

9290
/// Cancels multiple orders.
9391
fn cancel_orders(
9492
&self,
9593
instrument_id: InstrumentId,
9694
client_order_ids: Option<Vec<ClientOrderId>>,
9795
venue_order_ids: Option<Vec<VenueOrderId>>,
98-
) -> std::pin::Pin<
99-
Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
100-
>;
96+
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
10197

10298
/// Cancels all orders for an instrument.
10399
fn cancel_all_orders(
104100
&self,
105101
instrument_id: InstrumentId,
106102
order_side: Option<OrderSide>,
107-
) -> std::pin::Pin<
108-
Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
109-
>;
103+
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
110104
}
111105

112106
impl CancelExecutor for BitmexHttpClient {
113107
fn add_instrument(&self, instrument: InstrumentAny) {
114108
Self::add_instrument(self, instrument);
115109
}
116110

117-
fn health_check(
118-
&self,
119-
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>> {
111+
fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
120112
Box::pin(async move {
121113
Self::http_get_server_time(self)
122114
.await
@@ -130,9 +122,7 @@ impl CancelExecutor for BitmexHttpClient {
130122
instrument_id: InstrumentId,
131123
client_order_id: Option<ClientOrderId>,
132124
venue_order_id: Option<VenueOrderId>,
133-
) -> std::pin::Pin<
134-
Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
135-
> {
125+
) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
136126
Box::pin(async move {
137127
Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
138128
})
@@ -143,9 +133,7 @@ impl CancelExecutor for BitmexHttpClient {
143133
instrument_id: InstrumentId,
144134
client_order_ids: Option<Vec<ClientOrderId>>,
145135
venue_order_ids: Option<Vec<VenueOrderId>>,
146-
) -> std::pin::Pin<
147-
Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
148-
> {
136+
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
149137
Box::pin(async move {
150138
Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
151139
})
@@ -154,10 +142,8 @@ impl CancelExecutor for BitmexHttpClient {
154142
fn cancel_all_orders(
155143
&self,
156144
instrument_id: InstrumentId,
157-
order_side: Option<nautilus_model::enums::OrderSide>,
158-
) -> std::pin::Pin<
159-
Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
160-
> {
145+
order_side: Option<OrderSide>,
146+
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
161147
Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
162148
}
163149
}
@@ -276,6 +262,14 @@ impl TransportClient {
276262
self.healthy.store(false, Ordering::Relaxed);
277263
}
278264

265+
fn get_cancel_count(&self) -> u64 {
266+
self.cancel_count.load(Ordering::Relaxed)
267+
}
268+
269+
fn get_error_count(&self) -> u64 {
270+
self.error_count.load(Ordering::Relaxed)
271+
}
272+
279273
async fn health_check(&self, timeout_secs: u64) -> bool {
280274
match tokio::time::timeout(
281275
Duration::from_secs(timeout_secs),
@@ -323,14 +317,6 @@ impl TransportClient {
323317
}
324318
}
325319
}
326-
327-
fn get_cancel_count(&self) -> u64 {
328-
self.cancel_count.load(Ordering::Relaxed)
329-
}
330-
331-
fn get_error_count(&self) -> u64 {
332-
self.error_count.load(Ordering::Relaxed)
333-
}
334320
}
335321

336322
/// Broadcasts cancel requests to multiple HTTP clients for redundancy.
@@ -627,7 +613,6 @@ impl CancelBroadcaster {
627613
) -> anyhow::Result<Option<OrderStatusReport>> {
628614
self.total_cancels.fetch_add(1, Ordering::Relaxed);
629615

630-
// Filter for healthy clients and clone them
631616
let transports_guard = self.transports.read().await;
632617
let healthy_transports: Vec<TransportClient> = transports_guard
633618
.iter()
@@ -641,7 +626,6 @@ impl CancelBroadcaster {
641626
anyhow::bail!("No healthy transport clients available");
642627
}
643628

644-
// Spawn tasks for all healthy clients
645629
let mut handles = Vec::new();
646630
for transport in healthy_transports {
647631
let handle = tokio::spawn(async move {
@@ -681,7 +665,6 @@ impl CancelBroadcaster {
681665
) -> anyhow::Result<Vec<OrderStatusReport>> {
682666
self.total_cancels.fetch_add(1, Ordering::Relaxed);
683667

684-
// Filter for healthy clients and clone them
685668
let transports_guard = self.transports.read().await;
686669
let healthy_transports: Vec<TransportClient> = transports_guard
687670
.iter()
@@ -695,7 +678,6 @@ impl CancelBroadcaster {
695678
anyhow::bail!("No healthy transport clients available");
696679
}
697680

698-
// Spawn tasks for all healthy clients
699681
let mut handles = Vec::new();
700682

701683
for transport in healthy_transports {
@@ -733,11 +715,10 @@ impl CancelBroadcaster {
733715
pub async fn broadcast_cancel_all(
734716
&self,
735717
instrument_id: InstrumentId,
736-
order_side: Option<nautilus_model::enums::OrderSide>,
718+
order_side: Option<OrderSide>,
737719
) -> anyhow::Result<Vec<OrderStatusReport>> {
738720
self.total_cancels.fetch_add(1, Ordering::Relaxed);
739721

740-
// Filter for healthy clients and clone them
741722
let transports_guard = self.transports.read().await;
742723
let healthy_transports: Vec<TransportClient> = transports_guard
743724
.iter()
@@ -751,7 +732,6 @@ impl CancelBroadcaster {
751732
anyhow::bail!("No healthy transport clients available");
752733
}
753734

754-
// Spawn tasks for all healthy clients
755735
let mut handles = Vec::new();
756736
for transport in healthy_transports {
757737
let handle = tokio::spawn(async move {
@@ -843,7 +823,7 @@ impl CancelBroadcaster {
843823
}
844824

845825
/// Adds an instrument to all HTTP clients in the pool for caching.
846-
pub fn add_instrument(&self, instrument: nautilus_model::instruments::any::InstrumentAny) {
826+
pub fn add_instrument(&self, instrument: InstrumentAny) {
847827
let transports = self.transports.blocking_read();
848828
for transport in transports.iter() {
849829
transport.executor.add_instrument(instrument.clone());
@@ -933,9 +913,9 @@ mod tests {
933913
InstrumentId,
934914
Option<ClientOrderId>,
935915
Option<VenueOrderId>,
936-
) -> std::pin::Pin<
937-
Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send>,
938-
> + Send
916+
)
917+
-> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
918+
+ Send
939919
+ Sync,
940920
>,
941921
}
@@ -947,7 +927,7 @@ mod tests {
947927
+ Send
948928
+ Sync
949929
+ 'static,
950-
Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
930+
Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
951931
{
952932
Self {
953933
handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
@@ -956,10 +936,7 @@ mod tests {
956936
}
957937

958938
impl CancelExecutor for MockExecutor {
959-
fn health_check(
960-
&self,
961-
) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>
962-
{
939+
fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
963940
Box::pin(async { Ok(()) })
964941
}
965942

@@ -968,9 +945,7 @@ mod tests {
968945
instrument_id: InstrumentId,
969946
client_order_id: Option<ClientOrderId>,
970947
venue_order_id: Option<VenueOrderId>,
971-
) -> std::pin::Pin<
972-
Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
973-
> {
948+
) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
974949
(self.handler)(instrument_id, client_order_id, venue_order_id)
975950
}
976951

@@ -979,27 +954,17 @@ mod tests {
979954
_instrument_id: InstrumentId,
980955
_client_order_ids: Option<Vec<ClientOrderId>>,
981956
_venue_order_ids: Option<Vec<VenueOrderId>>,
982-
) -> std::pin::Pin<
983-
Box<
984-
dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
985-
+ Send
986-
+ '_,
987-
>,
988-
> {
957+
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
958+
{
989959
Box::pin(async { Ok(Vec::new()) })
990960
}
991961

992962
fn cancel_all_orders(
993963
&self,
994964
instrument_id: InstrumentId,
995-
_order_side: Option<nautilus_model::enums::OrderSide>,
996-
) -> std::pin::Pin<
997-
Box<
998-
dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
999-
+ Send
1000-
+ '_,
1001-
>,
1002-
> {
965+
_order_side: Option<OrderSide>,
966+
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
967+
{
1003968
// Try to get result from the single-order handler to propagate errors
1004969
let handler = Arc::clone(&self.handler);
1005970
Box::pin(async move {
@@ -1012,7 +977,7 @@ mod tests {
1012977
})
1013978
}
1014979

1015-
fn add_instrument(&self, _instrument: nautilus_model::instruments::any::InstrumentAny) {
980+
fn add_instrument(&self, _instrument: InstrumentAny) {
1016981
// No-op for mock
1017982
}
1018983
}
@@ -1060,7 +1025,7 @@ mod tests {
10601025
+ Send
10611026
+ Sync
10621027
+ 'static,
1063-
Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
1028+
Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
10641029
{
10651030
let executor = MockExecutor::new(handler);
10661031
TransportClient::new(executor, client_id.to_string())

0 commit comments

Comments
 (0)