diff --git a/miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs b/miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs index 1619eb44d..2a71e5d72 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs @@ -30,7 +30,7 @@ use stratum_apps::{ use tracing::{debug, error, info, warn}; use crate::{ - channel_manager::{ChannelManager, ChannelManagerChannel, FULL_EXTRANONCE_SIZE}, + channel_manager::{ChannelManager, FULL_EXTRANONCE_SIZE}, error::{self, JDCError, JDCErrorKind}, jd_mode::{get_jd_mode, JdMode}, utils::create_close_channel_msg, @@ -97,10 +97,8 @@ impl RouteMessageTo<'_> { /// - [`RouteMessageTo::JobDeclarator`] → Sends the job declaration message to the JDS. /// - [`RouteMessageTo::TemplateProvider`] → Sends the template distribution message to the /// template provider. - pub async fn forward( - self, - channel_manager_channel: &ChannelManagerChannel, - ) -> Result<(), JDCErrorKind> { + pub async fn forward(self, channel_manager: &ChannelManager) -> Result<(), JDCErrorKind> { + let channel_manager_channel = &channel_manager.channel_manager_channel; match self { RouteMessageTo::Downstream((downstream_id, message)) => { _ = channel_manager_channel.downstream_sender.send(( @@ -113,10 +111,12 @@ impl RouteMessageTo<'_> { if get_jd_mode() != JdMode::SoloMining { let message_static = message.into_static(); let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?; + let sent_bytes = sv2_frame.encoded_length() as u64; _ = channel_manager_channel .upstream_sender .send(sv2_frame) .await; + channel_manager.record_upstream_sent_bytes(sent_bytes); } } RouteMessageTo::JobDeclarator(message) => { @@ -201,6 +201,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { downstream.downstream_data.super_safe_lock(|data| { data.extended_channels.remove(&msg.channel_id); data.standard_channels.remove(&msg.channel_id); + data.bytes_by_channel.remove(&msg.channel_id); }); channel_manager_data .vardiff @@ -453,7 +454,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { })?; for messages in messages { - let _ = messages.forward(&self.channel_manager_channel).await; + let _ = messages.forward(self).await; } Ok(()) } @@ -716,7 +717,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { })?; for messages in messages { - let _ = messages.forward(&self.channel_manager_channel).await; + let _ = messages.forward(self).await; } Ok(()) @@ -911,7 +912,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { }); for messages in messages { - let _ = messages.forward(&self.channel_manager_channel).await; + let _ = messages.forward(self).await; } Ok(()) @@ -1111,7 +1112,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { })?; for messages in messages { - let _ = messages.forward(&self.channel_manager_channel).await; + let _ = messages.forward(self).await; } Ok(()) @@ -1334,7 +1335,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { })?; for messages in messages { - _ = messages.forward(&self.channel_manager_channel).await; + _ = messages.forward(self).await; } Ok(()) diff --git a/miner-apps/jd-client/src/lib/channel_manager/extensions_message_handler.rs b/miner-apps/jd-client/src/lib/channel_manager/extensions_message_handler.rs index b8ebabca7..0a4ed1f41 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/extensions_message_handler.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/extensions_message_handler.rs @@ -144,6 +144,7 @@ impl HandleExtensionsFromServerAsync for ChannelManager { AnyMessage::Extensions(new_require_extensions.into_static().into()) .try_into() .map_err(JDCError::shutdown)?; + let sent_bytes = sv2_frame.encoded_length() as u64; self.channel_manager_channel .upstream_sender @@ -153,6 +154,7 @@ impl HandleExtensionsFromServerAsync for ChannelManager { error!("Failed to send message to upstream: {:?}", e); JDCError::fallback(JDCErrorKind::ChannelErrorSender) })?; + self.record_upstream_sent_bytes(sent_bytes); } Ok(()) diff --git a/miner-apps/jd-client/src/lib/channel_manager/jd_message_handler.rs b/miner-apps/jd-client/src/lib/channel_manager/jd_message_handler.rs index 064f6377e..f7731cc51 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/jd_message_handler.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/jd_message_handler.rs @@ -222,11 +222,13 @@ impl HandleJobDeclarationMessagesFromServerAsync for ChannelManager { let sv2_frame: Sv2Frame = AnyMessage::Mining(message) .try_into() .map_err(JDCError::shutdown)?; + let sent_bytes = sv2_frame.encoded_length() as u64; self.channel_manager_channel .upstream_sender .send(sv2_frame) .await .map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?; + self.record_upstream_sent(channel_id, sent_bytes); info!("Successfully sent SetCustomMiningJob to the upstream with channel_id: {channel_id}"); Ok(()) diff --git a/miner-apps/jd-client/src/lib/channel_manager/mod.rs b/miner-apps/jd-client/src/lib/channel_manager/mod.rs index 6737ffed9..6f7209fbe 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/mod.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/mod.rs @@ -165,6 +165,8 @@ pub struct ChannelManagerData { supported_extensions: Vec, /// Extensions that the JDC requires required_extensions: Vec, + /// Per-channel byte counters for the upstream server channel: (bytes_received, bytes_sent) + pub bytes_by_channel: HashMap, } impl ChannelManagerData { @@ -180,6 +182,7 @@ impl ChannelManagerData { self.template_id_to_upstream_job_id.clear(); self.downstream_channel_id_and_job_id_to_template_id.clear(); self.pending_downstream_requests.clear(); + self.bytes_by_channel.clear(); self.downstream_id_factory = AtomicUsize::new(0); self.request_id_factory = AtomicU32::new(0); @@ -321,6 +324,7 @@ impl ChannelManager { negotiated_extensions: vec![], supported_extensions, required_extensions, + bytes_by_channel: HashMap::new(), })); let channel_manager_channel = ChannelManagerChannel { @@ -692,6 +696,28 @@ impl ChannelManager { Ok(()) } + /// Records bytes sent upstream for the given channel_id. + pub fn record_upstream_sent(&self, channel_id: ChannelId, bytes: u64) { + self.channel_manager_data.super_safe_lock(|data| { + let entry = data.bytes_by_channel.entry(channel_id).or_insert((0, 0)); + entry.1 += bytes; + }); + } + + /// Records bytes sent upstream, resolving the channel_id from current upstream channel. + /// Single lock acquisition instead of separate upstream_channel_id() + record_upstream_sent(). + pub fn record_upstream_sent_bytes(&self, bytes: u64) { + self.channel_manager_data.super_safe_lock(|data| { + let ch_id = data + .upstream_channel + .as_ref() + .map(|ch| ch.get_channel_id()) + .unwrap_or(0); + let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0)); + entry.1 += bytes; + }); + } + /// Handles messages received from the Upstream subsystem. /// /// This method listens for incoming frames on the `upstream_receiver` channel. @@ -700,10 +726,24 @@ impl ChannelManager { /// - If the frame contains any unsupported message type, an error is returned. async fn handle_pool_message_frame(&mut self) -> JDCResult<(), error::ChannelManager> { if let Ok(mut sv2_frame) = self.channel_manager_channel.upstream_receiver.recv().await { + let frame_bytes = sv2_frame.encoded_length() as u64; let header = sv2_frame.get_header().ok_or_else(|| { error!("SV2 frame missing header"); JDCError::fallback(framing_sv2::Error::MissingHeader) })?; + // Count received bytes keyed by channel_id + if header.channel_msg() { + let payload = sv2_frame.payload(); + if payload.len() >= 4 { + let channel_id = u32::from_le_bytes( + payload[..4].try_into().expect("slice is exactly 4 bytes"), + ); + self.channel_manager_data.super_safe_lock(|data| { + let entry = data.bytes_by_channel.entry(channel_id).or_insert((0, 0)); + entry.0 += frame_bytes; + }); + } + } let message_type = header.msg_type(); let extension_type = header.ext_type(); let payload = sv2_frame.payload(); @@ -812,6 +852,7 @@ impl ChannelManager { let sv2_frame: Sv2Frame = AnyMessage::Mining(upstream_message) .try_into() .map_err(JDCError::shutdown)?; + let sent_bytes = sv2_frame.encoded_length() as u64; self.channel_manager_channel .upstream_sender .send(sv2_frame) @@ -819,6 +860,7 @@ impl ChannelManager { .map_err(|_| { JDCError::fallback(JDCErrorKind::ChannelErrorSender) })?; + self.record_upstream_sent_bytes(sent_bytes); } } UpstreamState::Pending => { @@ -873,6 +915,7 @@ impl ChannelManager { let sv2_frame: Sv2Frame = AnyMessage::Mining(message) .try_into() .map_err(JDCError::shutdown)?; + let sent_bytes = sv2_frame.encoded_length() as u64; self.channel_manager_channel .upstream_sender .send(sv2_frame) @@ -880,6 +923,7 @@ impl ChannelManager { .map_err(|_| { JDCError::fallback(JDCErrorKind::ChannelErrorSender) })?; + self.record_upstream_sent_bytes(sent_bytes); } } UpstreamState::Pending => { @@ -1175,7 +1219,7 @@ impl ChannelManager { }); for message in messages { - let _ = message.forward(&self.channel_manager_channel).await; + let _ = message.forward(self).await; } info!("Vardiff update cycle complete"); diff --git a/miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs b/miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs index 0e7fc7297..8bf8b8cf7 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs @@ -223,7 +223,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager { } for message in messages { - let _ = message.forward(&self.channel_manager_channel).await; + let _ = message.forward(self).await; } Ok(()) @@ -586,7 +586,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager { } for message in messages { - let _ = message.forward(&self.channel_manager_channel).await; + let _ = message.forward(self).await; } Ok(()) diff --git a/miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs b/miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs index e48467341..0ae1b4d1d 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs @@ -255,11 +255,13 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { let sv2_frame: Sv2Frame = AnyMessage::Mining(set_custom_job) .try_into() .map_err(JDCError::shutdown)?; + let sent_bytes = sv2_frame.encoded_length() as u64; self.channel_manager_channel .upstream_sender .send(sv2_frame) .await .map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?; + self.record_upstream_sent_bytes(sent_bytes); _ = self.allocate_tokens(1).await; } } @@ -284,11 +286,13 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { let sv2_frame: Sv2Frame = AnyMessage::Mining(close_channel) .try_into() .map_err(JDCError::shutdown)?; + let sent_bytes = sv2_frame.encoded_length() as u64; self.channel_manager_channel .upstream_sender .send(sv2_frame) .await .map_err(|_e| JDCError::fallback(JDCErrorKind::ChannelErrorSender))?; + self.record_upstream_sent_bytes(sent_bytes); _ = self.allocate_tokens(1).await; } @@ -336,6 +340,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { info!("Received: {}", msg); self.channel_manager_data.super_safe_lock(|data| { + data.bytes_by_channel.remove(&msg.channel_id); data.upstream_channel = None; }); Err(JDCError::fallback(JDCErrorKind::CloseChannel)) @@ -476,7 +481,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { })?; for message in messages_results.into_iter().flatten() { - let _ = message.forward(&self.channel_manager_channel).await; + let _ = message.forward(self).await; } Ok(()) } diff --git a/miner-apps/jd-client/src/lib/downstream/mod.rs b/miner-apps/jd-client/src/lib/downstream/mod.rs index f1594f56f..44b2e0618 100644 --- a/miner-apps/jd-client/src/lib/downstream/mod.rs +++ b/miner-apps/jd-client/src/lib/downstream/mod.rs @@ -20,7 +20,10 @@ use stratum_apps::{ parsers_sv2::{parse_message_frame_with_tlvs, AnyMessage, Mining, Tlv}, }, task_manager::TaskManager, - utils::types::{DownstreamId, Message, Sv2Frame}, + utils::{ + protocol_message_type::mining_message_channel_id, + types::{ChannelId, DownstreamId, Message, Sv2Frame}, + }, }; use bitcoin_core_sv2::CancellationToken; @@ -33,8 +36,6 @@ use crate::{ status::{handle_error, Status, StatusSender}, }; -use stratum_apps::utils::types::ChannelId; - mod common_message_handler; mod extensions_message_handler; @@ -45,6 +46,7 @@ mod extensions_message_handler; /// - An optional [`GroupChannel`] if group channeling is used. /// - Active [`ExtendedChannel`]s keyed by channel ID. /// - Active [`StandardChannel`]s keyed by channel ID. +/// - Per-channel byte counters (bytes_received, bytes_sent) pub struct DownstreamData { pub require_std_job: bool, pub group_channel: GroupChannel<'static, DefaultJobStore>>, @@ -59,6 +61,8 @@ pub struct DownstreamData { pub supported_extensions: Vec, /// Extensions that the JDC requires pub required_extensions: Vec, + /// Per-channel byte counters: (bytes_received, bytes_sent) + pub bytes_by_channel: HashMap, } /// Communication layer for a downstream connection. @@ -144,6 +148,7 @@ impl Downstream { negotiated_extensions: vec![], supported_extensions, required_extensions, + bytes_by_channel: HashMap::new(), })); Downstream { @@ -270,8 +275,10 @@ impl Downstream { return Ok(()); } + let channel_id = mining_message_channel_id(&message); let message = AnyMessage::Mining(message); let sv2_frame: Sv2Frame = message.try_into().map_err(JDCError::shutdown)?; + let frame_bytes = sv2_frame.encoded_length() as u64; self.downstream_channel .downstream_sender @@ -282,6 +289,13 @@ impl Downstream { JDCError::disconnect(JDCErrorKind::ChannelErrorSender, self.downstream_id) })?; + if let Some(ch_id) = channel_id { + self.downstream_data.super_safe_lock(|data| { + let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0)); + entry.1 += frame_bytes; + }); + } + Ok(()) } @@ -293,6 +307,7 @@ impl Downstream { .recv() .await .map_err(|error| JDCError::disconnect(error, self.downstream_id))?; + let frame_bytes = sv2_frame.encoded_length() as u64; let header = sv2_frame .get_header() .expect("frame header must be present"); @@ -305,6 +320,12 @@ impl Downstream { .map_err(|error| JDCError::disconnect(error, self.downstream_id))?; match any_message { AnyMessage::Mining(message) => { + if let Some(ch_id) = mining_message_channel_id(&message) { + self.downstream_data.super_safe_lock(|data| { + let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0)); + entry.0 += frame_bytes; + }); + } self.downstream_channel .channel_manager_sender .send((self.downstream_id, message, tlv_fields)) diff --git a/miner-apps/jd-client/src/lib/monitoring.rs b/miner-apps/jd-client/src/lib/monitoring.rs index 06d0d157e..52857f9ad 100644 --- a/miner-apps/jd-client/src/lib/monitoring.rs +++ b/miner-apps/jd-client/src/lib/monitoring.rs @@ -34,6 +34,12 @@ impl ServerMonitoring for ChannelManager { .load(std::sync::atomic::Ordering::Relaxed) .saturating_sub(1); + let (bytes_received, bytes_sent) = d + .bytes_by_channel + .get(&channel_id) + .copied() + .unwrap_or((0, 0)); + extended_channels.push(ServerExtendedChannelInfo { channel_id, user_identity: user_identity.clone(), @@ -48,6 +54,8 @@ impl ServerMonitoring for ChannelManager { shares_submitted, best_diff: share_accounting.get_best_diff(), blocks_found: share_accounting.get_blocks_found(), + bytes_received, + bytes_sent, }); } @@ -79,6 +87,12 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { let user_identity = extended_channel.get_user_identity(); let share_accounting = extended_channel.get_share_accounting(); + let (bytes_received, bytes_sent) = dd + .bytes_by_channel + .get(&channel_id) + .copied() + .unwrap_or((0, 0)); + extended_channels.push(ExtendedChannelInfo { channel_id, user_identity: user_identity.clone(), @@ -97,6 +111,8 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { last_batch_work_sum: share_accounting.get_last_batch_work_sum(), share_batch_size: share_accounting.get_share_batch_size(), blocks_found: share_accounting.get_blocks_found(), + bytes_received, + bytes_sent, }); } @@ -107,6 +123,12 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { let user_identity = standard_channel.get_user_identity(); let share_accounting = standard_channel.get_share_accounting(); + let (bytes_received, bytes_sent) = dd + .bytes_by_channel + .get(&channel_id) + .copied() + .unwrap_or((0, 0)); + standard_channels.push(StandardChannelInfo { channel_id, user_identity: user_identity.clone(), @@ -123,6 +145,8 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { last_batch_work_sum: share_accounting.get_last_batch_work_sum(), share_batch_size: share_accounting.get_share_batch_size(), blocks_found: share_accounting.get_blocks_found(), + bytes_received, + bytes_sent, }); } diff --git a/miner-apps/translator/src/lib/monitoring.rs b/miner-apps/translator/src/lib/monitoring.rs index 707cb4e81..d8127e8c7 100644 --- a/miner-apps/translator/src/lib/monitoring.rs +++ b/miner-apps/translator/src/lib/monitoring.rs @@ -38,6 +38,12 @@ impl ServerMonitoring for ChannelManager { .map(|v| *v) .unwrap_or(0); + let (bytes_received, bytes_sent) = self + .bytes_by_channel + .get(&channel_id) + .map(|v| *v) + .unwrap_or((0, 0)); + extended_channels.push(ServerExtendedChannelInfo { channel_id, user_identity: user_identity.clone(), @@ -58,6 +64,8 @@ impl ServerMonitoring for ChannelManager { shares_submitted, best_diff: share_accounting.get_best_diff(), blocks_found: share_accounting.get_blocks_found(), + bytes_received, + bytes_sent, }); } } @@ -81,6 +89,12 @@ impl ServerMonitoring for ChannelManager { .map(|v| *v) .unwrap_or(0); + let (bytes_received, bytes_sent) = self + .bytes_by_channel + .get(&channel_id) + .map(|v| *v) + .unwrap_or((0, 0)); + extended_channels.push(ServerExtendedChannelInfo { channel_id, user_identity: user_identity.clone(), @@ -99,6 +113,8 @@ impl ServerMonitoring for ChannelManager { shares_submitted, best_diff: share_accounting.get_best_diff(), blocks_found: share_accounting.get_blocks_found(), + bytes_received, + bytes_sent, }); } } diff --git a/miner-apps/translator/src/lib/sv1/downstream/data.rs b/miner-apps/translator/src/lib/sv1/downstream/data.rs index c09f8a720..81d1e3917 100644 --- a/miner-apps/translator/src/lib/sv1/downstream/data.rs +++ b/miner-apps/translator/src/lib/sv1/downstream/data.rs @@ -1,4 +1,11 @@ -use std::time::Instant; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Instant, +}; + use stratum_apps::{ stratum_core::{ bitcoin::Target, @@ -37,10 +44,19 @@ pub struct DownstreamData { pub upstream_target: Option, // Timestamp of when the last job was received by this downstream, used for keepalive check pub last_job_received_time: Option, + /// Cumulative SV1 TCP bytes received from the miner + pub sv1_bytes_received: Arc, + /// Cumulative SV1 TCP bytes sent to the miner + pub sv1_bytes_sent: Arc, } impl DownstreamData { - pub fn new(hashrate: Option, target: Target) -> Self { + pub fn new( + hashrate: Option, + target: Target, + sv1_bytes_received: Arc, + sv1_bytes_sent: Arc, + ) -> Self { DownstreamData { channel_id: None, extranonce1: vec![0; 8] @@ -62,9 +78,21 @@ impl DownstreamData { pending_share: None, upstream_target: None, last_job_received_time: None, + sv1_bytes_received, + sv1_bytes_sent, } } + /// Returns the current SV1 TCP bytes received from the miner. + pub fn get_sv1_bytes_received(&self) -> u64 { + self.sv1_bytes_received.load(Ordering::Relaxed) + } + + /// Returns the current SV1 TCP bytes sent to the miner. + pub fn get_sv1_bytes_sent(&self) -> u64 { + self.sv1_bytes_sent.load(Ordering::Relaxed) + } + pub fn set_pending_target(&mut self, new_target: Target, downstream_id: DownstreamId) { self.pending_target = Some(new_target); debug!("Downstream {downstream_id}: Set pending target"); diff --git a/miner-apps/translator/src/lib/sv1/downstream/downstream.rs b/miner-apps/translator/src/lib/sv1/downstream/downstream.rs index 560625c57..14c56885e 100644 --- a/miner-apps/translator/src/lib/sv1/downstream/downstream.rs +++ b/miner-apps/translator/src/lib/sv1/downstream/downstream.rs @@ -7,7 +7,7 @@ use crate::{ use async_channel::{Receiver, Sender}; use std::{ sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, time::Instant, @@ -70,8 +70,15 @@ impl Downstream { target: Target, hashrate: Option, connection_token: CancellationToken, + sv1_bytes_received: Arc, + sv1_bytes_sent: Arc, ) -> Self { - let downstream_data = Arc::new(Mutex::new(DownstreamData::new(hashrate, target))); + let downstream_data = Arc::new(Mutex::new(DownstreamData::new( + hashrate, + target, + sv1_bytes_received, + sv1_bytes_sent, + ))); let downstream_channel_state = DownstreamChannelState::new( downstream_sv1_sender, downstream_sv1_receiver, diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs b/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs index f1af8cd2e..220434c9e 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs @@ -242,6 +242,8 @@ impl Sv1Server { first_target, Some(self.config.downstream_difficulty_config.min_individual_miner_hashrate), connection_token, + connection.bytes_received(), + connection.bytes_sent(), ); // vardiff initialization (only if enabled) self.downstreams.insert(downstream_id, downstream.clone()); diff --git a/miner-apps/translator/src/lib/sv1_monitoring.rs b/miner-apps/translator/src/lib/sv1_monitoring.rs index 44b559bd7..a3eb51c93 100644 --- a/miner-apps/translator/src/lib/sv1_monitoring.rs +++ b/miner-apps/translator/src/lib/sv1_monitoring.rs @@ -26,6 +26,8 @@ fn downstream_to_sv1_client_info(downstream: &Downstream) -> Option>>, /// Extranonce factories containing per channel extranonces pub extranonce_factories: Arc>, + /// Per-channel byte counters for server channels: (bytes_received, bytes_sent) + pub bytes_by_channel: Arc>, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -122,6 +124,7 @@ impl ChannelManager { share_sequence_counters: Arc::new(DashMap::new()), negotiated_extensions: Arc::new(Mutex::new(Vec::new())), extranonce_factories: Arc::new(DashMap::new()), + bytes_by_channel: Arc::new(DashMap::new()), } } @@ -225,12 +228,23 @@ impl ChannelManager { .recv() .await .map_err(TproxyError::fallback)?; + let frame_bytes = sv2_frame.encoded_length() as u64; let mut channel_manager: ChannelManager = (*self).clone(); let header = sv2_frame.get_header().ok_or_else(|| { error!("SV2 frame missing header"); TproxyError::fallback(framing_sv2::Error::MissingHeader) })?; + // Count received bytes keyed by channel_id (from header if channel_msg, else 0) + if header.channel_msg() { + let payload = sv2_frame.payload(); + if payload.len() >= 4 { + let channel_id = + u32::from_le_bytes(payload[..4].try_into().expect("slice is exactly 4 bytes")); + let mut entry = self.bytes_by_channel.entry(channel_id).or_insert((0, 0)); + entry.0 += frame_bytes; + } + } match protocol_message_type(header.ext_type(), header.msg_type()) { MessageType::Mining => { channel_manager @@ -616,23 +630,37 @@ impl ChannelManager { ); TproxyError::shutdown(framing_sv2::Error::ExpectedSv2Frame) })?; + let sent_bytes = sv2_frame.encoded_length() as u64; + let upstream_ch_id = m.channel_id; self.channel_state.upstream_sender.send(sv2_frame).await.map_err(|e| { error!("Failed to send submit shares extended message to upstream: {:?}", e); TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) })?; + let mut entry = self + .bytes_by_channel + .entry(upstream_ch_id) + .or_insert((0, 0)); + entry.1 += sent_bytes; sent = true; } } if !sent { + let upstream_ch_id = m.channel_id; let message = Mining::SubmitSharesExtended(m); let sv2_frame: Sv2Frame = AnyMessage::Mining(message) .try_into() .map_err(TproxyError::shutdown)?; + let sent_bytes = sv2_frame.encoded_length() as u64; self.channel_state.upstream_sender.send(sv2_frame).await.map_err(|e| { error!("Failed to send submit shares extended message to upstream: {:?}", e); TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) })?; + let mut entry = self + .bytes_by_channel + .entry(upstream_ch_id) + .or_insert((0, 0)); + entry.1 += sent_bytes; } } } @@ -661,10 +689,12 @@ impl ChannelManager { m.channel_id ); // Forward UpdateChannel message to upstream + let upstream_ch_id = m.channel_id; let message = Mining::UpdateChannel(m); let sv2_frame: Sv2Frame = AnyMessage::Mining(message) .try_into() .map_err(TproxyError::shutdown)?; + let sent_bytes = sv2_frame.encoded_length() as u64; self.channel_state .upstream_sender @@ -674,6 +704,11 @@ impl ChannelManager { error!("Failed to send UpdateChannel message to upstream: {:?}", e); TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) })?; + let mut entry = self + .bytes_by_channel + .entry(upstream_ch_id) + .or_insert((0, 0)); + entry.1 += sent_bytes; } Mining::CloseChannel(m) => { debug!("Received CloseChannel from Sv1Server: {m}"); @@ -691,6 +726,7 @@ impl ChannelManager { debug!("Removed channel {} from group channel before sending CloseChannel to upstream", m.channel_id); } } + self.bytes_by_channel.remove(&m.channel_id); let message = Mining::CloseChannel(m); let sv2_frame: Sv2Frame = AnyMessage::Mining(message) diff --git a/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs b/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs index 9d067c7e3..5cdf2691f 100644 --- a/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs +++ b/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs @@ -317,12 +317,15 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { if let Some((_, group_channel)) = group_channel { for channel_id in group_channel.get_channel_ids() { self.extended_channels.remove(channel_id); + self.bytes_by_channel.remove(channel_id); } + self.bytes_by_channel.remove(&m.channel_id); // if the message was not sent to a group channel, and we're not working in // aggregated mode, } else if self.extended_channels.contains_key(&m.channel_id) { // remove the channel from the extended channels map self.extended_channels.remove(&m.channel_id); + self.bytes_by_channel.remove(&m.channel_id); // remove the channel from any group channels that contain it for mut group_channel in self.group_channels.iter_mut() { diff --git a/pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs b/pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs index 643716fbc..2520974ac 100644 --- a/pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs +++ b/pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs @@ -95,6 +95,7 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { .super_safe_lock(|downstream_data| { downstream_data.standard_channels.remove(&msg.channel_id); downstream_data.extended_channels.remove(&msg.channel_id); + downstream_data.bytes_by_channel.remove(&msg.channel_id); }); channel_manager_data .vardiff diff --git a/pool-apps/pool/src/lib/downstream/mod.rs b/pool-apps/pool/src/lib/downstream/mod.rs index 7917f1045..9aae406b5 100644 --- a/pool-apps/pool/src/lib/downstream/mod.rs +++ b/pool-apps/pool/src/lib/downstream/mod.rs @@ -25,7 +25,7 @@ use stratum_apps::{ }, task_manager::TaskManager, utils::{ - protocol_message_type::{protocol_message_type, MessageType}, + protocol_message_type::{mining_message_channel_id, protocol_message_type, MessageType}, types::{ChannelId, DownstreamId, Message, Sv2Frame}, }, }; @@ -49,6 +49,7 @@ mod extensions_message_handler; /// - Active [`ExtendedChannel`]s keyed by channel ID. /// - Active [`StandardChannel`]s keyed by channel ID. /// - Extensions that have been successfully negotiated with this client +/// - Per-channel byte counters (bytes_received, bytes_sent) pub struct DownstreamData { pub group_channel: GroupChannel<'static, DefaultJobStore>>, pub extended_channels: @@ -58,6 +59,8 @@ pub struct DownstreamData { pub channel_id_factory: AtomicU32, /// Extensions that have been successfully negotiated with this client pub negotiated_extensions: Vec, + /// Per-channel byte counters: (bytes_received, bytes_sent) + pub bytes_by_channel: HashMap, } /// Communication layer for a downstream connection. @@ -144,6 +147,7 @@ impl Downstream { group_channel, channel_id_factory, negotiated_extensions: vec![], + bytes_by_channel: HashMap::new(), })); Downstream { @@ -275,8 +279,10 @@ impl Downstream { return Ok(()); } + let channel_id = mining_message_channel_id(&msg); let message = AnyMessage::Mining(msg); let std_frame: Sv2Frame = message.try_into().map_err(PoolError::shutdown)?; + let frame_bytes = std_frame.encoded_length() as u64; self.downstream_channel .downstream_sender @@ -287,6 +293,13 @@ impl Downstream { PoolError::disconnect(PoolErrorKind::ChannelErrorSender, self.downstream_id) })?; + if let Some(ch_id) = channel_id { + self.downstream_data.super_safe_lock(|data| { + let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0)); + entry.1 += frame_bytes; + }); + } + Ok(()) } @@ -306,6 +319,7 @@ impl Downstream { match protocol_message_type(header.ext_type(), header.msg_type()) { MessageType::Mining => { debug!("Received mining SV2 frame from downstream."); + let frame_bytes = sv2_frame.encoded_length() as u64; let negotiated_extensions = self .downstream_data .super_safe_lock(|data| data.negotiated_extensions.clone()); @@ -328,6 +342,12 @@ impl Downstream { )); } }; + if let Some(ch_id) = mining_message_channel_id(&mining_message) { + self.downstream_data.super_safe_lock(|data| { + let entry = data.bytes_by_channel.entry(ch_id).or_insert((0, 0)); + entry.0 += frame_bytes; + }); + } self.downstream_channel .channel_manager_sender .send((self.downstream_id, mining_message, tlv_fields)) diff --git a/pool-apps/pool/src/lib/monitoring.rs b/pool-apps/pool/src/lib/monitoring.rs index 48be56dca..bcf755ec7 100644 --- a/pool-apps/pool/src/lib/monitoring.rs +++ b/pool-apps/pool/src/lib/monitoring.rs @@ -24,6 +24,11 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { let requested_max_target = extended_channel.get_requested_max_target(); let user_identity = extended_channel.get_user_identity(); let share_accounting = extended_channel.get_share_accounting(); + let (bytes_received, bytes_sent) = dd + .bytes_by_channel + .get(&channel_id) + .copied() + .unwrap_or((0, 0)); extended_channels.push(ExtendedChannelInfo { channel_id, @@ -43,6 +48,8 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { last_batch_work_sum: share_accounting.get_last_batch_work_sum(), share_batch_size: share_accounting.get_share_batch_size(), blocks_found: share_accounting.get_blocks_found(), + bytes_received, + bytes_sent, }); } @@ -52,6 +59,11 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { let requested_max_target = standard_channel.get_requested_max_target(); let user_identity = standard_channel.get_user_identity(); let share_accounting = standard_channel.get_share_accounting(); + let (bytes_received, bytes_sent) = dd + .bytes_by_channel + .get(&channel_id) + .copied() + .unwrap_or((0, 0)); standard_channels.push(StandardChannelInfo { channel_id, @@ -69,6 +81,8 @@ fn downstream_to_sv2_client_info(client: &Downstream) -> Option { last_batch_work_sum: share_accounting.get_last_batch_work_sum(), share_batch_size: share_accounting.get_share_batch_size(), blocks_found: share_accounting.get_blocks_found(), + bytes_received, + bytes_sent, }); } diff --git a/stratum-apps/src/monitoring/README.md b/stratum-apps/src/monitoring/README.md index a2a45c7d7..64adb5f14 100644 --- a/stratum-apps/src/monitoring/README.md +++ b/stratum-apps/src/monitoring/README.md @@ -75,6 +75,8 @@ tokio::spawn(async move { - `sv2_server_channel_hashrate{channel_id, user_identity}` - Per-channel hashrate - `sv2_server_shares_accepted_total{channel_id, user_identity}` - Per-channel shares - `sv2_server_blocks_found_total` - Total blocks found across all current server channels +- `sv2_server_bytes_received_total` - Total bytes received from the server +- `sv2_server_bytes_sent_total` - Total bytes sent to the server **Clients:** - `sv2_clients_total` - Connected client count @@ -83,7 +85,25 @@ tokio::spawn(async move { - `sv2_client_channel_hashrate{client_id, channel_id, user_identity}` - Per-channel hashrate - `sv2_client_shares_accepted_total{client_id, channel_id, user_identity}` - Per-channel shares - `sv2_client_blocks_found_total` - Total blocks found across all current client channels +- `sv2_client_bytes_received_total` - Total bytes received from all clients +- `sv2_client_bytes_sent_total` - Total bytes sent to all clients **Sv1 (Translator Proxy only):** - `sv1_clients_total` - Sv1 client count - `sv1_hashrate_total` - Sv1 total hashrate +- `sv1_client_bytes_received_total{client_id, user_identity}` - Bytes received per SV1 client +- `sv1_client_bytes_sent_total{client_id, user_identity}` - Bytes sent per SV1 client + +## Metric Design Notes + +### Bytes Metrics + +Prometheus exposes **aggregate** byte counters (`_total` suffix, scalar Gauges) for capacity +planning, anomaly detection, and reflection attack detection at the system level. Per-channel +and per-client byte detail is available via the JSON REST API (`/api/v1/server/channels`, +`/api/v1/clients/{id}/channels`, `/api/v1/sv1/clients`) for drill-down without inflating +Prometheus time series cardinality. + +The SV1 per-client byte metrics (`sv1_client_bytes_received_total`, `sv1_client_bytes_sent_total`) +are per-client GaugeVecs because each SV1 client maps 1:1 to a TCP connection, making them the +natural unit for bandwidth asymmetry monitoring on the translator proxy. diff --git a/stratum-apps/src/monitoring/client.rs b/stratum-apps/src/monitoring/client.rs index ba8ebfcb8..0c4f378f1 100644 --- a/stratum-apps/src/monitoring/client.rs +++ b/stratum-apps/src/monitoring/client.rs @@ -26,6 +26,8 @@ pub struct ExtendedChannelInfo { pub last_batch_work_sum: f64, pub share_batch_size: usize, pub blocks_found: u32, + pub bytes_received: u64, + pub bytes_sent: u64, } /// Information about a standard channel @@ -46,6 +48,8 @@ pub struct StandardChannelInfo { pub last_batch_work_sum: f64, pub share_batch_size: usize, pub blocks_found: u32, + pub bytes_received: u64, + pub bytes_sent: u64, } /// Full information about a single Sv2 client including all channels @@ -103,6 +107,8 @@ pub struct Sv2ClientsSummary { pub extended_channels: usize, pub standard_channels: usize, pub total_hashrate: f32, + pub total_bytes_received: u64, + pub total_bytes_sent: u64, } /// Trait for monitoring Sv2 clients (downstream connections) @@ -126,12 +132,33 @@ pub trait Sv2ClientsMonitoring: Send + Sync { let extended: usize = clients.iter().map(|c| c.extended_channels.len()).sum(); let standard: usize = clients.iter().map(|c| c.standard_channels.len()).sum(); + let total_bytes_received: u64 = clients + .iter() + .flat_map(|c| { + c.extended_channels + .iter() + .map(|ch| ch.bytes_received) + .chain(c.standard_channels.iter().map(|ch| ch.bytes_received)) + }) + .sum(); + let total_bytes_sent: u64 = clients + .iter() + .flat_map(|c| { + c.extended_channels + .iter() + .map(|ch| ch.bytes_sent) + .chain(c.standard_channels.iter().map(|ch| ch.bytes_sent)) + }) + .sum(); + Sv2ClientsSummary { total_clients: clients.len(), total_channels: extended + standard, extended_channels: extended, standard_channels: standard, total_hashrate: clients.iter().map(|c| c.total_hashrate()).sum(), + total_bytes_received, + total_bytes_sent, } } } diff --git a/stratum-apps/src/monitoring/http_server.rs b/stratum-apps/src/monitoring/http_server.rs index a0d396793..c6bca47ee 100644 --- a/stratum-apps/src/monitoring/http_server.rs +++ b/stratum-apps/src/monitoring/http_server.rs @@ -774,6 +774,9 @@ async fn handle_prometheus_metrics(State(state): State) -> Response } if let Some(ref server) = snapshot.server_info { + let mut server_bytes_received: u64 = 0; + let mut server_bytes_sent: u64 = 0; + for channel in &server.extended_channels { let channel_id = channel.channel_id.to_string(); let user = &channel.user_identity; @@ -791,6 +794,8 @@ async fn handle_prometheus_metrics(State(state): State) -> Response .with_label_values(&[&channel_id, user]) .set(hashrate as f64); } + server_bytes_received += channel.bytes_received; + server_bytes_sent += channel.bytes_sent; } for channel in &server.standard_channels { @@ -810,6 +815,15 @@ async fn handle_prometheus_metrics(State(state): State) -> Response .with_label_values(&[&channel_id, user]) .set(hashrate as f64); } + server_bytes_received += channel.bytes_received; + server_bytes_sent += channel.bytes_sent; + } + + if let Some(ref metric) = state.metrics.sv2_server_bytes_received_total { + metric.set(server_bytes_received as f64); + } + if let Some(ref metric) = state.metrics.sv2_server_bytes_sent_total { + metric.set(server_bytes_sent as f64); } if let Some(ref metric) = state.metrics.sv2_server_blocks_found_total { @@ -846,6 +860,8 @@ async fn handle_prometheus_metrics(State(state): State) -> Response } let mut client_blocks_total: u64 = 0; + let mut client_bytes_received_total: u64 = 0; + let mut client_bytes_sent_total: u64 = 0; for client in snapshot.sv2_clients.as_deref().unwrap_or(&[]) { let client_id = client.client_id.to_string(); @@ -865,6 +881,8 @@ async fn handle_prometheus_metrics(State(state): State) -> Response .set(channel.nominal_hashrate as f64); } client_blocks_total += channel.blocks_found as u64; + client_bytes_received_total += channel.bytes_received; + client_bytes_sent_total += channel.bytes_sent; } for channel in &client.standard_channels { @@ -882,12 +900,20 @@ async fn handle_prometheus_metrics(State(state): State) -> Response .set(channel.nominal_hashrate as f64); } client_blocks_total += channel.blocks_found as u64; + client_bytes_received_total += channel.bytes_received; + client_bytes_sent_total += channel.bytes_sent; } } if let Some(ref metric) = state.metrics.sv2_client_blocks_found_total { metric.set(client_blocks_total as f64); } + if let Some(ref metric) = state.metrics.sv2_client_bytes_received_total { + metric.set(client_bytes_received_total as f64); + } + if let Some(ref metric) = state.metrics.sv2_client_bytes_sent_total { + metric.set(client_bytes_sent_total as f64); + } } // Collect SV1 client metrics @@ -900,6 +926,31 @@ async fn handle_prometheus_metrics(State(state): State) -> Response } } + // Reset per-client SV1 byte metrics before repopulating + if let Some(ref metric) = state.metrics.sv1_client_bytes_received_total { + metric.reset(); + } + if let Some(ref metric) = state.metrics.sv1_client_bytes_sent_total { + metric.reset(); + } + + if let Some(ref sv1_clients) = snapshot.sv1_clients { + for client in sv1_clients { + let client_id = client.client_id.to_string(); + let user = &client.user_identity; + if let Some(ref metric) = state.metrics.sv1_client_bytes_received_total { + metric + .with_label_values(&[&client_id, user]) + .set(client.bytes_received as f64); + } + if let Some(ref metric) = state.metrics.sv1_client_bytes_sent_total { + metric + .with_label_values(&[&client_id, user]) + .set(client.bytes_sent as f64); + } + } + } + // Encode and return metrics let encoder = TextEncoder::new(); let metric_families = state.metrics.registry.gather(); diff --git a/stratum-apps/src/monitoring/prometheus_metrics.rs b/stratum-apps/src/monitoring/prometheus_metrics.rs index ecf21a177..861110b1e 100644 --- a/stratum-apps/src/monitoring/prometheus_metrics.rs +++ b/stratum-apps/src/monitoring/prometheus_metrics.rs @@ -15,6 +15,8 @@ pub struct PrometheusMetrics { pub sv2_server_channel_hashrate: Option, pub sv2_server_shares_accepted_total: Option, pub sv2_server_blocks_found_total: Option, + pub sv2_server_bytes_received_total: Option, + pub sv2_server_bytes_sent_total: Option, // Clients metrics (downstream connections) pub sv2_clients_total: Option, pub sv2_client_channels: Option, @@ -22,9 +24,13 @@ pub struct PrometheusMetrics { pub sv2_client_channel_hashrate: Option, pub sv2_client_shares_accepted_total: Option, pub sv2_client_blocks_found_total: Option, + pub sv2_client_bytes_received_total: Option, + pub sv2_client_bytes_sent_total: Option, // SV1 metrics pub sv1_clients_total: Option, pub sv1_hashrate_total: Option, + pub sv1_client_bytes_received_total: Option, + pub sv1_client_bytes_sent_total: Option, } impl PrometheusMetrics { @@ -46,6 +52,8 @@ impl PrometheusMetrics { sv2_server_channel_hashrate, sv2_server_shares_accepted_total, sv2_server_blocks_found_total, + sv2_server_channel_bytes_received_total, + sv2_server_channel_bytes_sent_total, ) = if enable_server_metrics { let channels = GaugeVec::new( Opts::new("sv2_server_channels", "Number of server channels by type"), @@ -83,15 +91,29 @@ impl PrometheusMetrics { )?; registry.register(Box::new(blocks_found.clone()))?; + let bytes_received = Gauge::new( + "sv2_server_bytes_received_total", + "Total bytes received from the server across all channels", + )?; + registry.register(Box::new(bytes_received.clone()))?; + + let bytes_sent = Gauge::new( + "sv2_server_bytes_sent_total", + "Total bytes sent to the server across all channels", + )?; + registry.register(Box::new(bytes_sent.clone()))?; + ( Some(channels), Some(hashrate), Some(channel_hashrate), Some(shares_accepted), Some(blocks_found), + Some(bytes_received), + Some(bytes_sent), ) } else { - (None, None, None, None, None) + (None, None, None, None, None, None, None) }; // Clients metrics (downstream connections) @@ -102,6 +124,8 @@ impl PrometheusMetrics { sv2_client_channel_hashrate, sv2_client_shares_accepted_total, sv2_client_blocks_found_total, + sv2_client_channel_bytes_received_total, + sv2_client_channel_bytes_sent_total, ) = if enable_clients_metrics { let clients_total = Gauge::new("sv2_clients_total", "Total number of connected clients")?; @@ -143,6 +167,18 @@ impl PrometheusMetrics { )?; registry.register(Box::new(blocks_found.clone()))?; + let bytes_received = Gauge::new( + "sv2_client_bytes_received_total", + "Total bytes received from all clients across all channels", + )?; + registry.register(Box::new(bytes_received.clone()))?; + + let bytes_sent = Gauge::new( + "sv2_client_bytes_sent_total", + "Total bytes sent to all clients across all channels", + )?; + registry.register(Box::new(bytes_sent.clone()))?; + ( Some(clients_total), Some(channels), @@ -150,22 +186,52 @@ impl PrometheusMetrics { Some(channel_hashrate), Some(shares_accepted), Some(blocks_found), + Some(bytes_received), + Some(bytes_sent), ) } else { - (None, None, None, None, None, None) + (None, None, None, None, None, None, None, None) }; // SV1 metrics - let (sv1_clients_total, sv1_hashrate_total) = if enable_sv1_metrics { + let ( + sv1_clients_total, + sv1_hashrate_total, + sv1_client_bytes_received_total, + sv1_client_bytes_sent_total, + ) = if enable_sv1_metrics { let clients = Gauge::new("sv1_clients_total", "Total number of SV1 clients")?; registry.register(Box::new(clients.clone()))?; let hashrate = Gauge::new("sv1_hashrate_total", "Total hashrate from SV1 clients")?; registry.register(Box::new(hashrate.clone()))?; - (Some(clients), Some(hashrate)) + let bytes_received = GaugeVec::new( + Opts::new( + "sv1_client_bytes_received_total", + "Total bytes received per SV1 client", + ), + &["client_id", "user_identity"], + )?; + registry.register(Box::new(bytes_received.clone()))?; + + let bytes_sent = GaugeVec::new( + Opts::new( + "sv1_client_bytes_sent_total", + "Total bytes sent per SV1 client", + ), + &["client_id", "user_identity"], + )?; + registry.register(Box::new(bytes_sent.clone()))?; + + ( + Some(clients), + Some(hashrate), + Some(bytes_received), + Some(bytes_sent), + ) } else { - (None, None) + (None, None, None, None) }; Ok(Self { @@ -176,14 +242,20 @@ impl PrometheusMetrics { sv2_server_channel_hashrate, sv2_server_shares_accepted_total, sv2_server_blocks_found_total, + sv2_server_bytes_received_total: sv2_server_channel_bytes_received_total, + sv2_server_bytes_sent_total: sv2_server_channel_bytes_sent_total, sv2_clients_total, sv2_client_channels, sv2_client_hashrate_total, sv2_client_channel_hashrate, sv2_client_shares_accepted_total, sv2_client_blocks_found_total, + sv2_client_bytes_received_total: sv2_client_channel_bytes_received_total, + sv2_client_bytes_sent_total: sv2_client_channel_bytes_sent_total, sv1_clients_total, sv1_hashrate_total, + sv1_client_bytes_received_total, + sv1_client_bytes_sent_total, }) } } diff --git a/stratum-apps/src/monitoring/server.rs b/stratum-apps/src/monitoring/server.rs index 876bbf746..b876dd01b 100644 --- a/stratum-apps/src/monitoring/server.rs +++ b/stratum-apps/src/monitoring/server.rs @@ -23,6 +23,8 @@ pub struct ServerExtendedChannelInfo { pub shares_submitted: u32, pub best_diff: f64, pub blocks_found: u32, + pub bytes_received: u64, + pub bytes_sent: u64, } /// Information about a standard channel opened with the server @@ -39,6 +41,8 @@ pub struct ServerStandardChannelInfo { pub shares_submitted: u32, pub best_diff: f64, pub blocks_found: u32, + pub bytes_received: u64, + pub bytes_sent: u64, } /// Information about the server (upstream connection) @@ -75,6 +79,8 @@ pub struct ServerSummary { pub extended_channels: usize, pub standard_channels: usize, pub total_hashrate: f32, + pub total_bytes_received: u64, + pub total_bytes_sent: u64, } /// Trait for monitoring the server (upstream connection) @@ -86,11 +92,26 @@ pub trait ServerMonitoring: Send + Sync { fn get_server_summary(&self) -> ServerSummary { let server = self.get_server(); + let total_bytes_received: u64 = server + .extended_channels + .iter() + .map(|c| c.bytes_received) + .chain(server.standard_channels.iter().map(|c| c.bytes_received)) + .sum(); + let total_bytes_sent: u64 = server + .extended_channels + .iter() + .map(|c| c.bytes_sent) + .chain(server.standard_channels.iter().map(|c| c.bytes_sent)) + .sum(); + ServerSummary { total_channels: server.total_channels(), extended_channels: server.extended_channels.len(), standard_channels: server.standard_channels.len(), total_hashrate: server.total_hashrate(), + total_bytes_received, + total_bytes_sent, } } } diff --git a/stratum-apps/src/monitoring/sv1.rs b/stratum-apps/src/monitoring/sv1.rs index 99d177be6..2d9c37b31 100644 --- a/stratum-apps/src/monitoring/sv1.rs +++ b/stratum-apps/src/monitoring/sv1.rs @@ -19,6 +19,8 @@ pub struct Sv1ClientInfo { pub extranonce2_len: usize, pub version_rolling_mask: Option, pub version_rolling_min_bit: Option, + pub bytes_received: u64, + pub bytes_sent: u64, } /// Aggregate information about SV1 client connections @@ -26,6 +28,8 @@ pub struct Sv1ClientInfo { pub struct Sv1ClientsSummary { pub total_clients: usize, pub total_hashrate: f32, + pub total_bytes_received: u64, + pub total_bytes_sent: u64, } /// Trait for monitoring SV1 client connections @@ -50,6 +54,8 @@ pub trait Sv1ClientsMonitoring: Send + Sync { Sv1ClientsSummary { total_clients: clients.len(), total_hashrate: clients.iter().filter_map(|c| c.hashrate).sum(), + total_bytes_received: clients.iter().map(|c| c.bytes_received).sum(), + total_bytes_sent: clients.iter().map(|c| c.bytes_sent).sum(), } } } diff --git a/stratum-apps/src/network_helpers/sv1_connection.rs b/stratum-apps/src/network_helpers/sv1_connection.rs index 8398f0ac2..3e0853264 100644 --- a/stratum-apps/src/network_helpers/sv1_connection.rs +++ b/stratum-apps/src/network_helpers/sv1_connection.rs @@ -1,3 +1,8 @@ +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + use async_channel::{unbounded, Receiver, Sender}; use futures::StreamExt; use stratum_core::sv1_api::json_rpc; @@ -21,6 +26,8 @@ use tracing::{error, trace, warn}; pub struct ConnectionSV1 { receiver: Receiver, sender: Sender, + bytes_received: Arc, + bytes_sent: Arc, } struct ConnectionState { @@ -64,6 +71,11 @@ impl ConnectionSV1 { let buffer_read_half = BufReader::new(read_half); let buffer_write_half = BufWriter::new(write_half); + let bytes_received = Arc::new(AtomicU64::new(0)); + let bytes_sent = Arc::new(AtomicU64::new(0)); + let reader_bytes = bytes_received.clone(); + let writer_bytes = bytes_sent.clone(); + let connection_state = ConnectionState::new( receiver_outgoing.clone(), sender_outgoing.clone(), @@ -77,11 +89,11 @@ impl ConnectionSV1 { trace!("ConnectionSV1: received cancellation signal."); connection_state.close(); } - _ = Self::run_reader(buffer_read_half, sender_incoming.clone()) => { + _ = Self::run_reader(buffer_read_half, sender_incoming.clone(), reader_bytes) => { trace!("Reader task exited. Closing writer sender."); connection_state.close(); } - _ = Self::run_writer(buffer_write_half, receiver_outgoing.clone()) => { + _ = Self::run_writer(buffer_write_half, receiver_outgoing.clone(), writer_bytes) => { trace!("Writer task exited. Closing reader sender."); connection_state.close(); } @@ -91,27 +103,34 @@ impl ConnectionSV1 { Self { receiver: receiver_incoming, sender: sender_outgoing, + bytes_received, + bytes_sent, } } async fn run_reader( reader: BufReader, sender: Sender, + bytes_received: Arc, ) { let mut lines = FramedRead::new(reader, LinesCodec::new_with_max_length(MAX_LINE_LENGTH)); while let Some(result) = lines.next().await { match result { - Ok(line) => match serde_json::from_str::(&line) { - Ok(msg) => { - if sender.send(msg).await.is_err() { - warn!("Receiver dropped, stopping reader"); - break; + Ok(line) => { + // +1 accounts for the newline character stripped by LinesCodec + bytes_received.fetch_add((line.len() + 1) as u64, Ordering::Relaxed); + match serde_json::from_str::(&line) { + Ok(msg) => { + if sender.send(msg).await.is_err() { + warn!("Receiver dropped, stopping reader"); + break; + } + } + Err(e) => { + error!("Failed to deserialize message: {e:?}"); } } - Err(e) => { - error!("Failed to deserialize message: {e:?}"); - } - }, + } Err(e) => { error!("Error reading from stream: {e:?}"); break; @@ -123,12 +142,15 @@ impl ConnectionSV1 { async fn run_writer( mut writer: BufWriter, receiver: Receiver, + bytes_sent: Arc, ) { while let Ok(msg) = receiver.recv().await { match serde_json::to_string(&msg) { Ok(line) => { let data = format!("{line}\n"); - if writer.write_all(data.as_bytes()).await.is_err() { + let data_bytes = data.as_bytes(); + bytes_sent.fetch_add(data_bytes.len() as u64, Ordering::Relaxed); + if writer.write_all(data_bytes).await.is_err() { error!("Failed to write to stream"); break; } @@ -164,6 +186,16 @@ impl ConnectionSV1 { pub fn sender(&self) -> Sender { self.sender.clone() } + + /// Get a reference to the bytes received counter. + pub fn bytes_received(&self) -> Arc { + self.bytes_received.clone() + } + + /// Get a reference to the bytes sent counter. + pub fn bytes_sent(&self) -> Arc { + self.bytes_sent.clone() + } } #[cfg(test)] diff --git a/stratum-apps/src/utils/protocol_message_type.rs b/stratum-apps/src/utils/protocol_message_type.rs index 2024c7b83..50d657033 100644 --- a/stratum-apps/src/utils/protocol_message_type.rs +++ b/stratum-apps/src/utils/protocol_message_type.rs @@ -1,3 +1,7 @@ +use crate::stratum_core::parsers_sv2::Mining; + +use super::types::ChannelId; + use stratum_core::extensions_sv2::{ EXTENSION_TYPE_EXTENSIONS_NEGOTIATION, MESSAGE_TYPE_REQUEST_EXTENSIONS, MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR, MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS, @@ -128,6 +132,37 @@ pub enum MessageType { Unknown, } +/// Returns the channel_id from a Mining message if it is a channel-specific message. +/// +/// Connection-level messages (OpenChannel, SetCustomMiningJob, SetGroupChannel, etc.) +/// return `None` since they do not carry a channel_id field. +pub fn mining_message_channel_id(msg: &Mining<'_>) -> Option { + match msg { + Mining::CloseChannel(m) => Some(m.channel_id), + Mining::NewExtendedMiningJob(m) => Some(m.channel_id), + Mining::NewMiningJob(m) => Some(m.channel_id), + Mining::SetExtranoncePrefix(m) => Some(m.channel_id), + Mining::SetNewPrevHash(m) => Some(m.channel_id), + Mining::SetTarget(m) => Some(m.channel_id), + Mining::SubmitSharesError(m) => Some(m.channel_id), + Mining::SubmitSharesExtended(m) => Some(m.channel_id), + Mining::SubmitSharesStandard(m) => Some(m.channel_id), + Mining::SubmitSharesSuccess(m) => Some(m.channel_id), + Mining::UpdateChannel(m) => Some(m.channel_id), + Mining::UpdateChannelError(m) => Some(m.channel_id), + // Connection-level messages have no channel_id + Mining::OpenExtendedMiningChannel(_) + | Mining::OpenExtendedMiningChannelSuccess(_) + | Mining::OpenMiningChannelError(_) + | Mining::OpenStandardMiningChannel(_) + | Mining::OpenStandardMiningChannelSuccess(_) + | Mining::SetCustomMiningJob(_) + | Mining::SetCustomMiningJobError(_) + | Mining::SetCustomMiningJobSuccess(_) + | Mining::SetGroupChannel(_) => None, + } +} + pub fn protocol_message_type(extension_type: u16, message_type: u8) -> MessageType { // Remove the channel_msg bit (bit 15) from extension_type to ensure correct matching const CHANNEL_MSG_MASK: u16 = 0b1000_0000_0000_0000;