Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions integration-tests/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions miner-apps/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pool-apps/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1,746 changes: 913 additions & 833 deletions pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs

Large diffs are not rendered by default.

240 changes: 118 additions & 122 deletions pool-apps/pool/src/lib/channel_manager/mod.rs

Large diffs are not rendered by default.

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions pool-apps/pool/src/lib/downstream/common_message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ impl HandleCommonMessagesFromClientAsync for Downstream {
&self,
_client_id: Option<usize>,
) -> Result<Vec<u16>, Self::Error> {
Ok(self
.downstream_data
.super_safe_lock(|data| data.negotiated_extensions.clone()))
Ok(self.negotiated_extensions.get())
}

async fn handle_setup_connection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ impl HandleExtensionsFromClientAsync for Downstream {
&self,
_client_id: Option<usize>,
) -> Result<Vec<u16>, Self::Error> {
Ok(self
.downstream_data
.super_safe_lock(|data| data.negotiated_extensions.clone()))
Ok(self.negotiated_extensions.get())
}

async fn handle_request_extensions(
Expand Down Expand Up @@ -114,9 +112,7 @@ impl HandleExtensionsFromClientAsync for Downstream {
);

// Store the negotiated extensions in the shared downstream data
self.downstream_data.super_safe_lock(|data| {
data.negotiated_extensions = supported.clone();
});
self.negotiated_extensions.set(supported.clone());

let success = RequestExtensionsSuccess {
request_id: msg.request_id,
Expand Down
66 changes: 24 additions & 42 deletions pool-apps/pool/src/lib/downstream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, AtomicU32},
Arc,
},
use std::sync::{
atomic::{AtomicBool, AtomicU32},
Arc,
};

