From 156ad471b5256a85fc34f915277730b206f9b0a8 Mon Sep 17 00:00:00 2001 From: Charlie Little Date: Wed, 10 Jul 2024 05:43:46 -0500 Subject: [PATCH 1/3] Update mempool transaction scanning --- src/bin/nomic.rs | 3 + src/bitcoin/deposit_index.rs | 2 +- src/bitcoin/relayer.rs | 124 ++++++++++++++++++++--------------- tests/bitcoin.rs | 31 +++++++-- 4 files changed, 99 insertions(+), 61 deletions(-) diff --git a/src/bin/nomic.rs b/src/bin/nomic.rs index 60685bc79..4a1e679be 100644 --- a/src/bin/nomic.rs +++ b/src/bin/nomic.rs @@ -15,6 +15,7 @@ use nomic::app::IbcDest; use nomic::app::InnerApp; use nomic::app::Nom; use nomic::bitcoin::adapter::Adapter; +use nomic::bitcoin::deposit_index::DepositIndex; use nomic::bitcoin::signatory::SignatorySet; use nomic::bitcoin::Nbtc; use nomic::bitcoin::{relayer::Relayer, signer::Signer}; @@ -39,9 +40,11 @@ use std::fs::Permissions; use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; use std::str::FromStr; +use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; use tendermint_rpc::Client as _; +use tokio::sync::Mutex; const BANNER: &str = r#" ███╗ ██╗ ██████╗ ███╗ ███╗ ██╗ ██████╗ diff --git a/src/bitcoin/deposit_index.rs b/src/bitcoin/deposit_index.rs index bcf653cee..17c8ec1b9 100644 --- a/src/bitcoin/deposit_index.rs +++ b/src/bitcoin/deposit_index.rs @@ -30,7 +30,7 @@ pub struct DepositInfo { type ReceiverIndex = HashMap>>; -#[derive(Default)] +#[derive(Default, Debug)] pub struct DepositIndex { pub receiver_index: ReceiverIndex, } diff --git a/src/bitcoin/relayer.rs b/src/bitcoin/relayer.rs index 3fa8bd80b..c82fefbbd 100644 --- a/src/bitcoin/relayer.rs +++ b/src/bitcoin/relayer.rs @@ -1,3 +1,4 @@ +#![feature(async_closure)] use super::signatory::Signatory; use super::SignatorySet; use super::SIGSET_THRESHOLD; @@ -10,6 +11,7 @@ use crate::error::Result; use crate::orga::encoding::Encode; use crate::utils::time_now; use bitcoin::consensus::{Decodable, Encodable}; +use bitcoin::TxOut; use bitcoin::Txid; use bitcoin::{hashes::Hash, Block, BlockHash, Transaction}; use bitcoincore_rpc_async::{json::GetBlockHeaderResult, Client as BitcoinRpcClient, RpcApi}; @@ -45,7 +47,7 @@ pub struct Relayer { btc_client: Arc>, app_client_addr: String, - scripts: Option, + scripts: Arc>>, deposit_buffer: Option, } @@ -54,7 +56,7 @@ impl Relayer { Relayer { btc_client: Arc::new(RwLock::new(btc_client)), app_client_addr, - scripts: None, + scripts: Arc::new(Mutex::new(None)), deposit_buffer: None, } } @@ -120,12 +122,12 @@ impl Relayer { ) -> Result<()> { info!("Starting deposit relay..."); + let index = Arc::new(Mutex::new(DepositIndex::new())); let scripts = WatchedScriptStore::open(store_path, &self.app_client_addr).await?; - self.scripts = Some(scripts); + self.scripts = Arc::new(Mutex::new(Some(scripts))); self.deposit_buffer = Some(deposit_buffer); - let index = Arc::new(Mutex::new(DepositIndex::new())); let (server, mut recv) = self.create_address_server(index.clone())?; let deposit_relay = async { @@ -138,7 +140,24 @@ impl Relayer { } }; - join!(server, deposit_relay); + let mut seen_mempool_txids = HashSet::new(); + + let mempool_relay = async { + loop { + if let Err(e) = self + .scan_for_mempool_deposits(index.clone(), &mut seen_mempool_txids) + .await + { + if !e.to_string().contains("No completed checkpoints yet") { + error!("Mempool deposit relay error: {}", e); + } + } + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + } + }; + + join!(server, deposit_relay, mempool_relay); Ok(()) } @@ -333,18 +352,14 @@ impl Relayer { } async fn relay_deposits( - &mut self, + &self, recv: &mut Receiver<(Dest, u32)>, index: Arc>, ) -> Result { let mut prev_tip = None; - let mut seen_mempool_txids = HashSet::new(); + let mut seen_mempool_txids: HashSet = HashSet::new(); loop { - self.scan_for_mempool_deposits(index.clone(), &mut seen_mempool_txids) - .await?; - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - self.insert_announced_addrs(recv).await?; let tip = self.sidechain_block_hash().await?; @@ -369,7 +384,7 @@ impl Relayer { } async fn scan_for_deposits( - &mut self, + &self, num_blocks: usize, index: Arc>, ) -> Result { @@ -384,7 +399,7 @@ impl Relayer { for (i, block) in blocks.into_iter().enumerate().rev() { let height = (base_height - i) as u32; - for (tx, matches) in self.relevant_txs(&block) { + for (tx, matches) in self.relevant_txs(&block).await? { for output in matches { if let Err(err) = self .maybe_relay_deposit(tx, height, &block.block_hash(), output, index.clone()) @@ -421,12 +436,12 @@ impl Relayer { let mut script_bytes = vec![]; output.script_pubkey.consensus_encode(&mut script_bytes)?; let script = ::bitcoin::Script::consensus_decode(&mut script_bytes.as_slice())?; - - if self.scripts.is_none() { + let script_guard = self.scripts.lock().await; + if script_guard.is_none() { return Ok(()); } - if let Some((dest, _)) = self.scripts.as_ref().unwrap().scripts.get(&script) { + if let Some((dest, _)) = script_guard.as_ref().unwrap().scripts.get(&script) { let bitcoin_address = bitcoin::Address::from_script( &output.script_pubkey.clone(), super::NETWORK, @@ -575,7 +590,7 @@ impl Relayer { info!("Starting recovery tx relay..."); let scripts = WatchedScriptStore::open(store_path, &self.app_client_addr).await?; - self.scripts = Some(scripts); + self.scripts = Arc::new(Mutex::new(Some(scripts))); loop { if let Err(e) = self.relay_recovery_txs().await { @@ -685,9 +700,9 @@ impl Relayer { if err.to_string().contains("No completed checkpoints yet") { tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; continue; - } else { - return Err(err.into()); } + + return Err(err.into()); } } }; @@ -767,7 +782,7 @@ impl Relayer { Ok(None) } - async fn insert_announced_addrs(&mut self, recv: &mut Receiver<(Dest, u32)>) -> Result<()> { + async fn insert_announced_addrs(&self, recv: &mut Receiver<(Dest, u32)>) -> Result<()> { while let Ok((addr, sigset_index)) = recv.try_recv() { let sigset_res = app_client(&self.app_client_addr) .query(|app| Ok(app.bitcoin.checkpoints.get(sigset_index)?.sigset.clone())) @@ -779,15 +794,15 @@ impl Relayer { continue; } }; - - self.scripts.as_mut().unwrap().insert(addr, &sigset)?; + let mut script_guard = self.scripts.lock().await; + script_guard.as_mut().unwrap().insert(addr, &sigset)?; } let max_age = app_client(&self.app_client_addr) .query(|app| Ok(app.bitcoin.checkpoints.config.max_age)) .await?; - - self.scripts + let mut script_guard = self.scripts.lock().await; + script_guard .as_mut() .unwrap() .scripts @@ -815,43 +830,44 @@ impl Relayer { Ok(blocks) } - pub fn relevant_txs<'a>( + pub async fn relevant_txs<'a>( &'a self, block: &'a Block, - ) -> impl Iterator + 'a)> + 'a { - block - .txdata - .iter() - .map(move |tx| (tx, self.relevant_outputs(tx))) + ) -> Result + 'a)> + 'a> + { + let mut txs = Vec::new(); + for tx in block.txdata.iter() { + txs.push((tx, self.relevant_outputs(tx).await?)); + } + + Ok(txs.into_iter()) } - pub fn relevant_outputs<'a>( + pub async fn relevant_outputs<'a>( &'a self, tx: &'a Transaction, - ) -> impl Iterator + 'a { - tx.output - .iter() - .enumerate() - .filter_map(move |(vout, output)| { - let mut script_bytes = vec![]; - output - .script_pubkey - .consensus_encode(&mut script_bytes) - .unwrap(); - let script = - ::bitcoin::Script::consensus_decode(&mut script_bytes.as_slice()).unwrap(); + ) -> Result + 'a> { + let mut matches = Vec::new(); + for (vout, output) in tx.output.iter().enumerate() { + let mut script_bytes = vec![]; + let encode: usize = output + .script_pubkey + .consensus_encode(&mut script_bytes) + .unwrap(); + let script = ::bitcoin::Script::consensus_decode(&mut script_bytes.as_slice()).unwrap(); + + let script_guard = self.scripts.lock().await; + if let Some((dest, sigset_index)) = script_guard.as_ref().unwrap().scripts.get(&script) + { + matches.push(OutputMatch { + sigset_index, + vout: vout as u32, + dest, + }); + } + } - self.scripts - .as_ref() - .unwrap() - .scripts - .get(&script) - .map(|(dest, sigset_index)| OutputMatch { - sigset_index, - vout: vout as u32, - dest, - }) - }) + Ok(matches.into_iter()) } async fn maybe_relay_deposit( diff --git a/tests/bitcoin.rs b/tests/bitcoin.rs index 75dc84d76..3761a8817 100644 --- a/tests/bitcoin.rs +++ b/tests/bitcoin.rs @@ -16,6 +16,7 @@ use nomic::app::{InnerApp, Nom}; use nomic::bitcoin::adapter::Adapter; use nomic::bitcoin::checkpoint::CheckpointStatus; use nomic::bitcoin::checkpoint::Config as CheckpointConfig; +use nomic::bitcoin::deposit_index::DepositIndex; use nomic::bitcoin::deposit_index::{Deposit, DepositInfo}; use nomic::bitcoin::header_queue::Config as HeaderQueueConfig; use nomic::bitcoin::relayer::DepositAddress; @@ -46,10 +47,12 @@ use serial_test::serial; use std::collections::HashMap; use std::fs; use std::str::FromStr; +use std::sync::Arc; use std::sync::Once; use std::time::Duration; use tempfile::tempdir; use tokio::sync::mpsc; +use tokio::sync::Mutex; static INIT: Once = Once::new(); @@ -1048,13 +1051,29 @@ async fn pending_deposits() { poll_for_active_sigset().await; poll_for_signatory_key(consensus_key).await; - deposit_bitcoin( - &funded_accounts[0].address, - bitcoin::Amount::from_btc(10.0).unwrap(), - &wallet, + let deposit_address = generate_deposit_address(&funded_accounts[0].address) + .await + .unwrap(); + broadcast_deposit_addr( + funded_accounts[0].address.to_string(), + deposit_address.sigset_index, + "http://localhost:8999".to_string(), + deposit_address.deposit_addr.clone(), ) - .await - .unwrap(); + .await?; + tokio::time::sleep(Duration::from_secs(10)).await; + &wallet + .send_to_address( + &bitcoin::Address::from_str(&deposit_address.deposit_addr).unwrap(), + bitcoin::Amount::from_btc(10.0).unwrap(), + None, + None, + None, + None, + None, + None, + ) + .unwrap(); let expected_balance = 0; let balance = poll_for_updated_balance(funded_accounts[0].address, expected_balance).await; From e46fa08d20586c686e2abfa37c97f6535021d9fe Mon Sep 17 00:00:00 2001 From: Charlie Little Date: Wed, 10 Jul 2024 05:45:55 -0500 Subject: [PATCH 2/3] Update recovered expired deposit test --- tests/bitcoin.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/bitcoin.rs b/tests/bitcoin.rs index 3761a8817..dc816fa52 100644 --- a/tests/bitcoin.rs +++ b/tests/bitcoin.rs @@ -1719,7 +1719,7 @@ async fn recover_expired_deposit() { poll_for_bitcoin_header(1185).await.unwrap(); poll_for_completed_checkpoint(3).await; - let expected_balance = 39595129200000; + let expected_balance = 39596871600000; let balance = poll_for_updated_balance(funded_accounts[1].address, expected_balance).await; assert_eq!(balance, Amount::from(expected_balance)); From c237f9ea78f4274833656b060149ef199b7f1c48 Mon Sep 17 00:00:00 2001 From: Charlie Little Date: Tue, 27 Aug 2024 16:05:45 -0500 Subject: [PATCH 3/3] Atomically write deposit addrs to script store --- src/bitcoin/relayer.rs | 59 +++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/src/bitcoin/relayer.rs b/src/bitcoin/relayer.rs index c82fefbbd..ab8fc9a65 100644 --- a/src/bitcoin/relayer.rs +++ b/src/bitcoin/relayer.rs @@ -128,11 +128,11 @@ impl Relayer { self.deposit_buffer = Some(deposit_buffer); - let (server, mut recv) = self.create_address_server(index.clone())?; + let server = self.create_address_server(index.clone())?; let deposit_relay = async { loop { - if let Err(e) = self.relay_deposits(&mut recv, index.clone()).await { + if let Err(e) = self.relay_deposits(index.clone()).await { error!("Deposit relay error: {}", e); } @@ -164,9 +164,7 @@ impl Relayer { fn create_address_server( &self, index: Arc>, - ) -> Result<(impl Future, Receiver<(Dest, u32)>)> { - let (send, recv) = tokio::sync::mpsc::channel(1024); - + ) -> Result> { let sigsets = Arc::new(Mutex::new(BTreeMap::new())); // TODO: pass into closures more cleanly @@ -178,6 +176,7 @@ impl Relayer { None => return Err(Error::Relayer("Deposit buffer not set".to_string())), }; + let watched_script_store = self.scripts.clone(); // TODO: configurable listen address use bytes::Bytes; use warp::Filter; @@ -185,13 +184,15 @@ impl Relayer { .and(warp::path("address")) .and(warp::query::()) .and(warp::filters::body::bytes()) - .map(move |query: DepositAddress, body| (query, send.clone(), sigsets.clone(), body)) + .map(move |query: DepositAddress, body| { + (query, sigsets.clone(), body, watched_script_store.clone()) + }) .and_then( - move |(query, send, sigsets, body): ( + move |(query, sigsets, body, watched_script_store): ( DepositAddress, - tokio::sync::mpsc::Sender<_>, Arc>>, Bytes, + Arc>, )| { async move { let dest = Dest::decode(body.to_vec().as_slice()) @@ -239,21 +240,28 @@ impl Relayer { dest, sigset.create_time, query.sigset_index, - send, + watched_script_store, + sigset.clone(), )) } }, ) .and_then( - move |(dest, create_time, sigset_index, send): ( + move |(dest, create_time, sigset_index, watched_script_store, sigset): ( Dest, u64, u32, - tokio::sync::mpsc::Sender<_>, + Arc>>, + SignatorySet, )| { async move { debug!("Received deposit commitment: {:?}, {}", dest, sigset_index); - send.send((dest, sigset_index)).await.unwrap(); + let mut script_guard = watched_script_store.lock().await; + script_guard + .as_mut() + .unwrap() + .insert(dest, &sigset) + .map_err(|e| warp::reject::custom(Error::from(e)))?; let max_deposit_age = app_client(app_client_addr) .query(|app| Ok(app.bitcoin.config.max_deposit_age)) .await @@ -348,19 +356,15 @@ impl Relayer { ), ) .run(([0, 0, 0, 0], 8999)); - Ok((server, recv)) + Ok(server) } - async fn relay_deposits( - &self, - recv: &mut Receiver<(Dest, u32)>, - index: Arc>, - ) -> Result { + async fn relay_deposits(&self, index: Arc>) -> Result { let mut prev_tip = None; let mut seen_mempool_txids: HashSet = HashSet::new(); loop { - self.insert_announced_addrs(recv).await?; + self.remove_expired().await?; let tip = self.sidechain_block_hash().await?; let prev = prev_tip.unwrap_or(tip); @@ -782,22 +786,7 @@ impl Relayer { Ok(None) } - async fn insert_announced_addrs(&self, recv: &mut Receiver<(Dest, u32)>) -> Result<()> { - while let Ok((addr, sigset_index)) = recv.try_recv() { - let sigset_res = app_client(&self.app_client_addr) - .query(|app| Ok(app.bitcoin.checkpoints.get(sigset_index)?.sigset.clone())) - .await; - let sigset = match sigset_res { - Ok(sigset) => sigset, - Err(err) => { - error!("{}", err); - continue; - } - }; - let mut script_guard = self.scripts.lock().await; - script_guard.as_mut().unwrap().insert(addr, &sigset)?; - } - + async fn remove_expired(&self) -> Result<()> { let max_age = app_client(&self.app_client_addr) .query(|app| Ok(app.bitcoin.checkpoints.config.max_age)) .await?;