Skip to content

Commit fc0daee

Browse files
sanityclaude
andcommitted
test: add gateway reconnection integration test
As requested in PR review, this test verifies that a peer can reconnect to a gateway after disconnection and continue to operate normally. Test sequence: 1. Start gateway and peer connected to it 2. Perform PUT/GET operations to verify connectivity 3. Disconnect client websocket connection 4. Reconnect and verify peer can still perform operations This ensures the fix doesn't break reconnection scenarios. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent fab8bb6 commit fc0daee

File tree

1 file changed

+327
-0
lines changed

1 file changed

+327
-0
lines changed

crates/core/tests/connectivity.rs

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
use anyhow::{anyhow, bail};
2+
use freenet::{
3+
config::{ConfigArgs, InlineGwConfig, NetworkArgs, SecretArgs, WebsocketApiArgs},
4+
dev_tool::TransportKeypair,
5+
local_node::NodeConfig,
6+
server::serve_gateway,
7+
test_utils::{self, make_get, make_put},
8+
};
9+
use freenet_stdlib::{
10+
client_api::{ClientRequest, ContractResponse, HostResponse, WebApi},
11+
prelude::*,
12+
};
13+
use futures::FutureExt;
14+
use rand::{Rng, SeedableRng};
15+
use std::{
16+
net::{Ipv4Addr, TcpListener},
17+
time::Duration,
18+
};
19+
use testresult::TestResult;
20+
use tokio::select;
21+
use tokio_tungstenite::connect_async;
22+
use tracing::{level_filters::LevelFilter, span, Instrument, Level};
23+
24+
static RNG: once_cell::sync::Lazy<std::sync::Mutex<rand::rngs::StdRng>> =
25+
once_cell::sync::Lazy::new(|| {
26+
std::sync::Mutex::new(rand::rngs::StdRng::from_seed(
27+
*b"0102030405060708090a0b0c0d0e0f10",
28+
))
29+
});
30+
31+
/// Test gateway reconnection:
32+
/// 1. Start a gateway and a peer connected to it
33+
/// 2. Perform operations to verify connectivity
34+
/// 3. Force disconnect
35+
/// 4. Verify that the peer can reconnect and operate normally
36+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
37+
async fn test_gateway_reconnection() -> TestResult {
38+
freenet::config::set_logger(Some(LevelFilter::INFO), None);
39+
40+
// Create sockets for ports
41+
let gateway_network_socket = TcpListener::bind("127.0.0.1:0")?;
42+
let gateway_ws_socket = TcpListener::bind("127.0.0.1:0")?;
43+
let peer_ws_socket = TcpListener::bind("127.0.0.1:0")?;
44+
45+
let gateway_public_port = gateway_network_socket.local_addr()?.port();
46+
let gateway_ws_port = gateway_ws_socket.local_addr()?.port();
47+
let peer_ws_port = peer_ws_socket.local_addr()?.port();
48+
49+
// Setup temp directories for nodes
50+
let gateway_temp_dir = tempfile::tempdir()?;
51+
let peer_temp_dir = tempfile::tempdir()?;
52+
53+
// Configure gateway
54+
let gateway_key = TransportKeypair::new_with_rng(&mut *RNG.lock().unwrap());
55+
let gateway_transport_keypair = gateway_temp_dir.path().join("private.pem");
56+
gateway_key.save(&gateway_transport_keypair)?;
57+
gateway_key
58+
.public()
59+
.save(gateway_temp_dir.path().join("public.pem"))?;
60+
61+
// Create empty gateways.toml file for gateway
62+
std::fs::write(
63+
gateway_temp_dir.path().join("gateways.toml"),
64+
"gateways = []",
65+
)?;
66+
67+
let gateway_config = ConfigArgs {
68+
ws_api: WebsocketApiArgs {
69+
address: Some(Ipv4Addr::LOCALHOST.into()),
70+
ws_api_port: Some(gateway_ws_port),
71+
},
72+
network_api: NetworkArgs {
73+
public_address: Some(Ipv4Addr::LOCALHOST.into()),
74+
public_port: Some(gateway_public_port),
75+
is_gateway: true,
76+
skip_load_from_network: true,
77+
gateways: Some(vec![]),
78+
location: Some(RNG.lock().unwrap().gen()),
79+
ignore_protocol_checking: true,
80+
address: Some(Ipv4Addr::LOCALHOST.into()),
81+
network_port: Some(gateway_public_port),
82+
bandwidth_limit: None,
83+
blocked_addresses: None,
84+
},
85+
config_paths: {
86+
freenet::config::ConfigPathsArgs {
87+
config_dir: Some(gateway_temp_dir.path().to_path_buf()),
88+
data_dir: Some(gateway_temp_dir.path().to_path_buf()),
89+
}
90+
},
91+
secrets: SecretArgs {
92+
transport_keypair: Some(gateway_transport_keypair),
93+
..Default::default()
94+
},
95+
..Default::default()
96+
};
97+
98+
// Configure peer
99+
let peer_key = TransportKeypair::new_with_rng(&mut *RNG.lock().unwrap());
100+
let peer_transport_keypair = peer_temp_dir.path().join("private.pem");
101+
peer_key.save(&peer_transport_keypair)?;
102+
peer_key
103+
.public()
104+
.save(peer_temp_dir.path().join("public.pem"))?;
105+
106+
// Create gateways.toml file for peer with gateway info
107+
let gateway_info = format!(
108+
r#"[[gateways]]
109+
address = "127.0.0.1:{}"
110+
public_key_path = "{}"
111+
"#,
112+
gateway_public_port,
113+
gateway_temp_dir.path().join("public.pem").display()
114+
);
115+
std::fs::write(peer_temp_dir.path().join("gateways.toml"), gateway_info)?;
116+
117+
let peer_config = ConfigArgs {
118+
ws_api: WebsocketApiArgs {
119+
address: Some(Ipv4Addr::LOCALHOST.into()),
120+
ws_api_port: Some(peer_ws_port),
121+
},
122+
network_api: NetworkArgs {
123+
public_address: Some(Ipv4Addr::LOCALHOST.into()),
124+
public_port: None,
125+
is_gateway: false,
126+
skip_load_from_network: true,
127+
gateways: Some(vec![serde_json::to_string(&InlineGwConfig {
128+
address: (Ipv4Addr::LOCALHOST, gateway_public_port).into(),
129+
location: Some(RNG.lock().unwrap().gen()),
130+
public_key_path: gateway_temp_dir.path().join("public.pem"),
131+
})?]),
132+
location: Some(RNG.lock().unwrap().gen()),
133+
ignore_protocol_checking: true,
134+
address: Some(Ipv4Addr::LOCALHOST.into()),
135+
network_port: None,
136+
bandwidth_limit: None,
137+
blocked_addresses: None,
138+
},
139+
config_paths: {
140+
freenet::config::ConfigPathsArgs {
141+
config_dir: Some(peer_temp_dir.path().to_path_buf()),
142+
data_dir: Some(peer_temp_dir.path().to_path_buf()),
143+
}
144+
},
145+
secrets: SecretArgs {
146+
transport_keypair: Some(peer_transport_keypair),
147+
..Default::default()
148+
},
149+
..Default::default()
150+
};
151+
152+
// Create test contract
153+
const TEST_CONTRACT: &str = "test-contract-integration";
154+
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
155+
let contract_key = contract.key();
156+
let initial_state = test_utils::create_empty_todo_list();
157+
let wrapped_state = WrappedState::from(initial_state);
158+
159+
// Start gateway node
160+
std::mem::drop(gateway_network_socket);
161+
std::mem::drop(gateway_ws_socket);
162+
let gateway = async {
163+
let config = gateway_config.build().await?;
164+
let node = NodeConfig::new(config.clone())
165+
.await?
166+
.build(serve_gateway(config.ws_api).await)
167+
.await?;
168+
node.run().await
169+
}
170+
.instrument(span!(Level::DEBUG, "gateway"))
171+
.boxed_local();
172+
173+
// Start peer node
174+
std::mem::drop(peer_ws_socket);
175+
let peer = async move {
176+
let config = peer_config.build().await?;
177+
let node = NodeConfig::new(config.clone())
178+
.await?
179+
.build(serve_gateway(config.ws_api).await)
180+
.await?;
181+
node.run().await
182+
}
183+
.instrument(span!(Level::DEBUG, "peer"))
184+
.boxed_local();
185+
186+
// Main test logic
187+
let test = async move {
188+
// Give nodes time to start up and connect
189+
tokio::time::sleep(Duration::from_secs(5)).await;
190+
191+
// Connect to peer's websocket API
192+
let uri =
193+
format!("ws://127.0.0.1:{peer_ws_port}/v1/contract/command?encodingProtocol=native");
194+
let (stream, _) = connect_async(&uri).await?;
195+
let mut client_api = WebApi::start(stream);
196+
197+
// Perform initial PUT to verify connectivity
198+
tracing::info!("Performing initial PUT to verify connectivity");
199+
make_put(
200+
&mut client_api,
201+
wrapped_state.clone(),
202+
contract.clone(),
203+
false,
204+
)
205+
.await?;
206+
207+
// Wait for put response
208+
let resp = tokio::time::timeout(Duration::from_secs(60), client_api.recv()).await;
209+
match resp {
210+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
211+
assert_eq!(key, contract_key);
212+
tracing::info!("Initial PUT successful");
213+
}
214+
Ok(Ok(other)) => {
215+
bail!("Unexpected response while waiting for put: {:?}", other);
216+
}
217+
Ok(Err(e)) => {
218+
bail!("Error receiving put response: {}", e);
219+
}
220+
Err(_) => {
221+
bail!("Timeout waiting for put response");
222+
}
223+
}
224+
225+
// Verify with GET
226+
tracing::info!("Verifying with GET");
227+
make_get(&mut client_api, contract_key, false, false).await?;
228+
let get_response = tokio::time::timeout(Duration::from_secs(60), client_api.recv()).await;
229+
match get_response {
230+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
231+
contract: recv_contract,
232+
state: recv_state,
233+
..
234+
}))) => {
235+
assert_eq!(
236+
recv_contract.as_ref().expect("Contract should exist").key(),
237+
contract_key
238+
);
239+
assert_eq!(recv_state, wrapped_state);
240+
tracing::info!("Initial GET successful");
241+
}
242+
Ok(Ok(other)) => {
243+
bail!("Unexpected response while waiting for get: {:?}", other);
244+
}
245+
Ok(Err(e)) => {
246+
bail!("Error receiving get response: {}", e);
247+
}
248+
Err(_) => {
249+
bail!("Timeout waiting for get response");
250+
}
251+
}
252+
253+
// Disconnect from peer
254+
tracing::info!("Disconnecting from peer");
255+
client_api
256+
.send(ClientRequest::Disconnect { cause: None })
257+
.await?;
258+
259+
// Wait for disconnect to complete
260+
tokio::time::sleep(Duration::from_secs(3)).await;
261+
262+
// Reconnect to the peer's websocket API
263+
tracing::info!("Reconnecting to peer");
264+
let (stream, _) = connect_async(&uri).await?;
265+
let mut client_api = WebApi::start(stream);
266+
267+
// Wait for reconnection to establish (peer should reconnect to gateway)
268+
tokio::time::sleep(Duration::from_secs(5)).await;
269+
270+
// Perform GET to verify reconnection worked and peer can operate normally
271+
tracing::info!("Performing GET after reconnection");
272+
make_get(&mut client_api, contract_key, false, false).await?;
273+
let get_response = tokio::time::timeout(Duration::from_secs(60), client_api.recv()).await;
274+
match get_response {
275+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
276+
contract: recv_contract,
277+
state: recv_state,
278+
..
279+
}))) => {
280+
assert_eq!(
281+
recv_contract.as_ref().expect("Contract should exist").key(),
282+
contract_key
283+
);
284+
assert_eq!(recv_state, wrapped_state);
285+
tracing::info!(
286+
"Reconnection test successful - peer can perform operations after reconnecting"
287+
);
288+
}
289+
Ok(Ok(other)) => {
290+
bail!(
291+
"Unexpected response while waiting for get after reconnection: {:?}",
292+
other
293+
);
294+
}
295+
Ok(Err(e)) => {
296+
bail!("Error receiving get response after reconnection: {}", e);
297+
}
298+
Err(_) => {
299+
bail!("Timeout waiting for get response after reconnection");
300+
}
301+
}
302+
303+
// Clean disconnect
304+
client_api
305+
.send(ClientRequest::Disconnect { cause: None })
306+
.await?;
307+
tokio::time::sleep(Duration::from_millis(100)).await;
308+
309+
Ok::<_, anyhow::Error>(())
310+
};
311+
312+
select! {
313+
g = gateway => {
314+
g.map_err(|e| anyhow!("Gateway error: {}", e))?;
315+
}
316+
p = peer => {
317+
p.map_err(|e| anyhow!("Peer error: {}", e))?;
318+
}
319+
r = test => {
320+
r?;
321+
// Give time for cleanup before dropping nodes
322+
tokio::time::sleep(Duration::from_secs(3)).await;
323+
}
324+
}
325+
326+
Ok(())
327+
}

0 commit comments

Comments
 (0)