Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 140 additions & 24 deletions crates/adapters/hyperliquid/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ use std::{
collections::HashMap,
num::NonZeroU32,
sync::{Arc, LazyLock},
time::Duration,
};

use anyhow::Context;
use nautilus_core::consts::NAUTILUS_USER_AGENT;
use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
use reqwest::{Method, header::USER_AGENT};
use serde_json::Value;
use tokio::time::sleep;

use crate::{
common::{
Expand All @@ -44,6 +46,10 @@ use crate::{
HyperliquidL2Book, HyperliquidMeta, HyperliquidOrderStatus,
},
query::{ExchangeAction, InfoRequest},
rate_limits::{
RateLimitSnapshot, WeightedLimiter, backoff_full_jitter, exchange_weight,
info_base_weight, info_extra_weight,
},
},
signing::{
HyperliquidActionType, HyperliquidEip712Signer, NonceManager, SignRequest, types::SignerId,
Expand All @@ -68,6 +74,10 @@ pub struct HyperliquidHttpClient {
signer: Option<HyperliquidEip712Signer>,
nonce_manager: Option<Arc<NonceManager>>,
vault_address: Option<VaultAddress>,
rest_limiter: Arc<WeightedLimiter>,
rate_limit_backoff_base: Duration,
rate_limit_backoff_cap: Duration,
rate_limit_max_attempts_info: u32,
}

impl Default for HyperliquidHttpClient {
Expand Down Expand Up @@ -98,6 +108,10 @@ impl HyperliquidHttpClient {
signer: None,
nonce_manager: None,
vault_address: None,
rest_limiter: Arc::new(WeightedLimiter::per_minute(1200)),
rate_limit_backoff_base: Duration::from_millis(125),
rate_limit_backoff_cap: Duration::from_secs(5),
rate_limit_max_attempts_info: 3,
}
}

Expand All @@ -122,6 +136,10 @@ impl HyperliquidHttpClient {
signer: Some(signer),
nonce_manager: Some(nonce_manager),
vault_address: secrets.vault_address,
rest_limiter: Arc::new(WeightedLimiter::per_minute(1200)),
rate_limit_backoff_base: Duration::from_millis(125),
rate_limit_backoff_cap: Duration::from_secs(5),
rate_limit_max_attempts_info: 3,
}
}

Expand All @@ -137,6 +155,15 @@ impl HyperliquidHttpClient {
Ok(Self::with_credentials(&secrets, None))
}

/// Configure rate limiting parameters (chainable).
pub fn with_rate_limits(mut self) -> Self {
self.rest_limiter = Arc::new(WeightedLimiter::per_minute(1200));
self.rate_limit_backoff_base = Duration::from_millis(125);
self.rate_limit_backoff_cap = Duration::from_secs(5);
self.rate_limit_max_attempts_info = 3;
self
}

/// Returns whether this client is configured for testnet.
#[must_use]
pub fn is_testnet(&self) -> bool {
Expand Down Expand Up @@ -181,16 +208,91 @@ impl HyperliquidHttpClient {
serde_json::from_value(response).map_err(Error::Serde)
}

/// Generic info request method that returns raw JSON (useful for new endpoints and testing).
pub async fn send_info_request_raw(&self, request: &InfoRequest) -> Result<Value> {
self.send_info_request(request).await
}

/// Send a raw info request and return the JSON response.
async fn send_info_request(&self, request: &InfoRequest) -> Result<Value> {
let base_w = info_base_weight(request);
self.rest_limiter.acquire(base_w).await;

let mut attempt = 0u32;
loop {
let response = self.http_roundtrip_info(request).await?;

if response.status.is_success() {
// decode once to count items, then materialize T
let val: Value = serde_json::from_slice(&response.body).map_err(Error::Serde)?;
let extra = info_extra_weight(request, &val);
if extra > 0 {
self.rest_limiter.debit_extra(extra).await;
tracing::debug!(endpoint=?request, base_w, extra, "info: debited extra weight");
}
return Ok(val);
}

// 429 → respect Retry-After; else jittered backoff. Retry Info only.
if response.status.as_u16() == 429 {
if attempt >= self.rate_limit_max_attempts_info {
let ra = self.parse_retry_after_simple(&response.headers);
return Err(Error::rate_limit("info", base_w, ra));
}
let delay = self
.parse_retry_after_simple(&response.headers)
.map(Duration::from_millis)
.unwrap_or_else(|| {
backoff_full_jitter(
attempt,
self.rate_limit_backoff_base,
self.rate_limit_backoff_cap,
)
});
tracing::warn!(endpoint=?request, attempt, wait_ms=?delay.as_millis(), "429 Too Many Requests; backing off");
attempt += 1;
sleep(delay).await;
// tiny re-acquire to avoid stampede exactly on minute boundary
self.rest_limiter.acquire(1).await;
continue;
}

// transient 5xx: treat like retryable Info (bounded)
if (response.status.is_server_error() || response.status.as_u16() == 408)
&& attempt < self.rate_limit_max_attempts_info
{
let delay = backoff_full_jitter(
attempt,
self.rate_limit_backoff_base,
self.rate_limit_backoff_cap,
);
tracing::warn!(endpoint=?request, attempt, status=?response.status.as_u16(), wait_ms=?delay.as_millis(), "transient error; retrying");
attempt += 1;
sleep(delay).await;
continue;
}

// non-retryable or exhausted
let error_body = String::from_utf8_lossy(&response.body);
return Err(Error::http(
response.status.as_u16(),
error_body.to_string(),
));
}
}

/// Raw HTTP roundtrip for info requests - returns the original HttpResponse
async fn http_roundtrip_info(
&self,
request: &InfoRequest,
) -> Result<nautilus_network::http::HttpResponse> {
let url = &self.base_info;
let body = serde_json::to_value(request).map_err(Error::Serde)?;
let body_bytes = serde_json::to_string(&body)
.map_err(Error::Serde)?
.into_bytes();

let response = self
.client
self.client
.request(
Method::POST,
url.clone(),
Expand All @@ -200,17 +302,13 @@ impl HyperliquidHttpClient {
None,
)
.await
.map_err(Error::from_http_client)?;
.map_err(Error::from_http_client)
}

if response.status.is_success() {
serde_json::from_slice(&response.body).map_err(Error::Serde)
} else {
let error_body = String::from_utf8_lossy(&response.body);
Err(Error::http(
response.status.as_u16(),
error_body.to_string(),
))
}
/// Parse Retry-After from response headers (simplified)
fn parse_retry_after_simple(&self, headers: &HashMap<String, String>) -> Option<u64> {
let retry_after = headers.get("retry-after")?;
retry_after.parse::<u64>().ok().map(|s| s * 1000) // convert seconds to ms
}

// ---------------- EXCHANGE ENDPOINTS ---------------------------------------
Expand All @@ -220,6 +318,9 @@ impl HyperliquidHttpClient {
&self,
action: &ExchangeAction,
) -> Result<HyperliquidExchangeResponse> {
let w = exchange_weight(action);
self.rest_limiter.acquire(w).await;

let signer = self
.signer
.as_ref()
Expand Down Expand Up @@ -257,12 +358,32 @@ impl HyperliquidHttpClient {
HyperliquidExchangeRequest::new(action.clone(), time_nonce.as_millis() as u64, sig)
};

let response = self.http_roundtrip_exchange(&request).await?;

if response.status.is_success() {
serde_json::from_slice(&response.body).map_err(Error::Serde)
} else if response.status.as_u16() == 429 {
let ra = self.parse_retry_after_simple(&response.headers);
Err(Error::rate_limit("exchange", w, ra))
} else {
let error_body = String::from_utf8_lossy(&response.body);
Err(Error::http(
response.status.as_u16(),
error_body.to_string(),
))
}
}

/// Raw HTTP roundtrip for exchange requests
async fn http_roundtrip_exchange(
&self,
request: &HyperliquidExchangeRequest<ExchangeAction>,
) -> Result<nautilus_network::http::HttpResponse> {
let url = &self.base_exchange;
let body = serde_json::to_string(&request).map_err(Error::Serde)?;
let body_bytes = body.into_bytes();

let response = self
.client
self.client
.request(
Method::POST,
url.clone(),
Expand All @@ -272,17 +393,12 @@ impl HyperliquidHttpClient {
None,
)
.await
.map_err(Error::from_http_client)?;
.map_err(Error::from_http_client)
}

if response.status.is_success() {
serde_json::from_slice(&response.body).map_err(Error::Serde)
} else {
let error_body = String::from_utf8_lossy(&response.body);
Err(Error::http(
response.status.as_u16(),
error_body.to_string(),
))
}
/// Best-effort gauge for diagnostics/metrics
pub async fn rest_limiter_snapshot(&self) -> RateLimitSnapshot {
self.rest_limiter.snapshot().await
}

// ---------------- INTERNALS -----------------------------------------------
Expand Down
43 changes: 34 additions & 9 deletions crates/adapters/hyperliquid/src/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ pub enum Error {
Auth(String),

/// Rate limiting errors with optional retry information
#[error("rate limited (retry_after={retry_after:?}s)")]
RateLimit { retry_after: Option<u64> },
#[error("Rate limited on {scope} (weight={weight}) retry_after_ms={retry_after_ms:?}")]
RateLimit {
scope: &'static str,
weight: u32,
retry_after_ms: Option<u64>,
},

/// Nonce window violations (nonces must be within time window and unique)
#[error("nonce window error: {0}")]
Expand Down Expand Up @@ -83,8 +87,12 @@ impl Error {
}

/// Create a rate limit error
pub fn rate_limit(retry_after: Option<u64>) -> Self {
Self::RateLimit { retry_after }
pub fn rate_limit(scope: &'static str, weight: u32, retry_after_ms: Option<u64>) -> Self {
Self::RateLimit {
scope,
weight,
retry_after_ms,
}
}

/// Create a nonce window error
Expand Down Expand Up @@ -115,6 +123,18 @@ impl Error {
}
}

/// Create an error from HTTP status code and body
pub fn from_http_status(status: reqwest::StatusCode, body: &[u8]) -> Self {
let message = String::from_utf8_lossy(body).to_string();
match status.as_u16() {
401 | 403 => Self::auth(format!("HTTP {}: {}", status.as_u16(), message)),
400 => Self::bad_request(format!("HTTP {}: {}", status.as_u16(), message)),
429 => Self::rate_limit("unknown", 0, None),
500..=599 => Self::exchange(format!("HTTP {}: {}", status.as_u16(), message)),
_ => Self::http(status.as_u16(), message),
}
}

/// Map reqwest errors to appropriate error types
pub fn from_reqwest(error: reqwest::Error) -> Self {
if error.is_timeout() {
Expand All @@ -124,7 +144,7 @@ impl Error {
match status_code {
401 | 403 => Self::auth(format!("HTTP {}: authentication failed", status_code)),
400 => Self::bad_request(format!("HTTP {}: bad request", status_code)),
429 => Self::rate_limit(None), // TODO: Extract retry-after header
429 => Self::rate_limit("unknown", 0, None), // TODO: Extract retry-after header
500..=599 => Self::exchange(format!("HTTP {}: server error", status_code)),
_ => Self::http(status_code, format!("HTTP error: {}", error)),
}
Expand Down Expand Up @@ -185,7 +205,7 @@ mod tests {
let auth_err = Error::auth("Invalid signature");
assert!(auth_err.is_auth_error());

let rate_limit_err = Error::rate_limit(Some(30));
let rate_limit_err = Error::rate_limit("test", 30, Some(30000));
assert!(rate_limit_err.is_rate_limited());
assert!(rate_limit_err.is_retryable());

Expand All @@ -196,9 +216,14 @@ mod tests {
#[rstest]
fn test_error_display() {
let err = Error::RateLimit {
retry_after: Some(60),
scope: "info",
weight: 20,
retry_after_ms: Some(60000),
};
assert_eq!(err.to_string(), "rate limited (retry_after=Some(60)s)");
assert_eq!(
err.to_string(),
"Rate limited on info (weight=20) retry_after_ms=Some(60000)"
);

let err = Error::NonceWindow("Nonce too old".to_string());
assert_eq!(err.to_string(), "nonce window error: Nonce too old");
Expand All @@ -208,7 +233,7 @@ mod tests {
fn test_retryable_errors() {
assert!(Error::transport("test").is_retryable());
assert!(Error::Timeout.is_retryable());
assert!(Error::rate_limit(None).is_retryable());
assert!(Error::rate_limit("test", 10, None).is_retryable());
assert!(Error::http(500, "server error").is_retryable());

assert!(!Error::auth("test").is_retryable());
Expand Down
1 change: 1 addition & 0 deletions crates/adapters/hyperliquid/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod error;
pub mod models;
pub mod parse;
pub mod query;
pub mod rate_limits;

// Re-exports
pub use crate::http::client::HyperliquidHttpClient;
Loading
Loading