use async_channel::{unbounded, Receiver, Sender};
use bitcoin_core_sv2::template_distribution_protocol::CancellationToken;
use stratum_apps::{
custom_mutex::Mutex,
network_helpers::noise_stream::NoiseTcpStream,
shared::{Shared, SharedMap},
stratum_core::{
channels_sv2::server::{
extended::ExtendedChannel,
Expand Down Expand Up @@ -42,27 +39,6 @@ use crate::{
mod common_message_handler;
mod extensions_message_handler;

/// Holds state related to a downstream connection's mining channels.
///
/// This includes:
/// - Whether the downstream requires a standard job (`require_std_job`).
/// - A [`GroupChannel`].
/// - Active [`ExtendedChannel`]s keyed by channel ID.
/// - Active [`StandardChannel`]s keyed by channel ID.
/// - Extensions that have been successfully negotiated with this client
pub struct DownstreamData {
pub group_channel: GroupChannel<'static, DefaultJobStore<ExtendedJob<'static>>>,
pub extended_channels:
HashMap<ChannelId, ExtendedChannel<'static, DefaultJobStore<ExtendedJob<'static>>>>,
pub standard_channels:
HashMap<ChannelId, StandardChannel<'static, DefaultJobStore<StandardJob<'static>>>>,
pub channel_id_factory: AtomicU32,
/// Extensions that have been successfully negotiated with this client
pub negotiated_extensions: Vec<u16>,
/// Payout mode derived from user_identity (None until channel is opened)
pub payout_mode: Option<PayoutMode>,
}

/// Communication layer for a downstream connection.
///
/// Provides the messaging primitives for interacting with the
Expand All @@ -86,7 +62,6 @@ pub struct DownstreamChannel {
/// Represents a downstream client connected to this node.
#[derive(Clone)]
pub struct Downstream {
pub downstream_data: Arc<Mutex<DownstreamData>>,
downstream_channel: DownstreamChannel,
pub downstream_id: usize,
pub requires_standard_jobs: Arc<AtomicBool>,
Expand All @@ -95,6 +70,19 @@ pub struct Downstream {
pub supported_extensions: Vec<u16>,
/// Extensions that the pool requires
pub required_extensions: Vec<u16>,
/// Channel id factory
pub channel_id_factory: Arc<AtomicU32>,
/// Extended Channels
pub extended_channels:
SharedMap<ChannelId, ExtendedChannel<'static, DefaultJobStore<ExtendedJob<'static>>>>,
/// Standard Channels
pub standard_channels:
SharedMap<ChannelId, StandardChannel<'static, DefaultJobStore<StandardJob<'static>>>>,
/// Group Channel
pub group_channel: Shared<GroupChannel<'static, DefaultJobStore<ExtendedJob<'static>>>>,
/// Extensions that have been successfully negotiated with this client
pub negotiated_extensions: Shared<Vec<u16>>,
pub payout_mode: Shared<Option<PayoutMode>>,
}

#[cfg_attr(not(test), hotpath::measure_all)]
Expand Down Expand Up @@ -141,23 +129,19 @@ impl Downstream {
connection_token,
};

let downstream_data = Arc::new(Mutex::new(DownstreamData {
extended_channels: HashMap::new(),
standard_channels: HashMap::new(),
group_channel,
channel_id_factory,
negotiated_extensions: vec![],
payout_mode: None,
}));

Downstream {
downstream_channel,
downstream_data,
downstream_id,
requires_standard_jobs: Arc::new(AtomicBool::new(false)),
requires_custom_work: Arc::new(AtomicBool::new(false)),
channel_id_factory: Arc::new(channel_id_factory),
extended_channels: SharedMap::new(),
standard_channels: SharedMap::new(),
group_channel: Shared::new(group_channel),
negotiated_extensions: Shared::new(vec![]),
supported_extensions,
required_extensions,
payout_mode: Shared::new(None),
}
}

Expand Down Expand Up @@ -310,9 +294,7 @@ impl Downstream {
match protocol_message_type(header.ext_type(), header.msg_type()) {
MessageType::Mining => {
debug!("Received mining SV2 frame from downstream.");
let negotiated_extensions = self
.downstream_data
.super_safe_lock(|data| data.negotiated_extensions.clone());
let negotiated_extensions = self.negotiated_extensions.get();
let (any_message, tlv_fields) = parse_message_frame_with_tlvs(
header,
sv2_frame.payload(),
Expand Down
141 changes: 66 additions & 75 deletions pool-apps/pool/src/lib/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,98 +11,89 @@ use crate::{channel_manager::ChannelManager, downstream::Downstream};

/// Helper to convert a Downstream to Sv2ClientInfo.
/// Returns None if the lock cannot be acquired (graceful degradation for monitoring).
fn downstream_to_sv2_client_info(client: &Downstream) -> Option<Sv2ClientInfo> {
client
.downstream_data
.safe_lock(|dd| {
let mut extended_channels = Vec::new();
let mut standard_channels = Vec::new();
fn downstream_to_sv2_client_info(client: &Downstream) -> Sv2ClientInfo {
let mut extended_channels = Vec::new();
let mut standard_channels = Vec::new();

for (_channel_id, extended_channel) in dd.extended_channels.iter() {
let channel_id = extended_channel.get_channel_id();
let target = extended_channel.get_target();
let requested_max_target = extended_channel.get_requested_max_target();
let user_identity = extended_channel.get_user_identity();
let share_accounting = extended_channel.get_share_accounting();
client
.extended_channels
.for_each(|channel_id, extended_channel| {
let target = extended_channel.get_target();
let requested_max_target = extended_channel.get_requested_max_target();
let user_identity = extended_channel.get_user_identity();
let share_accounting = extended_channel.get_share_accounting();

extended_channels.push(ExtendedChannelInfo {
channel_id,
user_identity: user_identity.clone(),
nominal_hashrate: extended_channel.get_nominal_hashrate(),
target_hex: hex::encode(target.to_be_bytes()),
requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()),
extranonce_prefix_hex: hex::encode(extended_channel.get_extranonce_prefix()),
full_extranonce_size: extended_channel.get_full_extranonce_size(),
rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(),
expected_shares_per_minute: extended_channel.get_shares_per_minute(),
shares_accepted: share_accounting.get_shares_accepted(),
share_work_sum: share_accounting.get_share_work_sum(),
last_share_sequence_number: share_accounting.get_last_share_sequence_number(),
best_diff: share_accounting.get_best_diff(),
last_batch_accepted: share_accounting.get_last_batch_accepted(),
last_batch_work_sum: share_accounting.get_last_batch_work_sum(),
share_batch_size: share_accounting.get_share_batch_size(),
blocks_found: share_accounting.get_blocks_found(),
});
}
extended_channels.push(ExtendedChannelInfo {
channel_id,
user_identity: user_identity.clone(),
nominal_hashrate: extended_channel.get_nominal_hashrate(),
target_hex: hex::encode(target.to_be_bytes()),
requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()),
extranonce_prefix_hex: hex::encode(extended_channel.get_extranonce_prefix()),
full_extranonce_size: extended_channel.get_full_extranonce_size(),
rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(),
expected_shares_per_minute: extended_channel.get_shares_per_minute(),
shares_accepted: share_accounting.get_shares_accepted(),
share_work_sum: share_accounting.get_share_work_sum(),
last_share_sequence_number: share_accounting.get_last_share_sequence_number(),
best_diff: share_accounting.get_best_diff(),
last_batch_accepted: share_accounting.get_last_batch_accepted(),
last_batch_work_sum: share_accounting.get_last_batch_work_sum(),
share_batch_size: share_accounting.get_share_batch_size(),
blocks_found: share_accounting.get_blocks_found(),
});
});

for (_channel_id, standard_channel) in dd.standard_channels.iter() {
let channel_id = standard_channel.get_channel_id();
let target = standard_channel.get_target();
let requested_max_target = standard_channel.get_requested_max_target();
let user_identity = standard_channel.get_user_identity();
let share_accounting = standard_channel.get_share_accounting();
client
.standard_channels
.for_each(|channel_id, standard_channel| {
let target = standard_channel.get_target();
let requested_max_target = standard_channel.get_requested_max_target();
let user_identity = standard_channel.get_user_identity();
let share_accounting = standard_channel.get_share_accounting();

standard_channels.push(StandardChannelInfo {
channel_id,
user_identity: user_identity.clone(),
nominal_hashrate: standard_channel.get_nominal_hashrate(),
target_hex: hex::encode(target.to_be_bytes()),
requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()),
extranonce_prefix_hex: hex::encode(standard_channel.get_extranonce_prefix()),
expected_shares_per_minute: standard_channel.get_shares_per_minute(),
shares_accepted: share_accounting.get_shares_accepted(),
share_work_sum: share_accounting.get_share_work_sum(),
last_share_sequence_number: share_accounting.get_last_share_sequence_number(),
best_diff: share_accounting.get_best_diff(),
last_batch_accepted: share_accounting.get_last_batch_accepted(),
last_batch_work_sum: share_accounting.get_last_batch_work_sum(),
share_batch_size: share_accounting.get_share_batch_size(),
blocks_found: share_accounting.get_blocks_found(),
});
}
standard_channels.push(StandardChannelInfo {
channel_id,
user_identity: user_identity.clone(),
nominal_hashrate: standard_channel.get_nominal_hashrate(),
target_hex: hex::encode(target.to_be_bytes()),
requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()),
extranonce_prefix_hex: hex::encode(standard_channel.get_extranonce_prefix()),
expected_shares_per_minute: standard_channel.get_shares_per_minute(),
shares_accepted: share_accounting.get_shares_accepted(),
share_work_sum: share_accounting.get_share_work_sum(),
last_share_sequence_number: share_accounting.get_last_share_sequence_number(),
best_diff: share_accounting.get_best_diff(),
last_batch_accepted: share_accounting.get_last_batch_accepted(),
last_batch_work_sum: share_accounting.get_last_batch_work_sum(),
share_batch_size: share_accounting.get_share_batch_size(),
blocks_found: share_accounting.get_blocks_found(),
});
});

Sv2ClientInfo {
client_id: client.downstream_id,
extended_channels,
standard_channels,
}
})
.ok()
Sv2ClientInfo {
client_id: client.downstream_id,
extended_channels,
standard_channels,
}
}

impl Sv2ClientsMonitoring for ChannelManager {
fn get_sv2_clients(&self) -> Vec<Sv2ClientInfo> {
// Clone Downstream references and release lock immediately to avoid contention
// with template distribution and message handling
let downstream_refs: Vec<Downstream> = self
.channel_manager_data
.safe_lock(|data| data.downstream.values().cloned().collect())
.unwrap_or_default();
let mut downstream_refs: Vec<Downstream> = Vec::new();
self.downstream
.for_each(|_id, downstream| downstream_refs.push(downstream.clone()));

downstream_refs
.iter()
.filter_map(downstream_to_sv2_client_info)
.map(downstream_to_sv2_client_info)
.collect()
}

fn get_sv2_client_by_id(&self, client_id: usize) -> Option<Sv2ClientInfo> {
self.channel_manager_data
.safe_lock(|d| {
d.downstream
.get(&client_id)
.and_then(downstream_to_sv2_client_info)
})
.unwrap_or(None)
self.downstream
.with(&client_id, downstream_to_sv2_client_info)
}
}
1 change: 1 addition & 0 deletions stratum-apps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ utoipa-swagger-ui = { version = "9.0.2", features = ["axum"], optional = true }
# Common external dependencies that roles always need
ext-config = { version = "0.14.0", features = ["toml"], package = "config" }
shellexpand = "3.1.1"
dashmap = "6.1.0"

[features]
default = ["network", "config", "std"]
Expand Down
3 changes: 3 additions & 0 deletions stratum-apps/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,6 @@ pub mod coinbase_output_constraints;

/// Fallback coordinator
pub mod fallback_coordinator;

/// Synchronous and shared data structure wrapper
pub mod shared;
Loading
Loading