Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 124 additions & 116 deletions backend/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,8 @@ impl WalletSyncService {
let is_confirmed = tx_item.chain_position.is_confirmed();

// Preserve existing timestamp if transaction already exists, otherwise use current time
let first_seen_at = existing_transactions
.iter()
.find(|tx| tx.txid == txid)
let first_seen_at = existing_tx_map
.get(&txid)
.map(|tx| tx.first_seen_at)
.unwrap_or_else(|| {
std::time::SystemTime::now()
Expand Down Expand Up @@ -490,7 +489,7 @@ impl WalletSyncService {
) = tx_data;

// Check if this is an existing transaction
let existing_tx = existing_transactions.iter().find(|tx| tx.txid == *txid);
let existing_tx = existing_tx_map.get(txid).copied();

if let Some(existing) = existing_tx {
// Existing transaction - check if it's transitioning from pending to confirmed
Expand Down Expand Up @@ -568,19 +567,20 @@ impl WalletSyncService {
}

// Get ALL transactions (including non-canonical/conflicted ones) for RBF detection
let all_txs_from_bdk: Vec<String> = wallet
.tx_graph()
.full_txs()
.map(|tx| tx.txid.to_string())
.collect();
let canonical_txids: Vec<String> = canonical_transactions_data
let canonical_txids: std::collections::HashSet<Txid> = canonical_transactions_data
.iter()
.map(|(txid, _, _, _, _, _)| txid.clone())
.filter_map(|(txid, _, _, _, _, _)| {
Txid::from_str(txid)
.inspect_err(|e| warn!("[{}] Failed to parse canonical txid {}: {}", wallet_checksum, txid, e))
.ok()
})
.collect();

// Find transactions that exist in full graph but NOT in canonical set (these are conflicted/replaced)
let conflicted_txids: Vec<String> = all_txs_from_bdk
.into_iter()
let conflicted_txids: Vec<Txid> = wallet
.tx_graph()
.full_txs()
.map(|tx| tx.txid)
.filter(|txid| !canonical_txids.contains(txid))
.collect();

Expand Down Expand Up @@ -723,16 +723,20 @@ impl WalletSyncService {
);

// Get all transactions from BDK with full details for input comparison
let all_bdk_txs: Vec<_> = wallet.tx_graph().full_txs().collect();
let bdk_tx_map: std::collections::HashMap<Txid, _> = wallet
.tx_graph()
.full_txs()
.map(|tx| (tx.txid, tx))
.collect();

for conflicted_txid in &conflicted_txids {
let conflicted_txid_str = conflicted_txid.to_string();
// Check if this conflicted transaction is in our pending transactions using in-memory lookup
if let Some(pending_tx) = existing_tx_map.get(conflicted_txid) {
if let Some(pending_tx) = existing_tx_map.get(&conflicted_txid_str) {
if pending_tx.transaction_status == "pending" {
// Find the conflicted transaction's inputs
let conflicted_tx_inputs: Option<Vec<_>> = all_bdk_txs
.iter()
.find(|tx| tx.txid.to_string() == *conflicted_txid)
let conflicted_tx_inputs: Option<Vec<_>> = bdk_tx_map
.get(conflicted_txid)
.map(|tx| {
tx.tx
.input
Expand All @@ -754,9 +758,10 @@ impl WalletSyncService {
}

// Check if this canonical transaction shares any inputs with the conflicted one
if let Some(canonical_tx) = all_bdk_txs
.iter()
.find(|tx| tx.txid.to_string() == *canonical_txid)
if let Some(canonical_tx) = Txid::from_str(canonical_txid)
.inspect_err(|e| warn!("[{}] Failed to parse canonical txid {}: {}", wallet_checksum, canonical_txid, e))
.ok()
.and_then(|txid| bdk_tx_map.get(&txid))
{
let canonical_inputs: Vec<_> = canonical_tx
.tx
Expand Down Expand Up @@ -793,7 +798,7 @@ impl WalletSyncService {
.metadata_db
.mark_transaction_replaced(
wallet_checksum,
conflicted_txid,
&conflicted_txid_str,
canonical_txid,
)
.await?;
Expand Down Expand Up @@ -1138,12 +1143,20 @@ impl WalletSyncService {
}

// Get all BDK transactions with full details
let all_bdk_txs: Vec<_> = wallet.tx_graph().full_txs().collect();
let bdk_tx_map: std::collections::HashMap<Txid, _> = wallet
.tx_graph()
.full_txs()
.map(|tx| (tx.txid, tx))
.collect();

// Build a map of unconfirmed transaction outputs: (txid, vout) -> txid
let mut unconfirmed_outputs: HashMap<(String, u32), String> = HashMap::new();
for (txid, _, _, _, _, _) in &unconfirmed_txs {
if let Some(bdk_tx) = all_bdk_txs.iter().find(|tx| tx.txid.to_string() == *txid) {
if let Some(bdk_tx) = Txid::from_str(txid)
.inspect_err(|e| warn!("[{}] Failed to parse txid {}: {}", wallet_checksum, txid, e))
.ok()
.and_then(|t| bdk_tx_map.get(&t))
{
for (vout, _) in bdk_tx.tx.output.iter().enumerate() {
unconfirmed_outputs.insert((txid.clone(), vout as u32), txid.clone());
}
Expand All @@ -1152,9 +1165,10 @@ impl WalletSyncService {

// Check each unconfirmed transaction to see if it spends from another unconfirmed transaction
for (child_txid, _, _, _, _, _) in &unconfirmed_txs {
if let Some(bdk_tx) = all_bdk_txs
.iter()
.find(|tx| tx.txid.to_string() == *child_txid)
if let Some(bdk_tx) = Txid::from_str(child_txid)
.inspect_err(|e| warn!("[{}] Failed to parse child txid {}: {}", wallet_checksum, child_txid, e))
.ok()
.and_then(|t| bdk_tx_map.get(&t))
{
// Check each input of this transaction
for input in &bdk_tx.tx.input {
Expand Down Expand Up @@ -1538,9 +1552,9 @@ impl WalletSyncService {
.metadata_db
.get_transactions_by_wallet_checksum(wallet_checksum, None)
.await?;
let existing_txids: std::collections::HashSet<String> = existing_transactions
let existing_tx_map: std::collections::HashMap<&str, &_> = existing_transactions
.iter()
.map(|tx| tx.txid.clone())
.map(|tx| (tx.txid.as_str(), tx))
.collect();

let mut has_changes = false;
Expand All @@ -1549,51 +1563,48 @@ impl WalletSyncService {
for hist_entry in &history {
let txid_str = hist_entry.tx_hash.to_string();

if existing_txids.contains(&txid_str) {
if let Some(existing) = existing_tx_map.get(txid_str.as_str()) {
// Check if an existing pending transaction got confirmed
if is_tx_confirmed(hist_entry.height, &txid_str) {
if let Some(_existing) = existing_transactions
.iter()
.find(|tx| tx.txid == txid_str && tx.block_height.is_none())
{
// Transaction just confirmed
let confirmed_at =
match client.get_block_header(hist_entry.height as u32).await {
Ok(header) => header.timestamp,
Err(_) if hist_entry.height == 0 => GENESIS_BLOCK_TIMESTAMP,
Err(_) => std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

self.metadata_db
.update_transaction_confirmation(
wallet_checksum,
&txid_str,
hist_entry.height as u32,
confirmed_at,
)
.await?;
if is_tx_confirmed(hist_entry.height, &txid_str)
&& existing.block_height.is_none()
{
// Transaction just confirmed
let confirmed_at =
match client.get_block_header(hist_entry.height as u32).await {
Ok(header) => header.timestamp,
Err(_) if hist_entry.height == 0 => GENESIS_BLOCK_TIMESTAMP,
Err(_) => std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

self.metadata_db
.update_transaction_confirmation(
wallet_checksum,
&txid_str,
hist_entry.height as u32,
confirmed_at,
)
.await?;

// Send confirmation notification
if !suppress_notifications {
if let Some(updated_tx) = self
.metadata_db
.get_transaction_by_txid(wallet_checksum, &txid_str)
.await?
{
self.send_confirmed_transaction_notification(&updated_tx)
.await?;
}
// Send confirmation notification
if !suppress_notifications {
if let Some(updated_tx) = self
.metadata_db
.get_transaction_by_txid(wallet_checksum, &txid_str)
.await?
{
self.send_confirmed_transaction_notification(&updated_tx)
.await?;
}

has_changes = true;
debug!(
"[{}] Address watch tx confirmed: {} at height {}",
wallet_checksum, txid_str, hist_entry.height
);
}

has_changes = true;
debug!(
"[{}] Address watch tx confirmed: {} at height {}",
wallet_checksum, txid_str, hist_entry.height
);
}
continue;
}
Expand Down Expand Up @@ -1825,66 +1836,63 @@ impl WalletSyncService {
.metadata_db
.get_transactions_by_wallet_checksum(wallet_checksum, None)
.await?;
let existing_txids: std::collections::HashSet<String> = existing_transactions
let existing_tx_map: std::collections::HashMap<&str, &_> = existing_transactions
.iter()
.map(|tx| tx.txid.clone())
.map(|tx| (tx.txid.as_str(), tx))
.collect();

let mut has_changes = false;

for hist_entry in &history {
let txid_str = hist_entry.tx_hash.to_string();

if existing_txids.contains(&txid_str) {
if let Some(existing) = existing_tx_map.get(txid_str.as_str()) {
// Check if an existing pending transaction got confirmed
if is_tx_confirmed(hist_entry.height, &txid_str) {
if let Some(_existing) = existing_transactions
.iter()
.find(|tx| tx.txid == txid_str && tx.block_height.is_none())
if is_tx_confirmed(hist_entry.height, &txid_str)
&& existing.block_height.is_none()
{
let confirmed_at = match block_header_cache
.get(&(hist_entry.height as u32))
{
let confirmed_at = match block_header_cache
.get(&(hist_entry.height as u32))
{
Some(&ts) => ts,
None => {
let ts = match client
.get_block_header(hist_entry.height as u32)
.await
{
Ok(header) => header.timestamp,
Err(_) if hist_entry.height == 0 => GENESIS_BLOCK_TIMESTAMP,
Err(_) => std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
block_header_cache.insert(hist_entry.height as u32, ts);
ts
}
};

self.metadata_db
.update_transaction_confirmation(
wallet_checksum,
&txid_str,
hist_entry.height as u32,
confirmed_at,
)
.await?;

if !suppress_notifications {
if let Some(updated_tx) = self
.metadata_db
.get_transaction_by_txid(wallet_checksum, &txid_str)
.await?
Some(&ts) => ts,
None => {
let ts = match client
.get_block_header(hist_entry.height as u32)
.await
{
self.send_confirmed_transaction_notification(&updated_tx)
.await?;
}
Ok(header) => header.timestamp,
Err(_) if hist_entry.height == 0 => GENESIS_BLOCK_TIMESTAMP,
Err(_) => std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
block_header_cache.insert(hist_entry.height as u32, ts);
ts
}
};

self.metadata_db
.update_transaction_confirmation(
wallet_checksum,
&txid_str,
hist_entry.height as u32,
confirmed_at,
)
.await?;

has_changes = true;
if !suppress_notifications {
if let Some(updated_tx) = self
.metadata_db
.get_transaction_by_txid(wallet_checksum, &txid_str)
.await?
{
self.send_confirmed_transaction_notification(&updated_tx)
.await?;
}
}

has_changes = true;
}
continue;
}
Expand Down
Loading