Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,15 @@ use super::{client::Client, config::NodeConfig, node::Node};
#[cfg(feature = "rusqlite")]
use crate::db::error::SqlInitializationError;
#[cfg(feature = "rusqlite")]
use crate::db::sqlite::{headers::SqliteHeaderDb, peers::SqlitePeerDb};
use crate::db::sqlite::peers::SqlitePeerDb;
use crate::network::dns::{DnsResolver, DNS_RESOLVER_PORT};
use crate::network::ConnectionType;
use crate::{
chain::checkpoints::HeaderCheckpoint,
db::traits::{HeaderStore, PeerStore},
};
use crate::{chain::checkpoints::HeaderCheckpoint, db::traits::PeerStore};
use crate::{LogLevel, PeerStoreSizeConfig, TrustedPeer};

#[cfg(feature = "rusqlite")]
/// The default node returned from the [`NodeBuilder`].
pub type NodeDefault = Node<SqliteHeaderDb, SqlitePeerDb>;
pub type NodeDefault = Node<SqlitePeerDb>;

const MIN_PEERS: u8 = 1;
const MAX_PEERS: u8 = 15;
Expand Down Expand Up @@ -219,26 +216,18 @@ impl NodeBuilder {
#[cfg(feature = "rusqlite")]
pub fn build(&mut self) -> Result<(NodeDefault, Client), SqlInitializationError> {
let peer_store = SqlitePeerDb::new(self.network, self.config.data_path.clone())?;
let header_store = SqliteHeaderDb::new(self.network, self.config.data_path.clone())?;
Ok(Node::new(
self.network,
core::mem::take(&mut self.config),
peer_store,
header_store,
))
}

/// Consume the node builder by using custom database implementations, receiving a [`Node`] and [`Client`].
pub fn build_with_databases<H: HeaderStore + 'static, P: PeerStore + 'static>(
pub fn build_with_databases<P: PeerStore + 'static>(
&mut self,
peer_store: P,
header_store: H,
) -> (Node<H, P>, Client) {
Node::new(
self.network,
core::mem::take(&mut self.config),
peer_store,
header_store,
)
) -> (Node<P>, Client) {
Node::new(self.network, core::mem::take(&mut self.config), peer_store)
}
}
136 changes: 4 additions & 132 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
extern crate alloc;
use std::{
collections::{BTreeMap, HashSet},
ops::Range,
sync::Arc,
};

Expand All @@ -18,37 +17,33 @@ use super::{
error::{CFHeaderSyncError, CFilterSyncError, HeaderSyncError},
graph::{AcceptHeaderChanges, BlockTree, HeaderRejection},
CFHeaderChanges, Filter, FilterCheck, FilterHeaderRequest, FilterRequest, FilterRequestState,
HeaderChainChanges, HeightExt, HeightMonitor, PeerId,
HeaderChainChanges, HeightMonitor, PeerId,
};
#[cfg(feature = "filter-control")]
use crate::IndexedFilter;
use crate::{
chain::header_batch::HeadersBatch,
db::{traits::HeaderStore, BlockHeaderChanges},
dialog::Dialog,
error::HeaderPersistenceError,
messages::{Event, Warning},
Info, Progress,
};

const REORG_LOOKBACK: u32 = 7;
const FILTER_BASIC: u8 = 0x00;
const CF_HEADER_BATCH_SIZE: u32 = 1_999;
const FILTER_BATCH_SIZE: u32 = 999;

