Skip to content

Commit 35a852a

Browse files
authored
Merge branch 'main' into feat/coinbase-rotation
2 parents 28ef713 + 5905549 commit 35a852a

File tree

2 files changed

+226
-43
lines changed

2 files changed

+226
-43
lines changed

integration-tests/tests/translator_integration.rs

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,3 +1613,184 @@ async fn translator_does_not_shutdown_on_missing_downstream_channel() {
16131613

16141614
assert!(TcpListener::bind(tproxy_addr).await.is_err());
16151615
}
1616+
1617+
/// This test verifies that in aggregated mode, a new downstream connection that arrives
/// between a future NewExtendedMiningJob and its corresponding SetNewPrevHash will correctly
/// receive the future job and be able to submit shares after SetNewPrevHash activates the job.
///
/// This is a regression test for the "Failed to set new prev hash: JobIdNotFound" error
/// that occurred when new downstream channels were created while a future job was pending.
///
/// Flow: mock upstream <-> SV2 sniffer <-> translator (aggregated) <-> SV1 sniffers <-> minerd.
/// The second miner is deliberately attached in the window between the future job and the
/// SetNewPrevHash that activates it.
///
/// See: https://github.com/stratum-mining/sv2-apps/issues/223
#[tokio::test]
async fn aggregated_translator_handles_downstream_connecting_during_future_job() {
    start_tracing();

    // Spin up the mock SV2 upstream; `send_to_tproxy` lets the test inject
    // upstream->translator messages on demand.
    let mock_upstream_addr = get_available_address();
    let mock_upstream = MockUpstream::new(mock_upstream_addr);
    let send_to_tproxy = mock_upstream.start().await;

    // ignore SubmitSharesSuccess messages to simplify the test flow
    let ignore_submit_shares_success = IgnoreMessage::new(
        MessageDirection::ToDownstream,
        MESSAGE_TYPE_SUBMIT_SHARES_SUCCESS,
    );
    // SV2 sniffer sits between the translator and the mock upstream so the test
    // can observe and wait on protocol messages in both directions.
    let (sniffer, sniffer_addr) = start_sniffer(
        "future_job_test",
        mock_upstream_addr,
        false,
        vec![ignore_submit_shares_success.into()],
        None,
    );

    // Start translator in aggregated mode
    let (_tproxy, tproxy_addr) =
        start_sv2_translator(&[sniffer_addr], true, vec![], vec![], None).await;

    // The translator opens the SV2 connection; wait for its SetupConnection
    // before replying, and drop any earlier queued messages.
    sniffer
        .wait_for_message_type_and_clean_queue(
            MessageDirection::ToUpstream,
            MESSAGE_TYPE_SETUP_CONNECTION,
        )
        .await;

    // Complete the SV2 handshake on behalf of the mock upstream.
    let setup_connection_success = AnyMessage::Common(CommonMessages::SetupConnectionSuccess(
        SetupConnectionSuccess {
            used_version: 2,
            flags: 0,
        },
    ));
    send_to_tproxy.send(setup_connection_success).await.unwrap();

    // Keep references to minerd processes and SV1 sniffers so they don't get dropped
    let mut minerd_vec = Vec::new();

    // Start SV1 sniffer for the first miner
    let (sv1_sniffer_1, sv1_sniffer_addr_1) = start_sv1_sniffer(tproxy_addr);

    // Start the first minerd (through SV1 sniffer) to trigger OpenExtendedMiningChannel
    let (minerd_process_1, _minerd_addr_1) =
        start_minerd(sv1_sniffer_addr_1, None, None, false).await;
    minerd_vec.push(minerd_process_1);

    sniffer
        .wait_for_message_type(
            MessageDirection::ToUpstream,
            MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL,
        )
        .await;

    // Drain the sniffer's downstream queue until the OpenExtendedMiningChannel
    // message itself is found; we need its request_id to craft the success reply.
    let open_extended_mining_channel: OpenExtendedMiningChannel = loop {
        match sniffer.next_message_from_downstream() {
            Some((_, AnyMessage::Mining(parsers_sv2::Mining::OpenExtendedMiningChannel(msg)))) => {
                break msg;
            }
            _ => continue,
        };
    };

    // Send OpenExtendedMiningChannelSuccess for the aggregated channel
    let open_extended_mining_channel_success = AnyMessage::Mining(
        parsers_sv2::Mining::OpenExtendedMiningChannelSuccess(OpenExtendedMiningChannelSuccess {
            request_id: open_extended_mining_channel.request_id,
            channel_id: 2, // aggregated channel ID
            target: hex::decode("0000137c578190689425e3ecf8449a1af39db0aed305d9206f45ac32fe8330fc")
                .unwrap()
                .try_into()
                .unwrap(),
            // full extranonce has a total of 12 bytes
            extranonce_size: 8,
            extranonce_prefix: vec![0x00, 0x01, 0x00, 0x00].try_into().unwrap(),
            group_channel_id: 1,
        }),
    );
    send_to_tproxy
        .send(open_extended_mining_channel_success)
        .await
        .unwrap();

    // Wait until the translator has relayed the channel success downstream
    // before introducing the job.
    sniffer
        .wait_for_message_type_and_clean_queue(
            MessageDirection::ToDownstream,
            MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL_SUCCESS,
        )
        .await;

    // Send a FUTURE job (min_ntime: None) - this job is not active yet!
    let future_job = AnyMessage::Mining(parsers_sv2::Mining::NewExtendedMiningJob(
        NewExtendedMiningJob {
            channel_id: 2,
            job_id: 1,
            min_ntime: Sv2Option::new(None), // This makes it a future job!
            version: 0x20000000,
            version_rolling_allowed: true,
            merkle_path: Seq0255::new(vec![]).unwrap(),
            coinbase_tx_prefix: hex::decode("02000000010000000000000000000000000000000000000000000000000000000000000000ffffffff265200162f5374726174756d2056322053524920506f6f6c2f2f0c").unwrap().try_into().unwrap(),
            coinbase_tx_suffix: hex::decode("feffffff0200f2052a01000000160014ebe1b7dcc293ccaa0ee743a86f89df8258c208fc0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf901000000").unwrap().try_into().unwrap(),
        },
    ));

    send_to_tproxy.send(future_job).await.unwrap();
    sniffer
        .wait_for_message_type_and_clean_queue(
            MessageDirection::ToDownstream,
            MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB,
        )
        .await;

    // CRITICAL: Start a SECOND minerd BEFORE sending SetNewPrevHash
    // This is the race condition we're testing - the new downstream connects
    // while a future job is pending but not yet activated

    // Start SV1 sniffer for the second miner
    let (sv1_sniffer_2, sv1_sniffer_addr_2) = start_sv1_sniffer(tproxy_addr);

    let (minerd_process_2, _minerd_addr_2) =
        start_minerd(sv1_sniffer_addr_2, None, None, false).await;
    minerd_vec.push(minerd_process_2);

    // Give time for the second minerd to connect and the channel to be created
    // NOTE(review): fixed sleep — presumably sufficient on CI, but inherently racy;
    // a wait-on-event would be more robust if the helpers support it.
    tokio::time::sleep(Duration::from_millis(1000)).await;

    // Now send SetNewPrevHash to activate the future job
    // Without the fix, this would cause "Failed to set new prev hash: JobIdNotFound"
    // because the second downstream's channel wouldn't have the future job
    let set_new_prev_hash =
        AnyMessage::Mining(parsers_sv2::Mining::SetNewPrevHash(SetNewPrevHash {
            channel_id: 2,
            job_id: 1,
            prev_hash: hex::decode(
                "3ab7089cd2cd30f133552cfde82c4cb239cd3c2310306f9d825e088a1772cc39",
            )
            .unwrap()
            .try_into()
            .unwrap(),
            min_ntime: 1766782170,
            nbits: 0x207fffff,
        }));

    send_to_tproxy.send(set_new_prev_hash).await.unwrap();
    sniffer
        .wait_for_message_type_and_clean_queue(
            MessageDirection::ToDownstream,
            MESSAGE_TYPE_MINING_SET_NEW_PREV_HASH,
        )
        .await;

    // Verify BOTH miners receive the mining.notify message
    sv1_sniffer_1
        .wait_for_message(&["mining.notify"], MessageDirection::ToDownstream)
        .await;
    sv1_sniffer_2
        .wait_for_message(&["mining.notify"], MessageDirection::ToDownstream)
        .await;

    // Verify BOTH miners submit shares (mining.submit)
    // This proves both miners are working correctly after the future job was activated
    sv1_sniffer_1
        .wait_for_message(&["mining.submit"], MessageDirection::ToUpstream)
        .await;
    sv1_sniffer_2
        .wait_for_message(&["mining.submit"], MessageDirection::ToUpstream)
        .await;
}

miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs

Lines changed: 45 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -360,60 +360,62 @@ impl ChannelManager {
360360
);
361361
TproxyError::shutdown(TproxyErrorKind::ChannelErrorSender)
362362
})?;
363-
// get the last active job from the upstream extended channel
364-
let last_active_job =
363+
// Initialize the new downstream channel with state from upstream:
364+
// chain tip, active job, and any pending future jobs.
365+
let active_job_for_sv1_server =
365366
self.channel_manager_data.super_safe_lock(|c| {
366-
c.upstream_extended_channel
367+
let (last_active_job, future_jobs, last_chain_tip) = c
368+
.upstream_extended_channel
367369
.as_ref()
368370
.and_then(|ch| ch.read().ok())
369-
.and_then(|ch| ch.get_active_job().map(|j| j.0.clone()))
370-
});
371+
.map(|ch| {
372+
let active =
373+
ch.get_active_job().map(|j| j.0.clone());
374+
let futures = ch
375+
.get_future_jobs()
376+
.values()
377+
.map(|j| j.0.clone())
378+
.collect::<Vec<_>>();
379+
let chain_tip = ch.get_chain_tip().cloned();
380+
(active, futures, chain_tip)
381+
})?;
382+
383+
let channel = c.extended_channels.get(&next_channel_id)?;
384+
let mut channel = channel.write().ok()?;
385+
386+
if let Some(chain_tip) = last_chain_tip {
387+
channel.set_chain_tip(chain_tip);
388+
}
371389

