Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 41 additions & 44 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,12 @@ pub(crate) struct ConnectionState {

pub(crate) struct ThreadPool {
pub(crate) threads: Mutex<Vec<JoinHandle<()>>>,
pub(crate) port: u16,
}

impl ThreadPool {
pub(crate) fn new(port: u16) -> Self {
pub(crate) fn new() -> Self {
Self {
threads: Mutex::new(Vec::new()),
port,
}
}

Expand All @@ -196,31 +194,22 @@ impl ThreadPool {
.threads
.lock()
.map_err(|_| MakerError::General("Failed to lock threads"))?;

log::info!("Joining {} threads", threads.len());

let mut joined_count = 0;
while let Some(thread) = threads.pop() {
let thread_name = thread.thread().name().unwrap().to_string();
println!("joining thread: {thread_name}");

let thread_name = thread.thread().name().unwrap_or("unknown").to_string();
match thread.join() {
Ok(_) => {
log::info!("[{}] Thread {} joined", self.port, thread_name);
log::info!("Thread {} joined", thread_name);
joined_count += 1;
}
Err(e) => {
log::error!(
"[{}] Error {:?} while joining thread {}",
self.port,
e,
thread_name
);
log::error!("Error {:?} while joining thread {}", e, thread_name);
}
}
}

log::info!("Successfully joined {joined_count} threads",);
log::info!("Successfully joined {joined_count} threads");
Ok(())
}
}
Expand Down Expand Up @@ -324,8 +313,6 @@ impl Maker {
log::info!("Sync at:----Maker init----");
wallet.sync_and_save()?;

let network_port = config.network_port;

Ok(Self {
behavior,
config,
Expand All @@ -335,7 +322,7 @@ impl Maker {
highest_fidelity_proof: RwLock::new(None),
is_setup_complete: AtomicBool::new(false),
data_dir,
thread_pool: Arc::new(ThreadPool::new(network_port)),
thread_pool: Arc::new(ThreadPool::new()),
watch_service,
})
}
Expand Down Expand Up @@ -619,17 +606,17 @@ pub(crate) fn check_for_broadcasted_contracts(maker: Arc<Maker>) -> Result<(), M
if transaction_broadcasted {
// Something is broadcasted. Report, Recover and Abort.
log::warn!(
"[{}] Contract txs broadcasted!! txid: {} Recovering from ongoing swaps.",
maker.config.network_port,
"[Peer: {}] Contract txs broadcasted!! txid: {} Recovering from ongoing swaps.",
ip,
txid
);
failed_swap_ip.push(ip.clone());

// Spawn a separate thread to wait for contract maturity and broadcasting timelocked/hashlocked.
let maker_clone = maker.clone();
log::info!(
"[{}] Spawning recovery thread after seeing contracts in mempool",
maker.config.network_port
"[Peer: {}] Spawning recovery thread after seeing contracts in mempool.",
ip
);

let incomings = connection_state.incoming_swapcoins.clone();
Expand Down Expand Up @@ -705,16 +692,16 @@ pub(crate) fn check_for_idle_states(maker: Arc<Maker>) -> Result<(), MakerError>

if no_response_since > IDLE_CONNECTION_TIMEOUT {
log::error!(
"[{}] Potential Dropped Connection from taker. No response since : {} secs. Recovering from swap",
maker.config.network_port,
"[Peer: {}] Potential Dropped Connection from taker. No response since : {} secs. Recovering from swap",
ip,
no_response_since.as_secs()
);
bad_ip.push(ip.clone());
// Spawn a separate thread to wait for contract maturity and broadcasting timelocked,hashlocked
let maker_clone = maker.clone();
log::info!(
"[{}] Spawning recovery thread after Taker dropped",
maker.config.network_port
"[Peer: {}] Spawning recovery thread after Taker dropped",
ip
);

let incomings = state.incoming_swapcoins.clone();
Expand Down Expand Up @@ -765,9 +752,14 @@ pub(crate) fn recover_from_swap(
.map_err(WalletError::Rpc)? as u32;
let timelock_expiry = start_height.saturating_add(timelock);

let anchor_txid = outgoing_swapcoins
.first()
.map(|o| o.contract_tx.compute_txid().to_string())
.unwrap_or_else(|| "unknown".to_string());

log::info!(
"[{}] recover_from_swap started | height={} timelock_expiry={}",
maker.config.network_port,
"[Contract: {}] recover_from_swap started | height={} timelock_expiry={}",
anchor_txid,
start_height,
timelock_expiry
);
Expand All @@ -782,8 +774,7 @@ pub(crate) fn recover_from_swap(

if current_height >= timelock_expiry {
log::info!(
"[{}] timelock expired at {} (expiry={}), using timelock path",
maker.config.network_port,
"timelock expired at {} (expiry={}), using timelock path",
current_height,
timelock_expiry
);
Expand All @@ -802,8 +793,7 @@ pub(crate) fn recover_from_swap(

if all_preimages_known {
log::info!(
"[{}] all preimages known at height {}, using hashlock path",
maker.config.network_port,
"all preimages known at height {}, using hashlock path",
current_height
);
return recover_via_hashlock(maker, incoming_swapcoins);
Expand All @@ -824,14 +814,20 @@ fn recover_via_hashlock(
.write()?
.broadcast_incoming_contracts(incoming.clone())?;

let anchor_id = incoming
.first()
.map(|c| c.contract_tx.compute_txid().to_string())
.unwrap_or_default();

while !maker.shutdown.load(Relaxed) {
let broadcasted = maker
.wallet
.write()?
.spend_from_hashlock_contract(&infos, &maker.watch_service)?;

log::info!(
"[{}] Maker hashlock recovery: {}/{} txs broadcasted",
maker.config.network_port,
"[Contract: {}] Maker hashlock recovery: {}/{} txs broadcasted",
anchor_id,
broadcasted.len(),
incoming.len()
);
Expand Down Expand Up @@ -879,14 +875,20 @@ fn recover_via_timelock(
.write()?
.broadcast_outgoing_contracts(outgoing.clone())?;

let anchor_id = outgoing
.first()
.map(|c| c.contract_tx.compute_txid().to_string())
.unwrap_or_default();

while !maker.shutdown.load(Relaxed) {
let broadcasted = maker
.wallet
.write()?
.spend_from_timelock_contract(&infos, &maker.watch_service)?;

log::info!(
"[{}] Maker timelock recovery: {}/{} txs broadcasted",
maker.config.network_port,
"[Contract: {}] Maker timelock recovery: {}/{} txs broadcasted",
anchor_id,
broadcasted.len(),
outgoing.len()
);
Expand Down Expand Up @@ -943,11 +945,7 @@ fn check_for_watch_response(
}

for transaction in responses {
log::info!(
"[{}] Received WatchResponse with mempool txs: {:?}",
maker.config.network_port,
transaction
);
log::info!("Received WatchResponse with mempool txs: {:?}", transaction);

for input in &transaction.input {
let outpoint = (input.previous_output.txid, input.previous_output.vout);
Expand Down Expand Up @@ -1001,8 +999,7 @@ fn update_swapcoins_with_preimages<T: SwapCoin>(
let redeemscript = coin.get_multisig_redeemscript();
if apply_preimage(&redeemscript, *preimage) {
log::info!(
"[{}] Applied preimage for {} swapcoin {:?}",
maker.config.network_port,
"Applied preimage for {} swapcoin {:?}",
if is_incoming { "incoming" } else { "outgoing" },
redeemscript
);
Expand Down
Loading