Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 58 additions & 24 deletions swap/src/asb/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ where
/// Drained once the computation resolves.
pending_quote_channels: Vec<(ResponseChannel<BidQuote>, PeerId)>,

/// In-flight wallet snapshot computations for swap setup.
/// Each future waits for the swap setup handler to request a wallet snapshot,
/// then computes it concurrently.
#[allow(clippy::type_complexity)]
inflight_wallet_snapshots: FuturesUnordered<
BoxFuture<
'static,
Result<(
bitcoin::Amount,
bmrng::Responder<(WalletSnapshot, bitcoin::Amount, bool)>,
WalletSnapshot,
)>,
>,
>,

/// Channel for sending transfer proofs to Bobs. The sender is shared with every EventLoopHandle.
/// The receiver is polled by the event loop to send transfer proofs over the network to Bob.
///
Expand Down Expand Up @@ -188,6 +203,7 @@ where
inflight_encrypted_signatures: Default::default(),
inflight_quote_computation: Default::default(),
pending_quote_channels: Default::default(),
inflight_wallet_snapshots: Default::default(),
outgoing_transfer_proofs_requests,
outgoing_transfer_proofs_sender,
service_requests,
Expand Down Expand Up @@ -215,6 +231,8 @@ where
.push(future::pending().boxed());
self.inflight_quote_computation
.push(future::pending().boxed());
self.inflight_wallet_snapshots
.push(future::pending().boxed());

let swaps = match self.db.all().await {
Ok(swaps) => swaps,
Expand Down Expand Up @@ -264,33 +282,20 @@ where
swarm_event = self.swarm.select_next_some() => {
match swarm_event {
SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot }) => {
let (btc, responder) = match send_wallet_snapshot.recv().await {
Ok((btc, responder)) => (btc, responder),
Err(error) => {
tracing::error!("Swap request will be ignored because of a failure when requesting information for the wallet snapshot: {:#}", error);
continue;
}
};
let bitcoin_wallet = self.bitcoin_wallet.clone();
let monero_wallet = self.monero_wallet.clone();
let external_redeem_address = self.external_redeem_address.clone();

// TODO: propagate error to the swap_setup routine instead of swallowing it
let (btc_amnesty_amount, should_publish_tx_withhold )= match apply_anti_spam_policy(btc, &self.refund_policy) {
Ok(amount) => amount,
Err(error) => {
tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error);
continue;
}
};
self.inflight_wallet_snapshots.push(async move {
// Wait for the swap setup handler to request the wallet snapshot
let (btc, responder) = send_wallet_snapshot.recv().await?;

let wallet_snapshot = match capture_wallet_snapshot(self.bitcoin_wallet.clone(), &self.monero_wallet, &self.external_redeem_address, btc).await {
Ok(wallet_snapshot) => wallet_snapshot,
Err(error) => {
tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error);
continue;
}
};
// Compute the wallet snapshot
let wallet_snapshot = capture_wallet_snapshot(bitcoin_wallet, &monero_wallet, &external_redeem_address, btc).await?;

// Ignore result, we should never hit this because the receiver will alive as long as the connection is.
let _ = responder.respond((wallet_snapshot, btc_amnesty_amount, should_publish_tx_withhold));
// This is used further down to then actually respond to the swap setup handler
Ok((btc, responder, wallet_snapshot))
}.boxed());
}
SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3}) => {
if let Err(error) = self.handle_execution_setup_done(peer_id, swap_id, state3).await {
Expand Down Expand Up @@ -513,6 +518,35 @@ where
}
}
},

// Swap setup routine:
// 1. We receive a `SwapSetupInitiated` event with a `send_wallet_snapshot` receiver
// 2. We push a future to `inflight_wallet_snapshots` that waits for the swap setup handler to
// request the wallet snapshot (with the BTC amount), then computes it
// 3. Once the future resolves, we compute the amnesty amount and respond to the swap setup handler
Some(result) = self.inflight_wallet_snapshots.next() => {
let (btc, responder, wallet_snapshot) = match result {
Ok((btc, responder, wallet_snapshot)) => (btc, responder, wallet_snapshot),
Err(error) => {
// TODO: Propagate error to the swap_setup handler instead of swallowing it
tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error);
continue;
}
};

let (btc_amnesty_amount, should_publish_tx_withhold) = match apply_anti_spam_policy(btc, &self.refund_policy) {
Ok(amount) => amount,
Err(error) => {
// TODO: Propagate error to the swap_setup handler instead of swallowing it
tracing::error!("Swap request will be ignored because we were unable to compute the amnesty amount for the swap: {:#}", error);
continue;
}
};

if responder.respond((wallet_snapshot, btc_amnesty_amount, should_publish_tx_withhold)).is_err() {
tracing::warn!("Failed to send wallet snapshot and amnesty amount back to swap setup handler, connection may have been dropped");
}
},
Some(request) = self.service_requests.recv() => {
match request {
EventLoopRequest::GetMultiaddresses { respond_to } => {
Expand Down
Loading