diff --git a/crates/adapters/hyperliquid/src/http/client.rs b/crates/adapters/hyperliquid/src/http/client.rs index 05ac5e3ca42c..599e4b4ba8b8 100644 --- a/crates/adapters/hyperliquid/src/http/client.rs +++ b/crates/adapters/hyperliquid/src/http/client.rs @@ -24,6 +24,7 @@ use std::{ collections::HashMap, num::NonZeroU32, sync::{Arc, LazyLock}, + time::Duration, }; use anyhow::Context; @@ -31,6 +32,7 @@ use nautilus_core::consts::NAUTILUS_USER_AGENT; use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota}; use reqwest::{Method, header::USER_AGENT}; use serde_json::Value; +use tokio::time::sleep; use crate::{ common::{ @@ -44,6 +46,10 @@ use crate::{ HyperliquidL2Book, HyperliquidMeta, HyperliquidOrderStatus, }, query::{ExchangeAction, InfoRequest}, + rate_limits::{ + RateLimitSnapshot, WeightedLimiter, backoff_full_jitter, exchange_weight, + info_base_weight, info_extra_weight, + }, }, signing::{ HyperliquidActionType, HyperliquidEip712Signer, NonceManager, SignRequest, types::SignerId, @@ -68,6 +74,10 @@ pub struct HyperliquidHttpClient { signer: Option, nonce_manager: Option>, vault_address: Option, + rest_limiter: Arc, + rate_limit_backoff_base: Duration, + rate_limit_backoff_cap: Duration, + rate_limit_max_attempts_info: u32, } impl Default for HyperliquidHttpClient { @@ -98,6 +108,10 @@ impl HyperliquidHttpClient { signer: None, nonce_manager: None, vault_address: None, + rest_limiter: Arc::new(WeightedLimiter::per_minute(1200)), + rate_limit_backoff_base: Duration::from_millis(125), + rate_limit_backoff_cap: Duration::from_secs(5), + rate_limit_max_attempts_info: 3, } } @@ -122,6 +136,10 @@ impl HyperliquidHttpClient { signer: Some(signer), nonce_manager: Some(nonce_manager), vault_address: secrets.vault_address, + rest_limiter: Arc::new(WeightedLimiter::per_minute(1200)), + rate_limit_backoff_base: Duration::from_millis(125), + rate_limit_backoff_cap: Duration::from_secs(5), + rate_limit_max_attempts_info: 3, } } @@ -137,6 +155,15 @@ impl HyperliquidHttpClient { Ok(Self::with_credentials(&secrets, None)) } + /// Configure rate limiting parameters (chainable). + pub fn with_rate_limits(mut self) -> Self { + self.rest_limiter = Arc::new(WeightedLimiter::per_minute(1200)); + self.rate_limit_backoff_base = Duration::from_millis(125); + self.rate_limit_backoff_cap = Duration::from_secs(5); + self.rate_limit_max_attempts_info = 3; + self + } + /// Returns whether this client is configured for testnet. #[must_use] pub fn is_testnet(&self) -> bool { @@ -181,16 +208,91 @@ impl HyperliquidHttpClient { serde_json::from_value(response).map_err(Error::Serde) } + /// Generic info request method that returns raw JSON (useful for new endpoints and testing). + pub async fn send_info_request_raw(&self, request: &InfoRequest) -> Result { + self.send_info_request(request).await + } + /// Send a raw info request and return the JSON response. async fn send_info_request(&self, request: &InfoRequest) -> Result { + let base_w = info_base_weight(request); + self.rest_limiter.acquire(base_w).await; + + let mut attempt = 0u32; + loop { + let response = self.http_roundtrip_info(request).await?; + + if response.status.is_success() { + // decode once to count items, then materialize T + let val: Value = serde_json::from_slice(&response.body).map_err(Error::Serde)?; + let extra = info_extra_weight(request, &val); + if extra > 0 { + self.rest_limiter.debit_extra(extra).await; + tracing::debug!(endpoint=?request, base_w, extra, "info: debited extra weight"); + } + return Ok(val); + } + + // 429 → respect Retry-After; else jittered backoff. Retry Info only. + if response.status.as_u16() == 429 { + if attempt >= self.rate_limit_max_attempts_info { + let ra = self.parse_retry_after_simple(&response.headers); + return Err(Error::rate_limit("info", base_w, ra)); + } + let delay = self + .parse_retry_after_simple(&response.headers) + .map(Duration::from_millis) + .unwrap_or_else(|| { + backoff_full_jitter( + attempt, + self.rate_limit_backoff_base, + self.rate_limit_backoff_cap, + ) + }); + tracing::warn!(endpoint=?request, attempt, wait_ms=?delay.as_millis(), "429 Too Many Requests; backing off"); + attempt += 1; + sleep(delay).await; + // tiny re-acquire to avoid stampede exactly on minute boundary + self.rest_limiter.acquire(1).await; + continue; + } + + // transient 5xx: treat like retryable Info (bounded) + if (response.status.is_server_error() || response.status.as_u16() == 408) + && attempt < self.rate_limit_max_attempts_info + { + let delay = backoff_full_jitter( + attempt, + self.rate_limit_backoff_base, + self.rate_limit_backoff_cap, + ); + tracing::warn!(endpoint=?request, attempt, status=?response.status.as_u16(), wait_ms=?delay.as_millis(), "transient error; retrying"); + attempt += 1; + sleep(delay).await; + continue; + } + + // non-retryable or exhausted + let error_body = String::from_utf8_lossy(&response.body); + return Err(Error::http( + response.status.as_u16(), + error_body.to_string(), + )); + } + } + + /// Raw HTTP roundtrip for info requests - returns the original HttpResponse + async fn http_roundtrip_info( + &self, + request: &InfoRequest, + ) -> Result { let url = &self.base_info; let body = serde_json::to_value(request).map_err(Error::Serde)?; let body_bytes = serde_json::to_string(&body) .map_err(Error::Serde)? .into_bytes(); - let response = self - .client + self.client .request( Method::POST, url.clone(), @@ -200,17 +302,13 @@ impl HyperliquidHttpClient { None, ) .await - .map_err(Error::from_http_client)?; + .map_err(Error::from_http_client) + } - if response.status.is_success() { - serde_json::from_slice(&response.body).map_err(Error::Serde) - } else { - let error_body = String::from_utf8_lossy(&response.body); - Err(Error::http( - response.status.as_u16(), - error_body.to_string(), - )) - } + /// Parse Retry-After from response headers (simplified) + fn parse_retry_after_simple(&self, headers: &HashMap) -> Option { + let retry_after = headers.get("retry-after")?; + retry_after.parse::().ok().map(|s| s * 1000) // convert seconds to ms } // ---------------- EXCHANGE ENDPOINTS --------------------------------------- @@ -220,6 +318,9 @@ impl HyperliquidHttpClient { &self, action: &ExchangeAction, ) -> Result { + let w = exchange_weight(action); + self.rest_limiter.acquire(w).await; + let signer = self .signer .as_ref() @@ -257,12 +358,32 @@ impl HyperliquidHttpClient { HyperliquidExchangeRequest::new(action.clone(), time_nonce.as_millis() as u64, sig) }; + let response = self.http_roundtrip_exchange(&request).await?; + + if response.status.is_success() { + serde_json::from_slice(&response.body).map_err(Error::Serde) + } else if response.status.as_u16() == 429 { + let ra = self.parse_retry_after_simple(&response.headers); + Err(Error::rate_limit("exchange", w, ra)) + } else { + let error_body = String::from_utf8_lossy(&response.body); + Err(Error::http( + response.status.as_u16(), + error_body.to_string(), + )) + } + } + + /// Raw HTTP roundtrip for exchange requests + async fn http_roundtrip_exchange( + &self, + request: &HyperliquidExchangeRequest, + ) -> Result { let url = &self.base_exchange; let body = serde_json::to_string(&request).map_err(Error::Serde)?; let body_bytes = body.into_bytes(); - let response = self - .client + self.client .request( Method::POST, url.clone(), @@ -272,17 +393,12 @@ impl HyperliquidHttpClient { None, ) .await - .map_err(Error::from_http_client)?; + .map_err(Error::from_http_client) + } - if response.status.is_success() { - serde_json::from_slice(&response.body).map_err(Error::Serde) - } else { - let error_body = String::from_utf8_lossy(&response.body); - Err(Error::http( - response.status.as_u16(), - error_body.to_string(), - )) - } + /// Best-effort gauge for diagnostics/metrics + pub async fn rest_limiter_snapshot(&self) -> RateLimitSnapshot { + self.rest_limiter.snapshot().await } // ---------------- INTERNALS ----------------------------------------------- diff --git a/crates/adapters/hyperliquid/src/http/error.rs b/crates/adapters/hyperliquid/src/http/error.rs index c1b7af791686..968c29cdaab1 100644 --- a/crates/adapters/hyperliquid/src/http/error.rs +++ b/crates/adapters/hyperliquid/src/http/error.rs @@ -31,8 +31,12 @@ pub enum Error { Auth(String), /// Rate limiting errors with optional retry information - #[error("rate limited (retry_after={retry_after:?}s)")] - RateLimit { retry_after: Option }, + #[error("Rate limited on {scope} (weight={weight}) retry_after_ms={retry_after_ms:?}")] + RateLimit { + scope: &'static str, + weight: u32, + retry_after_ms: Option, + }, /// Nonce window violations (nonces must be within time window and unique) #[error("nonce window error: {0}")] @@ -83,8 +87,12 @@ impl Error { } /// Create a rate limit error - pub fn rate_limit(retry_after: Option) -> Self { - Self::RateLimit { retry_after } + pub fn rate_limit(scope: &'static str, weight: u32, retry_after_ms: Option) -> Self { + Self::RateLimit { + scope, + weight, + retry_after_ms, + } } /// Create a nonce window error @@ -115,6 +123,18 @@ impl Error { } } + /// Create an error from HTTP status code and body + pub fn from_http_status(status: reqwest::StatusCode, body: &[u8]) -> Self { + let message = String::from_utf8_lossy(body).to_string(); + match status.as_u16() { + 401 | 403 => Self::auth(format!("HTTP {}: {}", status.as_u16(), message)), + 400 => Self::bad_request(format!("HTTP {}: {}", status.as_u16(), message)), + 429 => Self::rate_limit("unknown", 0, None), + 500..=599 => Self::exchange(format!("HTTP {}: {}", status.as_u16(), message)), + _ => Self::http(status.as_u16(), message), + } + } + /// Map reqwest errors to appropriate error types pub fn from_reqwest(error: reqwest::Error) -> Self { if error.is_timeout() { @@ -124,7 +144,7 @@ impl Error { match status_code { 401 | 403 => Self::auth(format!("HTTP {}: authentication failed", status_code)), 400 => Self::bad_request(format!("HTTP {}: bad request", status_code)), - 429 => Self::rate_limit(None), // TODO: Extract retry-after header + 429 => Self::rate_limit("unknown", 0, None), // TODO: Extract retry-after header 500..=599 => Self::exchange(format!("HTTP {}: server error", status_code)), _ => Self::http(status_code, format!("HTTP error: {}", error)), } @@ -185,7 +205,7 @@ mod tests { let auth_err = Error::auth("Invalid signature"); assert!(auth_err.is_auth_error()); - let rate_limit_err = Error::rate_limit(Some(30)); + let rate_limit_err = Error::rate_limit("test", 30, Some(30000)); assert!(rate_limit_err.is_rate_limited()); assert!(rate_limit_err.is_retryable()); @@ -196,9 +216,14 @@ mod tests { #[rstest] fn test_error_display() { let err = Error::RateLimit { - retry_after: Some(60), + scope: "info", + weight: 20, + retry_after_ms: Some(60000), }; - assert_eq!(err.to_string(), "rate limited (retry_after=Some(60)s)"); + assert_eq!( + err.to_string(), + "Rate limited on info (weight=20) retry_after_ms=Some(60000)" + ); let err = Error::NonceWindow("Nonce too old".to_string()); assert_eq!(err.to_string(), "nonce window error: Nonce too old"); @@ -208,7 +233,7 @@ mod tests { fn test_retryable_errors() { assert!(Error::transport("test").is_retryable()); assert!(Error::Timeout.is_retryable()); - assert!(Error::rate_limit(None).is_retryable()); + assert!(Error::rate_limit("test", 10, None).is_retryable()); assert!(Error::http(500, "server error").is_retryable()); assert!(!Error::auth("test").is_retryable()); diff --git a/crates/adapters/hyperliquid/src/http/mod.rs b/crates/adapters/hyperliquid/src/http/mod.rs index 01dc731a26c3..1c760714ec1b 100644 --- a/crates/adapters/hyperliquid/src/http/mod.rs +++ b/crates/adapters/hyperliquid/src/http/mod.rs @@ -18,6 +18,7 @@ pub mod error; pub mod models; pub mod parse; pub mod query; +pub mod rate_limits; // Re-exports pub use crate::http::client::HyperliquidHttpClient; diff --git a/crates/adapters/hyperliquid/src/http/rate_limits.rs b/crates/adapters/hyperliquid/src/http/rate_limits.rs new file mode 100644 index 000000000000..d4b459c4a5ac --- /dev/null +++ b/crates/adapters/hyperliquid/src/http/rate_limits.rs @@ -0,0 +1,353 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved. +// https://nautechsystems.io +// +// Licensed under the GNU Lesser General Public License Version 3.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------------------------------- + +use std::time::{Duration, Instant}; + +use serde_json::Value; +use tokio::sync::Mutex; + +#[derive(Debug)] +pub struct WeightedLimiter { + capacity: f64, // tokens per minute (e.g., 1200) + refill_per_sec: f64, // capacity / 60 + state: Mutex, +} + +#[derive(Debug)] +struct State { + tokens: f64, + last_refill: Instant, +} + +impl WeightedLimiter { + pub fn per_minute(capacity: u32) -> Self { + let cap = capacity as f64; + Self { + capacity: cap, + refill_per_sec: cap / 60.0, + state: Mutex::new(State { + tokens: cap, + last_refill: Instant::now(), + }), + } + } + + /// Acquire `weight` tokens, sleeping until available. + pub async fn acquire(&self, weight: u32) { + let need = weight as f64; + loop { + let mut st = self.state.lock().await; + Self::refill_locked(&mut st, self.refill_per_sec, self.capacity); + + if st.tokens >= need { + st.tokens -= need; + return; + } + let deficit = need - st.tokens; + let secs = deficit / self.refill_per_sec; + drop(st); + tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await; + } + } + + /// Post-response debit for per-items adders (can temporarily clamp to 0). + pub async fn debit_extra(&self, extra: u32) { + if extra == 0 { + return; + } + let mut st = self.state.lock().await; + Self::refill_locked(&mut st, self.refill_per_sec, self.capacity); + st.tokens = (st.tokens - extra as f64).max(0.0); + } + + pub async fn snapshot(&self) -> RateLimitSnapshot { + let mut st = self.state.lock().await; + Self::refill_locked(&mut st, self.refill_per_sec, self.capacity); + RateLimitSnapshot { + capacity: self.capacity as u32, + tokens: st.tokens.max(0.0) as u32, + } + } + + fn refill_locked(st: &mut State, per_sec: f64, cap: f64) { + let dt = Instant::now().duration_since(st.last_refill).as_secs_f64(); + if dt > 0.0 { + st.tokens = (st.tokens + dt * per_sec).min(cap); + st.last_refill = Instant::now(); + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct RateLimitSnapshot { + pub capacity: u32, + pub tokens: u32, +} + +pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration { + use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + }; + + // Simple pseudo-random based on attempt and time + let mut hasher = DefaultHasher::new(); + attempt.hash(&mut hasher); + Instant::now().elapsed().as_nanos().hash(&mut hasher); + let hash = hasher.finish(); + + let max = (base.as_millis() as u64) + .saturating_mul(1u64 << attempt.min(16)) + .min(cap.as_millis() as u64) + .max(base.as_millis() as u64); + Duration::from_millis(hash % max) +} + +/// Classify Info requests into weight classes based on request_type. +/// Since InfoRequest uses struct with request_type string, we match on that. +pub fn info_base_weight(req: &crate::http::query::InfoRequest) -> u32 { + match req.request_type.as_str() { + // Cheap (2) + "l2Book" + | "allMids" + | "clearinghouseState" + | "orderStatus" + | "spotClearinghouseState" + | "exchangeStatus" => 2, + // Very expensive (60) + "userRole" => 60, + // Default (20) + _ => 20, + } +} + +/// Extra weight for heavy Info endpoints: +1 per 20 (most), +1 per 60 for candleSnapshot. +/// We count the largest array in the response (robust to schema variants). +pub fn info_extra_weight(req: &crate::http::query::InfoRequest, json: &Value) -> u32 { + let items = match json { + Value::Array(a) => a.len(), + Value::Object(m) => m + .values() + .filter_map(|v| v.as_array().map(|a| a.len())) + .max() + .unwrap_or(0), + _ => 0, + }; + + let unit = match req.request_type.as_str() { + "candleSnapshot" => 60usize, // +1 per 60 + "recentTrades" + | "historicalOrders" + | "userFills" + | "userFillsByTime" + | "fundingHistory" + | "userFunding" + | "nonUserFundingUpdates" + | "twapHistory" + | "userTwapSliceFills" + | "userTwapSliceFillsByTime" + | "delegatorHistory" + | "delegatorRewards" + | "validatorStats" => 20usize, // +1 per 20 + _ => return 0, + }; + (items / unit) as u32 +} + +/// Exchange: 1 + floor(batch_len / 40) +pub fn exchange_weight(action: &crate::http::query::ExchangeAction) -> u32 { + // Since ExchangeAction uses struct with action_type and params, + // we need to extract batch size from params based on action_type + let batch_size = match action.action_type.as_str() { + "order" => { + if let Some(orders) = action.params.get("orders") { + orders.as_array().map(|a| a.len()).unwrap_or(0) + } else { + 0 + } + } + "cancel" => { + if let Some(cancels) = action.params.get("cancels") { + cancels.as_array().map(|a| a.len()).unwrap_or(0) + } else { + 0 + } + } + "batchModify" => { + if let Some(modifies) = action.params.get("modifies") { + modifies.as_array().map(|a| a.len()).unwrap_or(0) + } else { + 0 + } + } + _ => 0, + }; + 1 + (batch_size as u32 / 40) +} + +//////////////////////////////////////////////////////////////////////////////// +// Tests +//////////////////////////////////////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + use rstest::rstest; + use serde_json::json; + + use super::*; + use crate::http::query::{ExchangeAction, InfoRequest}; + + #[rstest] + #[case("order", "orders", 1, 1)] + #[case("order", "orders", 39, 1)] + #[case("order", "orders", 40, 2)] + #[case("order", "orders", 79, 2)] + #[case("order", "orders", 80, 3)] + #[case("cancel", "cancels", 40, 2)] + #[case("batchModify", "modifies", 40, 2)] + fn test_exchange_weight_steps_every_40( + #[case] action_type: &str, + #[case] array_key: &str, + #[case] array_len: usize, + #[case] expected_weight: u32, + ) { + let action = ExchangeAction { + action_type: action_type.to_string(), + params: json!({ array_key: vec![1; array_len] }), + }; + assert_eq!(exchange_weight(&action), expected_weight); + } + + #[rstest] + fn test_exchange_weight_non_batch_action() { + let update_leverage = ExchangeAction { + action_type: "updateLeverage".to_string(), + params: json!({ "asset": 1, "isCross": true, "leverage": 10 }), + }; + assert_eq!(exchange_weight(&update_leverage), 1); + } + + #[rstest] + #[case("l2Book", 2)] + #[case("allMids", 2)] + #[case("clearinghouseState", 2)] + #[case("orderStatus", 2)] + #[case("spotClearinghouseState", 2)] + #[case("exchangeStatus", 2)] + #[case("userRole", 60)] + #[case("userFills", 20)] + #[case("unknownEndpoint", 20)] + fn test_info_base_weights(#[case] request_type: &str, #[case] expected_weight: u32) { + let request = InfoRequest { + request_type: request_type.to_string(), + params: json!({ "coin": "BTC" }), + }; + assert_eq!(info_base_weight(&request), expected_weight); + } + + #[rstest] + fn test_info_extra_weight_no_charging() { + let l2_book = InfoRequest { + request_type: "l2Book".to_string(), + params: json!({ "coin": "BTC" }), + }; + let large_json = json!(vec![1; 1000]); + assert_eq!(info_extra_weight(&l2_book, &large_json), 0); + } + + #[rstest] + fn test_info_extra_weight_complex_json() { + let user_fills = InfoRequest { + request_type: "userFills".to_string(), + params: json!({ "user": "0x123" }), + }; + let complex_json = json!({ + "fills": vec![1; 40], + "orders": vec![1; 20], + "other": "data" + }); + assert_eq!(info_extra_weight(&user_fills, &complex_json), 2); // largest array is 40, 40/20 = 2 + } + + #[tokio::test] + async fn test_limiter_roughly_caps_to_capacity() { + let limiter = WeightedLimiter::per_minute(1200); + + // Consume ~1200 in quick succession + for _ in 0..60 { + limiter.acquire(20).await; // 60 * 20 = 1200 + } + + // The next acquire should take time for tokens to refill + let t0 = std::time::Instant::now(); + limiter.acquire(20).await; + let elapsed = t0.elapsed(); + + // Should take at least some time to refill (allow some jitter/timing variance) + assert!( + elapsed.as_millis() >= 500, + "Expected significant delay, got {}ms", + elapsed.as_millis() + ); + } + + #[tokio::test] + async fn test_limiter_debit_extra_works() { + let limiter = WeightedLimiter::per_minute(100); + + // Start with full bucket + let snapshot = limiter.snapshot().await; + assert_eq!(snapshot.capacity, 100); + assert_eq!(snapshot.tokens, 100); + + // Acquire some tokens + limiter.acquire(30).await; + let snapshot = limiter.snapshot().await; + assert_eq!(snapshot.tokens, 70); + + // Debit extra + limiter.debit_extra(20).await; + let snapshot = limiter.snapshot().await; + assert_eq!(snapshot.tokens, 50); + + // Debit more than available (should clamp to 0) + limiter.debit_extra(100).await; + let snapshot = limiter.snapshot().await; + assert_eq!(snapshot.tokens, 0); + } + + #[rstest] + #[case(0, 100)] + #[case(1, 200)] + #[case(2, 400)] + fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) { + let base = Duration::from_millis(100); + let cap = Duration::from_secs(5); + + let delay = backoff_full_jitter(attempt, base, cap); + + // Should be in expected ranges (allowing for jitter) + assert!(delay.as_millis() <= max_expected_ms as u128); + } + + #[rstest] + fn test_backoff_full_jitter_respects_cap() { + let base = Duration::from_millis(100); + let cap = Duration::from_secs(5); + + let delay_high = backoff_full_jitter(10, base, cap); + assert!(delay_high.as_millis() <= cap.as_millis()); + } +}