Skip to content

Commit ff62b0a

Browse files
committed
fix: prevent check_connections from dropping connections during setup
When using `topic_regex` or multi-topic consumers, there is a race condition between getting a connection for topic refresh and the `check_connections` cleanup task. Because `connect()` temporarily downgrades the `Arc` ref, the `strong_count` check could cause connections to be dropped if `check_connections` runs while `connect()` is in progress.

This commit adds a 5-second grace period for newly created connections, allowing them to be picked up by consumers/producers before being considered for cleanup.

Fixes: #386
1 parent 50baf97 commit ff62b0a

File tree

1 file changed

+92
-50
lines changed

1 file changed

+92
-50
lines changed

src/connection_manager.rs

Lines changed: 92 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{collections::HashMap, sync::Arc, time::Duration};
1+
use std::{
2+
collections::HashMap,
3+
sync::Arc,
4+
time::{Duration, Instant},
5+
};
26

37
use futures::{channel::oneshot, lock::Mutex};
48
use rand::Rng;
@@ -150,7 +154,10 @@ impl Default for TlsOptions {
150154
}
151155

152156
enum ConnectionStatus<Exe: Executor> {
153-
Connected(Arc<Connection<Exe>>),
157+
Connected {
158+
conn: Arc<Connection<Exe>>,
159+
created_at: Instant,
160+
},
154161
Connecting(Vec<oneshot::Sender<Result<Arc<Connection<Exe>>, ConnectionError>>>),
155162
}
156163

