Skip to content

Commit 96b1573

Browse files
committed
Properly flush and exit on ctrl-c
Fixes #60
1 parent bb81500 commit 96b1573

File tree

4 files changed

+60
-31
lines changed

4 files changed

+60
-31
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hotdog"
3-
version = "1.1.0"
3+
version = "1.2.0"
44
authors = ["R. Tyler Croy <[email protected]>"]
55
edition = "2024"
66

src/serve/mod.rs

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use crate::connection::*;
66
use crate::errors;
77
use crate::settings::Settings;
8+
use crate::sink::Message;
89
use crate::sink::Sink;
910
use crate::sink::kafka::Kafka;
1011
use crate::sink::parquet::Parquet;
@@ -22,44 +23,34 @@ use std::sync::Arc;
2223
pub mod plain;
2324
pub mod tls;
2425

26+
/// State entity which can be passed into connection handlers and callbacks
27+
#[derive(Clone)]
2528
pub struct ServerState {
26-
/**
27-
* A reference to the global Settings object for all configuration information
28-
*/
29+
/// A reference to the global Settings object for all configuration information
2930
pub settings: Arc<Settings>,
30-
/**
31-
* A Sender for sending statistics to the status handler
32-
*/
31+
/// A Sender for sending statistics to the status handler
3332
pub stats: InputQueueScope,
3433
}
3534

36-
/**
37-
* The Server trait describes the necessary functionality to implement a new hotdog backend server
38-
* which can receive syslog messages
39-
*/
35+
// The Server trait describes the necessary functionality to implement a new hotdog backend server
36+
// which can receive syslog messages
4037
#[async_trait]
4138
pub trait Server {
42-
/**
43-
* Bootstrap can/should be overridden by implementations which need to perform some work prior
44-
* to the creation of the TcpListener and the incoming connection loop
45-
*/
39+
/// Bootstrap can/should be overridden by implementations which need to perform some work prior
40+
/// to the creation of the TcpListener and the incoming connection loop
4641
fn bootstrap(&mut self, _state: &ServerState) -> Result<(), errors::HotdogError> {
4742
Ok(())
4843
}
4944

50-
/**
51-
* Shutdown scan/should be overridden by implementations which need to perform some work after
52-
* the termination of the connection accept loop
53-
*/
45+
/// Shutdown scan/should be overridden by implementations which need to perform some work after
46+
/// the termination of the connection accept loop
5447
fn shutdown(&self, _state: &ServerState) -> Result<(), errors::HotdogError> {
5548
Ok(())
5649
}
5750

58-
/**
59-
* Handle a single connection
60-
*
61-
* The close_channel parameter must be a clone of our connection-tracking channel Sender
62-
*/
51+
/// Handle a single connection
52+
///
53+
/// The close_channel parameter must be a clone of our connection-tracking channel Sender
6354
fn handle_connection(
6455
&self,
6556
stream: TcpStream,
@@ -78,9 +69,7 @@ pub trait Server {
7869
Ok(())
7970
}
8071

81-
/**
82-
* Accept connections on the addr
83-
*/
72+
/// Accept connections on the addr
8473
async fn accept_loop(
8574
&mut self,
8675
addr: &str,
@@ -119,11 +108,29 @@ pub trait Server {
119108
smol::spawn(async move {
120109
debug!("Starting Parquet loop");
121110
pq.runloop().await;
111+
debug!("Ending Parquet loop");
112+
std::process::exit(0);
122113
})
123114
.detach();
124115
}
125116

126117
let sender = sender.expect("Failed to configure a sink properly!");
118+
use std::sync::atomic::{AtomicBool, Ordering};
119+
let should_exit = Arc::new(AtomicBool::new(false));
120+
let se = should_exit.clone();
121+
122+
let ctrlc_tx = sender.clone();
123+
124+
ctrlc::set_handler(move || {
125+
info!("Interrupt has been received! Attempting to flush");
126+
se.store(true, Ordering::SeqCst);
127+
let tx = ctrlc_tx.clone();
128+
smol::block_on(async move {
129+
tx.send(Message::Flush { should_exit: true })
130+
.await
131+
.expect("Failed to send flush command");
132+
});
133+
});
127134

128135
self.bootstrap(&state)?;
129136

@@ -152,10 +159,21 @@ pub trait Server {
152159
.stats
153160
.gauge(status::Stats::ConnectionCount.into())
154161
.value(conn_count);
162+
163+
if should_exit.load(Ordering::Relaxed) {
164+
debug!("Serve has been instructed to exit");
165+
break;
166+
}
155167
}
156168

157169
self.shutdown(&state)?;
158170

159171
Ok(())
160172
}
161173
}
174+
175+
#[cfg(test)]
176+
mod tests {
177+
#[test]
178+
fn test_placholder() {}
179+
}

src/sink/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ pub enum Message {
5959
destination: String,
6060
payload: String,
6161
},
62-
Flush,
62+
Flush {
63+
should_exit: bool,
64+
},
6365
}
6466

6567
impl Message {
@@ -69,6 +71,10 @@ impl Message {
6971
payload,
7072
}
7173
}
74+
75+
pub fn flush() -> Self {
76+
Message::Flush { should_exit: false }
77+
}
7278
}
7379

7480
#[cfg(test)]

src/sink/parquet.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl Sink for Parquet {
104104
let mut timer = smol::Timer::interval(interval);
105105
while timer.next().await.is_some() {
106106
debug!("Timer has fired, issuing a flush");
107-
if let Err(e) = timer_tx.send(Message::Flush).await {
107+
if let Err(e) = timer_tx.send(Message::flush()).await {
108108
error!("Failed to trigger the flush timer in the parquet sink: {e:?}");
109109
}
110110
}
@@ -162,11 +162,11 @@ impl Sink for Parquet {
162162
"Reached the threshold to flush bytes for `{}`",
163163
&destination
164164
);
165-
let _ = self.tx.send(Message::Flush).await;
165+
let _ = self.tx.send(Message::flush()).await;
166166
}
167167
}
168168
}
169-
Message::Flush => {
169+
Message::Flush { should_exit } => {
170170
info!("Parquet sink has been told to flush");
171171

172172
for (destination, buf) in buffer.drain() {
@@ -196,6 +196,11 @@ impl Sink for Parquet {
196196
flush_to_parquet(self.store.clone(), schema, &destination, &buf);
197197
since_last_flush = Instant::now();
198198
}
199+
200+
if should_exit {
201+
debug!("Supposed to exit from the sink!");
202+
return;
203+
}
199204
}
200205
}
201206
}

0 commit comments

Comments
 (0)