372-
// get the last chain tip from the upstream extended channel
373-
let last_chain_tip =
374-
self.channel_manager_data.super_safe_lock(|c| {
375-
c.upstream_extended_channel
376-
.as_ref()
377-
.and_then(|ch| ch.read().ok())
378-
.and_then(|ch| ch.get_chain_tip().cloned())
379-
});
380-
// update the downstream channel with the active job and the chain
381-
// tip
382-
if let Some(mut job) = last_active_job {
383-
if let Some(last_chain_tip) = last_chain_tip {
384-
// update the downstream channel with the active chain tip
385-
self.channel_manager_data.super_safe_lock(|c| {
386-
if let Some(ch) =
387-
c.extended_channels.get(&next_channel_id)
388-
{
389-
ch.write()
390-
.unwrap()
391-
.set_chain_tip(last_chain_tip.clone());
392-
}
393-
});
394-
}
395-
job.channel_id = next_channel_id;
396-
// update the downstream channel with the active job
397-
self.channel_manager_data.super_safe_lock(|c| {
398-
if let Some(ch) = c.extended_channels.get(&next_channel_id)
399-
{
400-
let _ = ch
401-
.write()
402-
.unwrap()
403-
.on_new_extended_mining_job(job.clone());
390+
if let Some(mut job) = last_active_job.clone() {
391+
job.channel_id = next_channel_id;
392+
let _ = channel.on_new_extended_mining_job(job);
404393
}
405-
});
406394

407-
// set the channel id to the aggregated channel id
408-
// before sending the message to the SV1Server
409-
job.channel_id = AGGREGATED_CHANNEL_ID;
395+
// Also add any future jobs so SetNewPrevHash won't fail
396+
for mut future_job in future_jobs {
397+
future_job.channel_id = next_channel_id;
398+
let _ = channel.on_new_extended_mining_job(future_job);
399+
}
400+
401+
// set the channel id to the aggregated channel id
402+
// before sending the message to the Sv1Server
403+
last_active_job.map(|mut job| {
404+
job.channel_id = AGGREGATED_CHANNEL_ID;
405+
job
406+
})
407+
});
410408

409+
if let Some(job) = active_job_for_sv1_server {
411410
self.channel_state
412411
.sv1_server_sender
413-
.send((Mining::NewExtendedMiningJob(job.clone()), None))
412+
.send((Mining::NewExtendedMiningJob(job), None))
414413
.await
415414
.map_err(|e| {
416-
error!("Failed to send last new extended mining job to SV1Server: {:?}", e);
415+
error!(
416+
"Failed to send active extended mining job to Sv1Server: {:?}",
417+
e
418+
);
417419
TproxyError::shutdown(TproxyErrorKind::ChannelErrorSender)
418420
})?;
419421
}

0 commit comments

Comments
 (0)