Skip to content

Commit a468428

Browse files
authored
Merge pull request #111 from OpenMined/madhava/small-fix
small fixes
2 parents 126c494 + 1db0205 commit a468428

3 files changed

Lines changed: 63 additions & 38 deletions

File tree

rust/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ fn maybe_apply_hotlink_ice_from_server(resp: &WsResponse) {
8787
.filter(|v| !v.is_empty())
8888
{
8989
env::set_var("SYFTBOX_HOTLINK_TURN_PASS", pass);
90-
crate::logging::info("hotlink TURN pass from server: [set]".to_string());
90+
crate::logging::info("hotlink TURN pass from server: [set]");
9191
}
9292
}
9393
}

rust/src/hotlink_manager.rs

Lines changed: 60 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ const HOTLINK_WEBRTC_BUFFERED_HIGH_MAX: usize = 16 * 1024 * 1024;
5757
const HOTLINK_WEBRTC_BACKPRESSURE_WAIT_MS_DEFAULT: u64 = 1500;
5858
const HOTLINK_WEBRTC_BACKPRESSURE_WAIT_MS_MAX: u64 = 10_000;
5959
const HOTLINK_WEBRTC_BACKPRESSURE_POLL_MS: u64 = 5;
60-
const HOTLINK_WEBRTC_ERR_OUTBOUND_TOO_LARGE: &str = "outbound packet larger than maximum message size";
60+
const HOTLINK_WEBRTC_ERR_OUTBOUND_TOO_LARGE: &str =
61+
"outbound packet larger than maximum message size";
6162
const HOTLINK_TELEMETRY_FLUSH_MS: u64 = 1000;
6263
const HOTLINK_BENCH_STRICT_ENV: &str = "SYFTBOX_HOTLINK_BENCH_STRICT";
6364

