Skip to content

Commit a8d2c89

Browse files
Merge branch 'master' into feat/transactions
2 parents bcebb13 + 7a7f3af commit a8d2c89

File tree

3 files changed

+70
-20
lines changed

3 files changed

+70
-20
lines changed

src/connection_manager.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,59 @@ impl Default for OperationRetryOptions {
6868
}
6969
}
7070

71+
impl OperationRetryOptions {
72+
pub fn allow_retry(&self, current: u32) -> bool {
73+
self.max_retries.is_none() || current < self.max_retries.unwrap()
74+
}
75+
}
76+
77+
#[cfg(test)]
78+
mod tests {
79+
use super::*;
80+
81+
#[test]
82+
fn test_allow_retry_no_max_retries() {
83+
let options = OperationRetryOptions {
84+
operation_timeout: Duration::from_secs(30),
85+
retry_delay: Duration::from_secs(5),
86+
max_retries: None,
87+
};
88+
89+
// If max_retries is None, it should always allow retries
90+
assert!(options.allow_retry(0));
91+
assert!(options.allow_retry(100));
92+
assert!(options.allow_retry(u32::MAX));
93+
}
94+
95+
#[test]
96+
fn test_allow_retry_with_max_retries() {
97+
let options = OperationRetryOptions {
98+
operation_timeout: Duration::from_secs(30),
99+
retry_delay: Duration::from_secs(5),
100+
max_retries: Some(3),
101+
};
102+
103+
// If max_retries is set to 3, we allow retries for current < 3
104+
assert!(options.allow_retry(0)); // current < 3
105+
assert!(options.allow_retry(2)); // current < 3
106+
assert!(!options.allow_retry(3)); // current == 3
107+
assert!(!options.allow_retry(4)); // current > 3
108+
}
109+
110+
#[test]
111+
fn test_allow_retry_max_retries_is_zero() {
112+
let options = OperationRetryOptions {
113+
operation_timeout: Duration::from_secs(30),
114+
retry_delay: Duration::from_secs(5),
115+
max_retries: Some(0),
116+
};
117+
118+
// If max_retries is 0, it should not allow any retries
119+
assert!(!options.allow_retry(0)); // current == 0
120+
assert!(!options.allow_retry(1)); // current > 0
121+
}
122+
}
123+
71124
/// configuration for TLS connections
72125
#[derive(Debug, Clone)]
73126
pub struct TlsOptions {

src/producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1194,7 +1194,7 @@ impl<'a, T, Exe: Executor> MessageBuilder<'a, T, Exe> {
11941194
}
11951195
}
11961196

1197-
impl<'a, T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'a, T, Exe> {
1197+
impl<T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'_, T, Exe> {
11981198
/// sends the message through the producer that created it
11991199
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
12001200
#[deprecated = "instead use send_non_blocking"]

src/retry_op.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,26 +54,23 @@ pub async fn handle_retry_error<Exe: Executor>(
5454
return Err(err.into());
5555
}
5656
};
57-
match operation_retry_options.max_retries {
58-
Some(max_retries) if current_retries < max_retries => {
59-
error!(
60-
"{operation_name}({topic}) answered {kind}{text}, retrying request after {:?} (max_retries = {max_retries})",
61-
operation_retry_options.retry_delay
62-
);
63-
client
64-
.executor
65-
.delay(operation_retry_options.retry_delay)
66-
.await;
67-
68-
*addr = client.lookup_topic(topic).await?;
69-
*connection = client.manager.get_connection(addr).await?;
70-
Ok(())
71-
}
72-
_ => {
73-
error!("{operation_name}({topic}) answered {kind}{text}, reached max retries");
74-
Err(err.into())
75-
}
57+
if !(operation_retry_options.allow_retry(current_retries)) {
58+
error!("{operation_name}({topic}) answered {kind}{text}, reached max retries");
59+
return Err(err.into());
7660
}
61+
error!(
62+
"{operation_name}({topic}) answered {kind}{text}, retrying request after {:?} (max_retries = {:?})",
63+
operation_retry_options.retry_delay,
64+
operation_retry_options.max_retries
65+
);
66+
client
67+
.executor
68+
.delay(operation_retry_options.retry_delay)
69+
.await;
70+
71+
*addr = client.lookup_topic(topic).await?;
72+
*connection = client.manager.get_connection(addr).await?;
73+
Ok(())
7774
}
7875

7976
pub async fn retry_subscribe_consumer<Exe: Executor>(

0 commit comments

Comments
 (0)