Skip to content

Commit 71d7c0e

Browse files
committed
add retry policy
1 parent dd968b4 commit 71d7c0e

7 files changed

Lines changed: 202 additions & 19 deletions

File tree

rust/Cargo.lock

Lines changed: 35 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ governor = { version = "0.5.1", features = ["dashmap"] }
8181
http = { version = "1.1.0" }
8282
http-body-util = "0.1.0"
8383
httpmock = "0.7.0"
84+
mockito = "1"
8485
hyper = { version = "1.6", features = ["server", "http1", "http2"] }
8586
hyper-util = { version = "0.1", features = ["server", "server-graceful", "tokio", "http1", "http2"] }
8687
itoa = "1.0.15"

rust/batch-import-worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,4 @@ posthog-rs = { workspace = true }
5151
[dev-dependencies]
5252
httpmock = { workspace = true }
5353
flate2 = { workspace = true }
54+
mockito = { workspace = true }

rust/batch-import-worker/src/emit/capture.rs

Lines changed: 162 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,20 @@ use anyhow::Error;
77
use async_trait::async_trait;
88
use common_types::{InternallyCapturedEvent, RawEvent};
99
use posthog_rs::{Client, Event};
10-
use tracing::info;
10+
use tracing::{info, warn};
1111

12-
use crate::error::RateLimitedError;
12+
use crate::job::backoff::BackoffPolicy;
1313

1414
use super::{Emitter, Transaction};
1515

16+
const MAX_RETRIES: u32 = 5;
17+
18+
/// Retry policy for transient HTTP errors from the capture service. Starts at
19+
/// 1 second and doubles up to 30 seconds, giving roughly a minute of total
20+
/// retry time before surfacing the error.
21+
const RETRY_POLICY: BackoffPolicy =
22+
BackoffPolicy::new(Duration::from_secs(1), 2.0, Duration::from_secs(30));
23+
1624
pub struct CaptureEmitter {
1725
client: Client,
1826
send_rate: u64,
@@ -23,6 +31,7 @@ pub struct CaptureTransaction<'a> {
2331
send_rate: u64,
2432
start: Instant,
2533
events: Mutex<Vec<Event>>,
34+
retry_policy: BackoffPolicy,
2635
}
2736

2837
impl CaptureEmitter {
@@ -39,6 +48,7 @@ impl Emitter for CaptureEmitter {
3948
send_rate: self.send_rate,
4049
start: Instant::now(),
4150
events: Mutex::new(Vec::new()),
51+
retry_policy: RETRY_POLICY,
4252
}))
4353
}
4454
}
@@ -78,6 +88,13 @@ fn convert_event(ice: &InternallyCapturedEvent) -> Result<Event, Error> {
7888
Ok(event)
7989
}
8090

