Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 84 additions & 27 deletions swap/src/asb/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,33 @@ where
/// Drained once the computation resolves.
pending_quote_channels: Vec<(ResponseChannel<BidQuote>, PeerId)>,

/// Futures waiting for `send_wallet_snapshot.recv()` to resolve.
/// Each future yields the BTC amount and the responder to send the wallet snapshot back.
#[allow(clippy::type_complexity)]
inflight_wallet_snapshot_requests: FuturesUnordered<
BoxFuture<
'static,
Result<(
bitcoin::Amount,
bmrng::Responder<(WalletSnapshot, bitcoin::Amount, bool)>,
)>,
>,
>,

/// In-flight `capture_wallet_snapshot` computations.
/// Each future yields the BTC amount, responder, and the computed wallet snapshot.
#[allow(clippy::type_complexity)]
inflight_wallet_snapshots: FuturesUnordered<
BoxFuture<
'static,
Result<(
bitcoin::Amount,
bmrng::Responder<(WalletSnapshot, bitcoin::Amount, bool)>,
WalletSnapshot,
)>,
>,
>,

/// Channel for sending transfer proofs to Bobs. The sender is shared with every EventLoopHandle.
/// The receiver is polled by the event loop to send transfer proofs over the network to Bob.
///
Expand Down Expand Up @@ -188,6 +215,8 @@ where
inflight_encrypted_signatures: Default::default(),
inflight_quote_computation: Default::default(),
pending_quote_channels: Default::default(),
inflight_wallet_snapshot_requests: Default::default(),
inflight_wallet_snapshots: Default::default(),
outgoing_transfer_proofs_requests,
outgoing_transfer_proofs_sender,
service_requests,
Expand Down Expand Up @@ -215,6 +244,10 @@ where
.push(future::pending().boxed());
self.inflight_quote_computation
.push(future::pending().boxed());
self.inflight_wallet_snapshot_requests
.push(future::pending().boxed());
self.inflight_wallet_snapshots
.push(future::pending().boxed());

let swaps = match self.db.all().await {
Ok(swaps) => swaps,
Expand Down Expand Up @@ -264,33 +297,9 @@ where
swarm_event = self.swarm.select_next_some() => {
match swarm_event {
SwarmEvent::Behaviour(OutEvent::SwapSetupInitiated { mut send_wallet_snapshot }) => {
let (btc, responder) = match send_wallet_snapshot.recv().await {
Ok((btc, responder)) => (btc, responder),
Err(error) => {
tracing::error!("Swap request will be ignored because of a failure when requesting information for the wallet snapshot: {:#}", error);
continue;
}
};

// TODO: propagate error to the swap_setup routine instead of swallowing it
let (btc_amnesty_amount, should_publish_tx_withhold )= match apply_anti_spam_policy(btc, &self.refund_policy) {
Ok(amount) => amount,
Err(error) => {
tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error);
continue;
}
};

let wallet_snapshot = match capture_wallet_snapshot(self.bitcoin_wallet.clone(), &self.monero_wallet, &self.external_redeem_address, btc).await {
Ok(wallet_snapshot) => wallet_snapshot,
Err(error) => {
tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error);
continue;
}
};

// Ignore result, we should never hit this because the receiver will alive as long as the connection is.
let _ = responder.respond((wallet_snapshot, btc_amnesty_amount, should_publish_tx_withhold));
self.inflight_wallet_snapshot_requests.push(async move {
send_wallet_snapshot.recv().await.map_err(Into::into)
}.boxed());
}
SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted{peer_id, swap_id, state3}) => {
if let Err(error) = self.handle_execution_setup_done(peer_id, swap_id, state3).await {
Expand Down Expand Up @@ -513,6 +522,54 @@ where
}
}
},

// Swap setup routine:
// 1. We receive a `SwapSetupInitiated` event with a `send_wallet_snapshot` receiver
// 2. We push a future to `inflight_wallet_snapshot_requests` that resolves when the swap setup handler knows the BTC amount
// and requests the wallet snapshot and amnesty amount to continue with the swap setup
// 3. We push a future to `inflight_wallet_snapshots` that resolves when the wallet snapshot is computed
// 4. We wait for the wallet snapshot future to resolve, compute the amnesty amount, and respond to the swap setup handler
Some(result) = self.inflight_wallet_snapshot_requests.next() => {
let (btc, responder) = match result {
Ok((btc, responder)) => (btc, responder),
Err(error) => {
tracing::error!("Swap request will be ignored because of a failure when requesting information for the wallet snapshot: {:#}", error);
continue;
}
};

let bitcoin_wallet = self.bitcoin_wallet.clone();
let monero_wallet = self.monero_wallet.clone();
let external_redeem_address = self.external_redeem_address.clone();

self.inflight_wallet_snapshots.push(async move {
let wallet_snapshot = capture_wallet_snapshot(bitcoin_wallet, &monero_wallet, &external_redeem_address, btc).await?;
Ok((btc, responder, wallet_snapshot))
}.boxed());
},
Some(result) = self.inflight_wallet_snapshots.next() => {
let (btc, responder, wallet_snapshot) = match result {
Ok((btc, responder, wallet_snapshot)) => (btc, responder, wallet_snapshot),
Err(error) => {
// TODO: Propagate error to the swap_setup handler instead of swallowing it
tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error);
continue;
}
};

let (btc_amnesty_amount, should_publish_tx_withhold) = match apply_anti_spam_policy(btc, &self.refund_policy) {
Ok(amount) => amount,
Err(error) => {
// TODO: Propagate error to the swap_setup handler instead of swallowing it
tracing::error!("Swap request will be ignored because we were unable to compute the amnesty amount for the swap: {:#}", error);
continue;
}
};

if responder.respond((wallet_snapshot, btc_amnesty_amount, should_publish_tx_withhold)).is_err() {
tracing::warn!("Failed to send wallet snapshot and amnesty amount back to swap setup handler, connection may have been dropped");
}
},
Some(request) = self.service_requests.recv() => {
match request {
EventLoopRequest::GetMultiaddresses { respond_to } => {
Expand Down
Loading