@@ -187,7 +188,10 @@ impl HotlinkManager {
187188
if !active_ok {
188189
writers.insert(key.to_string(), entry);
189190
if hotlink_debug_enabled() {
190-
crate::logging::info(format!("hotlink tcp writer promoted standby key={}", key));
191+
crate::logging::info(format!(
192+
"hotlink tcp writer promoted standby key={}",
193+
key
194+
));
191195
}
192196
return true;
193197
}
@@ -405,7 +409,11 @@ impl HotlinkManager {
405409
let short_id = &id[..8.min(id.len())];
406410
let peer = Self::peer_from_path(&s.path);
407411
let channel = Self::channel_from_path(&s.path);
408-
let wrtc = s.webrtc.as_ref().map(|w| Self::webrtc_state_str(w)).unwrap_or("none");
412+
let wrtc = s
413+
.webrtc
414+
.as_ref()
415+
.map(|w| Self::webrtc_state_str(w))
416+
.unwrap_or("none");
409417
inbound_list.push(json!({
410418
"sid": short_id,
411419
"peer": peer,
@@ -428,7 +436,11 @@ impl HotlinkManager {
428436
} else {
429437
"pending"
430438
};
431-
let wrtc = o.webrtc.as_ref().map(|w| Self::webrtc_state_str(w)).unwrap_or("none");
439+
let wrtc = o
440+
.webrtc
441+
.as_ref()
442+
.map(|w| Self::webrtc_state_str(w))
443+
.unwrap_or("none");
432444
outbound_list.push(json!({
433445
"sid": short_id,
434446
"peer": peer,
@@ -458,21 +470,32 @@ impl HotlinkManager {
458470

459471
let in_count = sessions.as_ref().map(|s| s.len()).unwrap_or(0);
460472
let out_count = outbound.as_ref().map(|o| o.len()).unwrap_or(0);
461-
let out_accepted = outbound.as_ref().map(|o| o.values().filter(|v| v.accepted).count()).unwrap_or(0);
473+
let out_accepted = outbound
474+
.as_ref()
475+
.map(|o| o.values().filter(|v| v.accepted).count())
476+
.unwrap_or(0);
462477
let out_pending = out_count - out_accepted;
463478

464479
// Count WebRTC connected sessions (both directions)
465480
let mut wrtc_connected = 0u64;
466481
if let Ok(ref sess) = sessions {
467482
for s in sess.values() {
468-
if s.webrtc.as_ref().map(|w| w.ready_flag.load(Ordering::Relaxed)).unwrap_or(false) {
483+
if s.webrtc
484+
.as_ref()
485+
.map(|w| w.ready_flag.load(Ordering::Relaxed))
486+
.unwrap_or(false)
487+
{
469488
wrtc_connected += 1;
470489
}
471490
}
472491
}
473492
if let Ok(ref out) = outbound {
474493
for o in out.values() {
475-
if o.webrtc.as_ref().map(|w| w.ready_flag.load(Ordering::Relaxed)).unwrap_or(false) {
494+
if o.webrtc
495+
.as_ref()
496+
.map(|w| w.ready_flag.load(Ordering::Relaxed))
497+
.unwrap_or(false)
498+
{
476499
wrtc_connected += 1;
477500
}
478501
}
@@ -721,11 +744,14 @@ impl HotlinkManager {
721744
marker_path.display()
722745
));
723746
}
724-
proxies.insert(channel_key, TcpProxyInfo {
725-
port: info.port,
726-
from_email: info.from_email.clone(),
727-
to_email: info.to_email.clone(),
728-
});
747+
proxies.insert(
748+
channel_key,
749+
TcpProxyInfo {
750+
port: info.port,
751+
from_email: info.from_email.clone(),
752+
to_email: info.to_email.clone(),
753+
},
754+
);
729755
let manager_clone = manager.clone();
730756
tokio::spawn(async move {
731757
manager_clone.run_tcp_proxy(marker_path).await;
@@ -826,8 +852,7 @@ impl HotlinkManager {
826852
// Use directional outbound path owned by local user so that the
827853
// server write-ACL check always passes. Each party sends on its own
828854
// namespace (`local_email/_mpc/local_pid_to_peer_pid/...`).
829-
let peer_inbound_key =
830-
peer_inbound_tcp_key(&rel_marker, &info, local_email.as_deref());
855+
let peer_inbound_key = peer_inbound_tcp_key(&rel_marker, &info, local_email.as_deref());
831856
let outbound_key = local_outbound_tcp_key(&rel_marker, &info, local_email.as_deref())
832857
.unwrap_or_else(|| channel_key.clone());
833858
let port = info.port;
@@ -1081,7 +1106,8 @@ impl HotlinkManager {
10811106
if is_tcp_proxy_path(&path) {
10821107
crate::logging::info(format!(
10831108
"hotlink sending accept for tcp proxy: session={} path={}",
1084-
&session_id[..8], path
1109+
&session_id[..8],
1110+
path
10851111
));
10861112
if let Err(err) = self.send_accept(&session_id).await {
10871113
crate::logging::error(format!(
@@ -1243,12 +1269,10 @@ impl HotlinkManager {
12431269
None => frame.path.clone(),
12441270
};
12451271
let mut reorder = self.tcp_reorder.lock().await;
1246-
let buf = reorder
1247-
.entry(reorder_key)
1248-
.or_insert_with(|| TcpReorderBuf {
1249-
next_seq: 1,
1250-
pending: BTreeMap::new(),
1251-
});
1272+
let buf = reorder.entry(reorder_key).or_insert_with(|| TcpReorderBuf {
1273+
next_seq: 1,
1274+
pending: BTreeMap::new(),
1275+
});
12521276
buf.pending.insert(frame.seq, frame.payload);
12531277
let mut ready = Vec::new();
12541278
while let Some(data) = buf.pending.remove(&buf.next_seq) {
@@ -2053,7 +2077,9 @@ impl HotlinkManager {
20532077
if let Some(id) = existing_session.clone() {
20542078
let adopted = {
20552079
let out = self.outbound.read().await;
2056-
out.get(&id).map(|e| e.adopted_from_inbound).unwrap_or(false)
2080+
out.get(&id)
2081+
.map(|e| e.adopted_from_inbound)
2082+
.unwrap_or(false)
20572083
};
20582084
if adopted {
20592085
self.remove_outbound(&id).await;
@@ -2204,7 +2230,8 @@ impl HotlinkManager {
22042230
let id = Uuid::new_v4().to_string();
22052231
crate::logging::info(format!(
22062232
"hotlink session new: session={} path={}",
2207-
&id[..8], rel_path
2233+
&id[..8],
2234+
rel_path
22082235
));
22092236
let outbound = HotlinkOutbound {
22102237
id: id.clone(),
@@ -2303,10 +2330,7 @@ impl HotlinkManager {
23032330
}
23042331
};
23052332

2306-
if hotlink_debug_enabled()
2307-
&& is_tcp_proxy_path(&rel_path)
2308-
&& (seq <= 3 || seq % 500 == 0)
2309-
{
2333+
if hotlink_debug_enabled() && is_tcp_proxy_path(&rel_path) && (seq <= 3 || seq % 500 == 0) {
23102334
crate::logging::info(format!(
23112335
"hotlink send data: path={} session={} seq={} bytes={}",
23122336
rel_path,
@@ -2333,11 +2357,7 @@ impl HotlinkManager {
23332357
.await
23342358
{
23352359
Ok(Some(())) => {
2336-
self.record_tx(
2337-
payload_len,
2338-
send_started.elapsed().as_millis() as u64,
2339-
true,
2340-
);
2360+
self.record_tx(payload_len, send_started.elapsed().as_millis() as u64, true);
23412361
return Ok(());
23422362
}
23432363
Ok(None) => {
@@ -2370,9 +2390,7 @@ impl HotlinkManager {
23702390
return Err(anyhow::anyhow!(err));
23712391
}
23722392
if hotlink_debug_enabled() {
2373-
crate::logging::info(format!(
2374-
"hotlink webrtc send err: {e:?}"
2375-
));
2393+
crate::logging::info(format!("hotlink webrtc send err: {e:?}"));
23762394
}
23772395
if !p2p_only {
23782396
self.mark_ws_fallback(&session_id, &rel_path).await;
@@ -2765,6 +2783,7 @@ impl HotlinkManager {
27652783
Ok(())
27662784
}
27672785

2786+
#[allow(dead_code)]
27682787
async fn send_close(&self, session_id: &str, reason: &str) -> Result<()> {
27692788
let id = Uuid::new_v4().to_string();
27702789
if hotlink_debug_enabled() {
@@ -2811,7 +2830,12 @@ fn hotlink_tcp_proxy_chunk_size() -> usize {
28112830
std::env::var("SYFTBOX_HOTLINK_TCP_PROXY_CHUNK_SIZE")
28122831
.ok()
28132832
.and_then(|v| v.trim().parse::<usize>().ok())
2814-
.map(|v| v.clamp(HOTLINK_TCP_PROXY_CHUNK_SIZE_MIN, HOTLINK_TCP_PROXY_CHUNK_SIZE_MAX))
2833+
.map(|v| {
2834+
v.clamp(
2835+
HOTLINK_TCP_PROXY_CHUNK_SIZE_MIN,
2836+
HOTLINK_TCP_PROXY_CHUNK_SIZE_MAX,
2837+
)
2838+
})
28152839
.unwrap_or(HOTLINK_TCP_PROXY_CHUNK_SIZE_DEFAULT)
28162840
}
28172841

@@ -3103,7 +3127,6 @@ fn ice_servers() -> Vec<RTCIceServer> {
31033127
} else {
31043128
String::new()
31053129
},
3106-
..Default::default()
31073130
}
31083131
})
31093132
.collect()

rust/src/wsproto.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ pub struct HotlinkOpen {
8787
pub session_id: String,
8888
pub path: String,
8989
pub from: Option<String>,
90+
#[allow(dead_code)]
9091
pub to: Option<String>,
9192
}
9293

@@ -121,6 +122,7 @@ pub struct HotlinkClose {
121122
pub struct HotlinkSignal {
122123
pub session_id: String,
123124
pub kind: String,
125+
#[allow(dead_code)]
124126
pub addrs: Vec<String>,
125127
#[allow(dead_code)]
126128
pub token: String,

0 commit comments

Comments
 (0)