Skip to content

Commit ba9abe3

Browse files
DanielChabrowski1c3t3a
authored andcommitted
Add headers to ReconnectSettings
1 parent cf9e93e commit ba9abe3

File tree

3 files changed

+55
-6
lines changed

3 files changed

+55
-6
lines changed

ci/socket-io-restart.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
const { Socket } = require("socket.io");
2-
31
let createServer = require("http").createServer;
42
let server = createServer();
53
const io = require("socket.io")(server);
@@ -8,8 +6,12 @@ const timeout = 2000;
86

97
console.log("Started");
108
var callback = (client) => {
9+
const headers = client.request.headers;
10+
console.log("headers", headers);
11+
const message = headers.message_back || "test";
12+
1113
console.log("Connected!");
12-
client.emit("message", "test");
14+
client.emit("message", message);
1315
client.on("restart_server", () => {
1416
console.log("will restart in ", timeout, "ms");
1517
io.close();

socketio/src/asynchronous/client/builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub struct ClientBuilder {
2929
pub(crate) on_reconnect: Option<Callback<DynAsyncReconnectSettingsCallback>>,
3030
pub(crate) namespace: String,
3131
tls_config: Option<TlsConnector>,
32-
opening_headers: Option<HeaderMap>,
32+
pub(crate) opening_headers: Option<HeaderMap>,
3333
transport_type: TransportType,
3434
pub(crate) auth: Option<serde_json::Value>,
3535
pub(crate) reconnect: bool,
@@ -214,6 +214,7 @@ impl ClientBuilder {
214214
/// let mut settings = ReconnectSettings::new();
215215
/// settings.address("http://server?test=123");
216216
/// settings.auth(json!({ "token": "abc" }));
217+
/// settings.opening_header("TRAIL", "abc-123");
217218
/// settings
218219
/// }.boxed()
219220
/// })

socketio/src/asynchronous/client/client.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
44
use futures_util::{future::BoxFuture, stream, Stream, StreamExt};
55
use log::{error, trace};
66
use rand::{thread_rng, Rng};
7+
use rust_engineio::header::{HeaderMap, HeaderValue};
78
use serde_json::Value;
89
use tokio::{
910
sync::RwLock,
@@ -38,6 +39,7 @@ enum DisconnectReason {
3839
pub struct ReconnectSettings {
3940
address: Option<String>,
4041
auth: Option<serde_json::Value>,
42+
headers: Option<HeaderMap>,
4143
}
4244

4345
impl ReconnectSettings {
@@ -58,6 +60,19 @@ impl ReconnectSettings {
5860
pub fn auth(&mut self, auth: serde_json::Value) {
5961
self.auth = Some(auth);
6062
}
63+
64+
/// Adds an http header to a container that is going to completely replace opening headers on reconnect.
65+
/// If there are no headers set in `ReconnectSettings`, client will use headers initially set via the builder.
66+
pub fn opening_header<T: Into<HeaderValue>, K: Into<String>>(
67+
&mut self,
68+
key: K,
69+
val: T,
70+
) -> &mut Self {
71+
self.headers
72+
.get_or_insert_with(|| HeaderMap::default())
73+
.insert(key.into(), val.into());
74+
self
75+
}
6176
}
6277

6378
/// A socket which handles communication with the server. It's initialized with
@@ -112,13 +127,18 @@ impl Client {
112127

113128
if let Some(config) = builder.on_reconnect.as_mut() {
114129
let reconnect_settings = config().await;
130+
115131
if let Some(address) = reconnect_settings.address {
116132
builder.address = address;
117133
}
118134

119135
if let Some(auth) = reconnect_settings.auth {
120136
self.auth = Some(auth);
121137
}
138+
139+
if reconnect_settings.headers.is_some() {
140+
builder.opening_headers = reconnect_settings.headers;
141+
}
122142
}
123143

124144
let socket = builder.inner_create().await?;
@@ -594,7 +614,7 @@ mod test {
594614
use serde_json::json;
595615
use serial_test::serial;
596616
use tokio::{
597-
sync::mpsc,
617+
sync::{mpsc, Mutex},
598618
time::{sleep, timeout},
599619
};
600620

@@ -755,6 +775,8 @@ mod test {
755775
static CONNECT_NUM: AtomicUsize = AtomicUsize::new(0);
756776
static MESSAGE_NUM: AtomicUsize = AtomicUsize::new(0);
757777
static ON_RECONNECT_CALLED: AtomicUsize = AtomicUsize::new(0);
778+
let latest_message = Arc::new(Mutex::new(String::new()));
779+
let handler_latest_message = latest_message.clone();
758780

759781
let url = crate::test::socket_io_restart_server();
760782

@@ -772,6 +794,7 @@ mod test {
772794
// Try setting the address to what we already have, just
773795
// to test. This is not strictly necessary in real usage.
774796
settings.address(url.to_string());
797+
settings.opening_header("MESSAGE_BACK", "updated");
775798
settings
776799
}
777800
.boxed()
@@ -789,11 +812,24 @@ mod test {
789812
}
790813
.boxed()
791814
})
792-
.on("message", |_, _socket| {
815+
.on("message", move |payload, _socket| {
816+
let latest_message = handler_latest_message.clone();
793817
async move {
794818
// test the iterator implementation and make sure there is a constant
795819
// stream of packets, even when reconnecting
796820
MESSAGE_NUM.fetch_add(1, Ordering::Release);
821+
822+
let msg = match payload {
823+
Payload::Text(msg) => msg
824+
.into_iter()
825+
.next()
826+
.expect("there should be one text payload"),
827+
_ => panic!(),
828+
};
829+
830+
let msg = serde_json::from_value(msg).expect("payload should be json string");
831+
832+
*latest_message.lock().await = msg;
797833
}
798834
.boxed()
799835
})
@@ -808,6 +844,11 @@ mod test {
808844

809845
assert_eq!(load(&CONNECT_NUM), 1, "should connect once");
810846
assert_eq!(load(&MESSAGE_NUM), 1, "should receive one");
847+
assert_eq!(
848+
*latest_message.lock().await,
849+
"test",
850+
"should receive test message"
851+
);
811852

812853
let r = socket.emit("restart_server", json!("")).await;
813854
assert!(r.is_ok(), "should emit restart success");
@@ -826,6 +867,11 @@ mod test {
826867
load(&ON_RECONNECT_CALLED) > 1,
827868
"should call on_reconnect at least once"
828869
);
870+
assert_eq!(
871+
*latest_message.lock().await,
872+
"updated",
873+
"should receive updated message"
874+
);
829875

830876
socket.disconnect().await?;
831877
Ok(())

0 commit comments

Comments
 (0)