#[derive(Debug)]
pub(crate) struct Chain<H: HeaderStore> {
pub(crate) struct Chain {
pub(crate) header_chain: BlockTree,
request_state: FilterRequestState,
checkpoints: HeaderCheckpoints,
network: Network,
db: Arc<Mutex<H>>,
heights: Arc<Mutex<HeightMonitor>>,
scripts: HashSet<ScriptBuf>,
dialog: Arc<Dialog>,
}

impl<H: HeaderStore> Chain<H> {
impl Chain {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
network: Network,
Expand All @@ -57,7 +52,6 @@ impl<H: HeaderStore> Chain<H> {
checkpoints: HeaderCheckpoints,
dialog: Arc<Dialog>,
height_monitor: Arc<Mutex<HeightMonitor>>,
db: H,
quorum_required: u8,
) -> Self {
let header_chain = BlockTree::new(anchor, network);
Expand All @@ -66,7 +60,6 @@ impl<H: HeaderStore> Chain<H> {
checkpoints,
request_state: FilterRequestState::new(quorum_required),
network,
db: Arc::new(Mutex::new(db)),
heights: height_monitor,
scripts,
dialog,
Expand All @@ -93,76 +86,6 @@ impl<H: HeaderStore> Chain<H> {
}
}

// Load in headers, ideally allowing the difficulty adjustment to be audited and
// reorganizations to be handled gracefully.
pub(crate) async fn load_headers(&mut self) -> Result<(), HeaderPersistenceError<H::Error>> {
let mut db = self.db.lock().await;
// The original height the user requested a scan after
let scan_height = self.header_chain.height();
// The header relevant to compute the next adjustment
let last_adjustment = scan_height.last_epoch_start(self.network);
// Seven blocks ago
let reorg = scan_height.saturating_sub(REORG_LOOKBACK);
// To handle adjustments and reorgs, we would have the minimum of each of these heights
let min_interesting_height = last_adjustment.min(reorg);
let max_interesting_height = last_adjustment.max(reorg);
// Get the maximum of the two interesting heights. In case the minimum is not available
if let Some(header) = db.header_at(max_interesting_height).await.ok().flatten() {
self.header_chain =
BlockTree::from_header(max_interesting_height, header, self.network);
}
// If this succeeds, both reorgs and difficulty adjustments can be handled gracefully.
if let Some(header) = db.header_at(min_interesting_height).await.ok().flatten() {
self.header_chain =
BlockTree::from_header(min_interesting_height, header, self.network);
}
// Now that the block tree is updated to the appropriate start, load in the rest of
// the history from this point onward. This is either: from the user start height,
// from the last difficulty adjustment, or seven blocks ago, depending on what the
// header store was able to provide.
let loaded_headers = db
.load(self.header_chain.height().increment()..)
.await
.map_err(HeaderPersistenceError::Database)?;
for (height, header) in loaded_headers {
let apply_header_changes = self.header_chain.accept_header(header);
match apply_header_changes {
AcceptHeaderChanges::Accepted { connected_at } => {
if height.ne(&connected_at.height) {
self.dialog.send_warning(Warning::CorruptedHeaders);
return Err(HeaderPersistenceError::HeadersDoNotLink);
}
if let Some(checkpoint) = self.checkpoints.next() {
if connected_at.header.block_hash().eq(&checkpoint.hash) {
self.checkpoints.advance()
}
}
}
AcceptHeaderChanges::Rejected(reject_reason) => match reject_reason {
HeaderRejection::UnknownPrevHash(_) => {
return Err(HeaderPersistenceError::CannotLocateHistory);
}
HeaderRejection::InvalidPow { expected, got } => {
crate::log!(
self.dialog,
format!(
"Unexpected invalid proof of work when importing a block header. expected {}, got: {}",
expected.to_consensus(),
got.to_consensus()
)
);
}
},
_ => (),
}
}
// Because the user requested a scan after the `scan_height`, the filters below this point
// may be assumed as checked. Note that in a reorg, filters below this height may still be
// retrieved, as this only considers the canonical chain as checked.
self.header_chain.assume_checked_to(scan_height);
Ok(())
}

// Sync the chain with headers from a peer, adjusting to reorgs if needed
pub(crate) async fn sync_chain(
&mut self,
Expand All @@ -176,7 +99,6 @@ impl<H: HeaderStore> Chain<H> {
// We check first if the peer is sending us nonsense
self.sanity_check(&header_batch)?;
let next_checkpoint = self.checkpoints.next().copied();
let mut db = self.db.lock().await;
let mut reorged_hashes = None;
let mut fork_added = None;
for header in header_batch.into_iter() {
Expand All @@ -191,7 +113,6 @@ impl<H: HeaderStore> Chain<H> {
connected_at.header.block_hash()
)
);
db.stage(BlockHeaderChanges::Connected(connected_at));
if let Some(checkpoint) = next_checkpoint {
if connected_at.height.eq(&checkpoint.height) {
if connected_at.header.block_hash().eq(&checkpoint.hash) {
Expand Down Expand Up @@ -238,10 +159,6 @@ impl<H: HeaderStore> Chain<H> {
.map(|index| index.header.block_hash())
.collect();
reorged_hashes = Some(removed_hashes);
db.stage(BlockHeaderChanges::Reorganized {
accepted: accepted.clone(),
reorganized: disconnected.clone(),
});
let disconnected_event = Event::BlocksDisconnected {
accepted,
disconnected,
Expand All @@ -260,12 +177,6 @@ impl<H: HeaderStore> Chain<H> {
},
}
}
if let Err(e) = db.write().await {
self.dialog.send_warning(Warning::FailedPersistence {
warning: format!("Could not save headers to disk: {e}"),
});
}
drop(db);
match reorged_hashes {
Some(reorgs) => {
self.clear_compact_filter_queue();
Expand Down Expand Up @@ -530,44 +441,6 @@ impl<H: HeaderStore> Chain<H> {
self.scripts.insert(script);
}

// Fetch a header from the cache or disk.
pub(crate) async fn fetch_header(
&mut self,
height: u32,
) -> Result<Option<Header>, HeaderPersistenceError<H::Error>> {
match self.header_chain.header_at_height(height) {
Some(header) => Ok(Some(header)),
None => {
let mut db = self.db.lock().await;
let header_opt = db.header_at(height).await;
if header_opt.is_err() {
self.dialog
.send_warning(Warning::FailedPersistence {
warning: format!(
"Unexpected error fetching a header from the header store at height {height}"
),
});
}
header_opt.map_err(HeaderPersistenceError::Database)
}
}
}

pub(crate) async fn fetch_header_range(
&self,
range: Range<u32>,
) -> Result<BTreeMap<u32, Header>, HeaderPersistenceError<H::Error>> {
let mut db = self.db.lock().await;
let range_opt = db.load(range).await;
if range_opt.is_err() {
self.dialog.send_warning(Warning::FailedPersistence {
warning: "Unexpected error fetching a range of headers from the header store"
.to_string(),
});
}
range_opt.map_err(HeaderPersistenceError::Database)
}

// Reset the compact filter queue because we received a new block
pub(crate) fn clear_compact_filter_queue(&mut self) {
self.request_state.agreement_state.reset_agreements();
Expand Down Expand Up @@ -638,7 +511,7 @@ mod tests {
anchor: HeaderCheckpoint,
height_monitor: Arc<Mutex<HeightMonitor>>,
peers: u8,
) -> Chain<()> {
) -> Chain {
let (log_tx, _) = tokio::sync::mpsc::channel::<String>(1);
let (info_tx, _) = tokio::sync::mpsc::channel::<Info>(1);
let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::<Warning>();
Expand All @@ -658,7 +531,6 @@ mod tests {
event_tx,
)),
height_monitor,
(),
peers,
)
}
Expand Down
21 changes: 0 additions & 21 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,6 @@ trait HeightExt: Clone + Copy + std::hash::Hash + PartialEq + Eq + PartialOrd +
fn from_u64_checked(height: u64) -> Option<Self>;

fn is_adjustment_multiple(&self, params: impl AsRef<Params>) -> bool;

fn last_epoch_start(&self, params: impl AsRef<Params>) -> Self;
}

impl HeightExt for u32 {
Expand All @@ -255,12 +253,6 @@ impl HeightExt for u32 {
fn from_u64_checked(height: u64) -> Option<Self> {
height.try_into().ok()
}

fn last_epoch_start(&self, params: impl AsRef<Params>) -> Self {
let diff_adjustment_interval = params.as_ref().difficulty_adjustment_interval() as u32;
let floor = self / diff_adjustment_interval;
floor * diff_adjustment_interval
}
}

// Emulation of `GetBlockSubsidy` in Bitcoin Core: https://github.com/bitcoin/bitcoin/blob/master/src/validation.cpp#L1944
Expand All @@ -277,7 +269,6 @@ pub(crate) fn block_subsidy(height: u32) -> Amount {
#[cfg(test)]
mod tests {
use super::*;
use bitcoin::Network;

#[test]
fn test_height_monitor() {
Expand Down Expand Up @@ -306,18 +297,6 @@ mod tests {
assert!(height_monitor.max().unwrap().eq(&12));
}

#[test]
fn test_height_ext() {
assert!(2016.is_adjustment_multiple(Network::Bitcoin));
assert!(4032.is_adjustment_multiple(Network::Bitcoin));
let height = 2300;
assert_eq!(height.last_epoch_start(Network::Bitcoin), 2016);
let height = 4033;
assert_eq!(height.last_epoch_start(Network::Bitcoin), 4032);
let height = 4032;
assert_eq!(height.last_epoch_start(Network::Bitcoin), 4032);
}

#[test]
fn test_subsidy_calculation() {
let first_subsidy = block_subsidy(2);
Expand Down
Loading
Loading