@@ -277,29 +284,45 @@ impl<Exe: Executor> ConnectionManager<Exe> {
277284
&self,
278285
broker: &BrokerAddress,
279286
) -> Result<Arc<Connection<Exe>>, ConnectionError> {
287+
trace!("Looking for connection to {}...", broker.url);
280288
let rx = {
281289
let mut conns = self.connections.lock().await;
282290
match conns.get_mut(broker) {
283-
None => None,
284-
Some(ConnectionStatus::Connected(conn)) => {
291+
None => {
292+
trace!("[] no connection for {}", broker.url);
293+
None
294+
}
295+
Some(ConnectionStatus::Connected { conn, .. }) => {
285296
if conn.is_valid() {
297+
trace!("[connected] returning valid connection for {}", broker.url);
286298
return Ok(conn.clone());
287299
} else {
300+
warn!("[connected] invalid connection for {}", broker.url);
288301
None
289302
}
290303
}
291304
Some(ConnectionStatus::Connecting(ref mut v)) => {
292305
let (tx, rx) = oneshot::channel();
306+
debug!(
307+
"[connecting...] existing pending connection to {}",
308+
broker.url
309+
);
293310
v.push(tx);
294311
Some(rx)
295312
}
296313
}
297314
};
298315

299316
match rx {
300-
None => self.connect(broker.clone()).await,
317+
None => {
318+
info!("No existing connection, creating new for {}", broker.url);
319+
self.connect(broker.clone()).await
320+
}
301321
Some(rx) => match rx.await {
302-
Ok(res) => res,
322+
Ok(res) => {
323+
debug!("Connection found for {}", broker.url);
324+
res
325+
}
303326
Err(_) => Err(ConnectionError::Canceled),
304327
},
305328
}
@@ -310,8 +333,6 @@ impl<Exe: Executor> ConnectionManager<Exe> {
310333
&self,
311334
broker: &BrokerAddress,
312335
) -> Result<Arc<Connection<Exe>>, ConnectionError> {
313-
debug!("ConnectionManager::connect({:?})", broker);
314-
315336
let rx = {
316337
match self
317338
.connections
@@ -329,7 +350,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
329350
Some(rx)
330351
}
331352
}
332-
ConnectionStatus::Connected(_) => None,
353+
ConnectionStatus::Connected { .. } => None,
333354
}
334355
};
335356
if let Some(rx) = rx {
@@ -462,6 +483,20 @@ impl<Exe: Executor> ConnectionManager<Exe> {
462483
let res = self.executor.spawn(Box::pin(async move {
463484
use crate::futures::StreamExt;
464485
while let Some(()) = interval.next().await {
486+
let Some(strong_conn) = weak_conn.upgrade() else {
487+
debug!(
488+
"connection {} was dropped, stopping keepalive task",
489+
connection_id
490+
);
491+
break;
492+
};
493+
if !strong_conn.is_valid() {
494+
debug!(
495+
"connection {} is not valid anymore, stopping keepalive task",
496+
connection_id
497+
);
498+
break;
499+
}
465500
if let Some(url) = proxy_to_broker_url.as_ref() {
466501
trace!(
467502
"will ping connection {} to {} via proxy {}",
@@ -472,50 +507,38 @@ impl<Exe: Executor> ConnectionManager<Exe> {
472507
} else {
473508
trace!("will ping connection {} to {}", connection_id, broker_url);
474509
}
475-
if let Some(strong_conn) = weak_conn.upgrade() {
476-
if !strong_conn.is_valid() {
477-
trace!(
478-
"connection {} is not valid anymore, skip heart beat task",
479-
connection_id
480-
);
481-
break;
482-
}
483-
if let Err(e) = strong_conn.sender().send_ping().await {
484-
error!(
485-
"could not ping connection {} to the server at {}: {}",
486-
connection_id, broker_url, e
487-
);
488-
}
489-
} else {
490-
// if the strong pointers were dropped, we can stop the heartbeat for this
491-
// connection
492-
trace!("strong connection was dropped, stopping keepalive task");
493-
break;
510+
if let Err(e) = strong_conn.sender().send_ping().await {
511+
error!(
512+
"could not ping connection {} to the server at {}: {}",
513+
connection_id, broker_url, e
514+
);
494515
}
495516
}
496517
}));
497518
if res.is_err() {
498-
error!("the executor could not spawn the heartbeat future");
519+
error!("the executor could not spawn the keepalive future");
499520
return Err(ConnectionError::Shutdown);
500521
}
501522

502-
let old = self
503-
.connections
504-
.lock()
505-
.await
506-
.insert(broker, ConnectionStatus::Connected(c.clone()));
523+
let old = self.connections.lock().await.insert(
524+
broker,
525+
ConnectionStatus::Connected {
526+
conn: c.clone(),
527+
created_at: Instant::now(),
528+
},
529+
);
507530
match old {
508531
Some(ConnectionStatus::Connecting(mut v)) => {
509532
//info!("was in connecting state({} waiting)", v.len());
510533
for tx in v.drain(..) {
511534
let _ = tx.send(Ok(c.clone()));
512535
}
513536
}
514-
Some(ConnectionStatus::Connected(_)) => {
537+
Some(ConnectionStatus::Connected { .. }) => {
515538
info!("removing old connection");
516539
}
517540
None => {
518-
//info!("setting up new connection");
541+
debug!("setting up new connection");
519542
}
520543
};
521544

@@ -529,24 +552,43 @@ impl<Exe: Executor> ConnectionManager<Exe> {
529552
self.connections
530553
.lock()
531554
.await
532-
.retain(|_, ref mut connection| match connection {
533-
ConnectionStatus::Connecting(_) => true,
534-
ConnectionStatus::Connected(conn) => {
535-
// if the manager holds the only reference to that
536-
// connection, we can remove it from the manager
537-
// no need for special synchronization here: we're already
538-
// in a mutex, and a case appears where the Arc is cloned
539-
// somewhere at the same time, that just means the manager
540-
// will create a new connection the next time it is asked
555+
.retain(|broker, ref mut connection| match connection {
556+
ConnectionStatus::Connecting(_) => {
557+
trace!("Retaining connection in `Connecting` state");
558+
true
559+
}
560+
ConnectionStatus::Connected { conn, created_at } => {
561+
// Grace period of 5 seconds for newly created connections
562+
// to allow time for consumers/producers to grab a reference
563+
let grace_period = Duration::from_secs(5);
564+
let age = created_at.elapsed();
565+
let in_grace_period = age < grace_period;
541566
let strong_count = Arc::strong_count(conn);
567+
let is_valid = conn.is_valid();
568+
569+
// Keep connection if valid AND (someone is using it OR it's new)
570+
let should_retain = is_valid && (strong_count > 1 || in_grace_period);
571+
542572
trace!(
543-
"checking connection {}, is valid? {}, strong_count {}",
573+
"checking broker {} connection {}, is_valid: {}, strong_count: {}, age: {:?}, in_grace_period: {}",
574+
broker.url,
544575
conn.id(),
545-
conn.is_valid(),
546-
strong_count
576+
is_valid,
577+
strong_count,
578+
age,
579+
in_grace_period
547580
);
548-
549-
conn.is_valid() && strong_count > 1
581+
if !should_retain {
582+
info!(
583+
"Removing {} connection {} to {} (strong_count: {}, age: {:?})",
584+
if is_valid { "unused" } else { "invalid" },
585+
conn.id(),
586+
broker.url,
587+
strong_count,
588+
age
589+
);
590+
}
591+
should_retain
550592
}
551593
});
552594
}

0 commit comments

Comments
 (0)