Skip to content

Commit c9d1b41

Browse files
committed
feat(example): use changeset staging with rpc polling example
1 parent ded0bfe commit c9d1b41

File tree

1 file changed

+15
-9
lines changed
  • example-crates/example_bitcoind_rpc_polling/src

1 file changed

+15
-9
lines changed

example-crates/example_bitcoind_rpc_polling/src/main.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ use bdk_bitcoind_rpc::{
1111
bitcoincore_rpc::{Auth, Client, RpcApi},
1212
Emitter,
1313
};
14-
use bdk_chain::persist::PersistBackend;
14+
use bdk_chain::persist::{PersistBackend, StageExt};
1515
use bdk_chain::{
1616
bitcoin::{constants::genesis_block, Block, Transaction},
1717
indexed_tx_graph, keychain,
1818
local_chain::{self, LocalChain},
19-
ConfirmationTimeHeightAnchor, IndexedTxGraph,
19+
Append, ConfirmationTimeHeightAnchor, IndexedTxGraph,
2020
};
2121
use example_cli::{
2222
anyhow,
@@ -176,6 +176,7 @@ fn main() -> anyhow::Result<()> {
176176
let chain_tip = chain.lock().unwrap().tip();
177177
let rpc_client = rpc_args.new_client()?;
178178
let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height);
179+
let mut db_stage = ChangeSet::default();
179180

180181
let mut last_db_commit = Instant::now();
181182
let mut last_print = Instant::now();
@@ -185,17 +186,18 @@ fn main() -> anyhow::Result<()> {
185186

186187
let mut chain = chain.lock().unwrap();
187188
let mut graph = graph.lock().unwrap();
188-
let mut db = db.lock().unwrap();
189189

190190
let chain_changeset = chain
191191
.apply_update(emission.checkpoint)
192192
.expect("must always apply as we receive blocks in order from emitter");
193193
let graph_changeset = graph.apply_block_relevant(&emission.block, height);
194-
db.write_changes(&(chain_changeset, graph_changeset))?;
194+
db_stage.append((chain_changeset, graph_changeset));
195195

196196
// commit staged db changes in intervals
197197
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
198+
let db = &mut *db.lock().unwrap();
198199
last_db_commit = Instant::now();
200+
db_stage.commit_to(db)?;
199201
println!(
200202
"[{:>10}s] committed to db (took {}s)",
201203
start.elapsed().as_secs_f32(),
@@ -230,8 +232,11 @@ fn main() -> anyhow::Result<()> {
230232
mempool_txs.iter().map(|(tx, time)| (tx, *time)),
231233
);
232234
{
233-
let mut db = db.lock().unwrap();
234-
db.write_changes(&(local_chain::ChangeSet::default(), graph_changeset))?;
235+
let db = &mut *db.lock().unwrap();
236+
db_stage.append_and_commit_to(
237+
(local_chain::ChangeSet::default(), graph_changeset),
238+
db,
239+
)?;
235240
}
236241
}
237242
RpcCommands::Live { rpc_args } => {
@@ -287,9 +292,9 @@ fn main() -> anyhow::Result<()> {
287292
let mut tip_height = 0_u32;
288293
let mut last_db_commit = Instant::now();
289294
let mut last_print = Option::<Instant>::None;
295+
let mut db_stage = ChangeSet::default();
290296

291297
for emission in rx {
292-
let mut db = db.lock().unwrap();
293298
let mut graph = graph.lock().unwrap();
294299
let mut chain = chain.lock().unwrap();
295300

@@ -314,11 +319,12 @@ fn main() -> anyhow::Result<()> {
314319
continue;
315320
}
316321
};
317-
318-
db.write_changes(&changeset)?;
322+
db_stage.append(changeset);
319323

320324
if last_db_commit.elapsed() >= DB_COMMIT_DELAY {
325+
let db = &mut *db.lock().unwrap();
321326
last_db_commit = Instant::now();
327+
db_stage.commit_to(db)?;
322328
println!(
323329
"[{:>10}s] committed to db (took {}s)",
324330
start.elapsed().as_secs_f32(),

0 commit comments

Comments
 (0)