91+
fn is_retryable(err: &posthog_rs::Error) -> bool {
92+
matches!(
93+
err,
94+
posthog_rs::Error::RateLimit { .. } | posthog_rs::Error::ServerError { .. }
95+
)
96+
}
97+
8198
#[async_trait]
8299
impl<'a> Transaction<'a> for CaptureTransaction<'a> {
83100
async fn emit(&self, data: &[InternallyCapturedEvent]) -> Result<(), Error> {
@@ -103,24 +120,37 @@ impl<'a> Transaction<'a> for CaptureTransaction<'a> {
103120
let to_sleep = min_duration.saturating_sub(txn_elapsed);
104121

105122
info!(
106-
"sending {} events to capture in {:?}, minimum send duration is {:?}, sleeping for {:?}",
107-
count, txn_elapsed, min_duration, to_sleep
123+
"sending {count} events to capture in {txn_elapsed:?}, minimum send duration is {min_duration:?}, sleeping for {to_sleep:?}"
108124
);
109125

110-
let result = self.client.capture_batch(events, true).await;
111-
112-
match result {
113-
Ok(()) => {
114-
info!("successfully sent batch to capture");
115-
Ok(to_sleep)
116-
}
117-
Err(e @ posthog_rs::Error::RateLimit { retry_after }) => Err(RateLimitedError {
118-
retry_after,
119-
source: Box::new(e),
126+
for attempt in 0..=MAX_RETRIES {
127+
match self.client.capture_batch(events.clone(), true).await {
128+
Ok(()) => break,
129+
Err(e) if is_retryable(&e) && attempt < MAX_RETRIES => {
130+
// Prefer the server's Retry-After hint when present (capped
131+
// to our max delay), otherwise fall back to exponential backoff.
132+
let delay = match &e {
133+
posthog_rs::Error::RateLimit {
134+
retry_after: Some(ra),
135+
} => (*ra).min(self.retry_policy.max_delay),
136+
_ => self.retry_policy.next_delay(attempt),
137+
};
138+
warn!(
139+
"transient capture error, retrying (attempt {attempt}/{MAX_RETRIES}, delay {delay:?}): {e}"
140+
);
141+
tokio::time::sleep(delay).await;
142+
}
143+
Err(e) => {
144+
return Err(Error::msg(format!(
145+
"capture batch failed after {} attempts: {e}",
146+
attempt + 1
147+
)));
148+
}
120149
}
121-
.into()),
122-
Err(e) => Err(Error::msg(e.to_string())),
123150
}
151+
152+
info!("successfully sent batch to capture");
153+
Ok(to_sleep)
124154
}
125155
}
126156

@@ -235,4 +265,120 @@ mod tests {
235265
assert_eq!(get_min_txn_duration(1000, 1000), Duration::from_secs(1));
236266
assert_eq!(get_min_txn_duration(500, 1000), Duration::from_secs(2));
237267
}
268+
269+
#[test]
270+
fn test_is_retryable() {
271+
assert!(is_retryable(&posthog_rs::Error::RateLimit {
272+
retry_after: None
273+
}));
274+
assert!(is_retryable(&posthog_rs::Error::ServerError {
275+
status: 500,
276+
message: "internal".to_string()
277+
}));
278+
assert!(!is_retryable(&posthog_rs::Error::BadRequest(
279+
"bad".to_string()
280+
)));
281+
assert!(!is_retryable(&posthog_rs::Error::Connection(
282+
"timeout".to_string()
283+
)));
284+
}
285+
286+
async fn make_client(base_url: &str) -> Client {
287+
let options: posthog_rs::ClientOptions = ("test_api_key", base_url).into();
288+
posthog_rs::client(options).await
289+
}
290+
291+
fn make_transaction(client: &Client) -> Box<CaptureTransaction<'_>> {
292+
let mut event = Event::new("test", "user1");
293+
event.insert_prop("key", "value").unwrap();
294+
295+
Box::new(CaptureTransaction {
296+
client,
297+
send_rate: 10_000,
298+
start: Instant::now(),
299+
events: Mutex::new(vec![event]),
300+
retry_policy: BackoffPolicy::new(Duration::ZERO, 1.0, Duration::ZERO),
301+
})
302+
}
303+
304+
#[tokio::test]
305+
async fn test_commit_write_succeeds_on_first_try() {
306+
let mut server = mockito::Server::new_async().await;
307+
let mock = server
308+
.mock("POST", "/batch/")
309+
.with_status(200)
310+
.expect(1)
311+
.create();
312+
313+
let client = make_client(&server.url()).await;
314+
let txn = make_transaction(&client);
315+
316+
let result = txn.commit_write().await;
317+
assert!(result.is_ok());
318+
mock.assert();
319+
}
320+
321+
#[tokio::test]
322+
async fn test_commit_write_retries_on_500_then_succeeds() {
323+
let mut server = mockito::Server::new_async().await;
324+
let fail_mock = server
325+
.mock("POST", "/batch/")
326+
.with_status(500)
327+
.with_body("internal error")
328+
.expect(2)
329+
.create();
330+
let success_mock = server
331+
.mock("POST", "/batch/")
332+
.with_status(200)
333+
.expect(1)
334+
.create();
335+
336+
let client = make_client(&server.url()).await;
337+
let txn = make_transaction(&client);
338+
339+
let result = txn.commit_write().await;
340+
assert!(result.is_ok());
341+
fail_mock.assert();
342+
success_mock.assert();
343+
}
344+
345+
#[tokio::test]
346+
async fn test_commit_write_fails_immediately_on_400() {
347+
let mut server = mockito::Server::new_async().await;
348+
let mock = server
349+
.mock("POST", "/batch/")
350+
.with_status(400)
351+
.with_body("bad request")
352+
.expect(1)
353+
.create();
354+
355+
let client = make_client(&server.url()).await;
356+
let txn = make_transaction(&client);
357+
358+
let result = txn.commit_write().await;
359+
assert!(result.is_err());
360+
assert!(result
361+
.unwrap_err()
362+
.to_string()
363+
.contains("after 1 attempts"));
364+
mock.assert();
365+
}
366+
367+
#[tokio::test]
368+
async fn test_commit_write_exhausts_retries_on_persistent_500() {
369+
let mut server = mockito::Server::new_async().await;
370+
let mock = server
371+
.mock("POST", "/batch/")
372+
.with_status(500)
373+
.with_body("internal error")
374+
.expect((MAX_RETRIES + 1) as usize)
375+
.create();
376+
377+
let client = make_client(&server.url()).await;
378+
let txn = make_transaction(&client);
379+
380+
let result = txn.commit_write().await;
381+
assert!(result.is_err());
382+
mock.assert();
383+
}
238384
}

rust/batch-import-worker/src/error/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub fn get_user_message(error: &anyhow::Error) -> String {
4141
pub struct RateLimitedError {
4242
pub retry_after: Option<Duration>,
4343
#[source]
44-
pub source: Box<dyn std::error::Error + Send + Sync + 'static>,
44+
pub source: reqwest::Error,
4545
}
4646

4747
/// Extracts a Retry-After duration if a RateLimitedError is present in the error chain

rust/batch-import-worker/src/job/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,7 @@ mod tests {
744744
.expect("retry-after parsed");
745745
let rl = crate::error::RateLimitedError {
746746
retry_after: Some(retry_after),
747-
source: Box::new(http_err),
747+
source: http_err,
748748
};
749749
let err = anyhow::Error::from(rl);
750750

rust/batch-import-worker/src/source/date_range_export.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ impl DateRangeExportSource {
389389
let retry_after = parse_retry_after_header(&headers_clone);
390390
let rl = RateLimitedError {
391391
retry_after,
392-
source: Box::new(http_err),
392+
source: http_err,
393393
};
394394
let err = anyhow::Error::from(rl).context(crate::error::UserError::new(
395395
"Rate limit exceeded -- pause the job and try again later",

0 commit comments

Comments
 (0)