Skip to content

Commit 48cf7f9

Browse files
committed
refactore: remove some waiitng logic in proper helper and add even more helpers
1 parent e5ef344 commit 48cf7f9

File tree

1 file changed

+37
-63
lines changed
  • crates/optimism/rpc/src/eth

1 file changed

+37
-63
lines changed

crates/optimism/rpc/src/eth/mod.rs

Lines changed: 37 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
OpEthApiError, SequencerClient,
1414
};
1515
use alloy_consensus::BlockHeader;
16-
use alloy_primitives::U256;
16+
use alloy_primitives::{B256, U256};
1717
use eyre::WrapErr;
1818
use op_alloy_network::Optimism;
1919
pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
@@ -24,7 +24,7 @@ use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
2424
use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
2525
use reth_optimism_flashblocks::{
2626
BuildStateRx, ExecutionPayloadBaseV1, FlashBlockCompleteSequenceRx, FlashBlockService,
27-
PendingBlockRx, WsFlashBlockStream,
27+
PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
2828
};
2929
use reth_rpc::eth::{core::EthApiInner, DevSigner};
3030
use reth_rpc_eth_api::{
@@ -47,14 +47,10 @@ use std::{
4747
fmt::{self, Formatter},
4848
marker::PhantomData,
4949
sync::Arc,
50-
time::{Duration, Instant},
50+
time::Duration,
5151
};
5252
use tokio::sync::watch;
53-
use tracing::{debug, info};
54-
55-
/// Maximum flashblock index in a sequence before the cycle resets.
56-
/// After this index, we don't wait for new flashblocks as the sequence is ending.
57-
const MAX_FLASHBLOCK_INDEX: u64 = 9;
53+
use tracing::info;
5854

5955
/// Maximum duration to wait for a fresh flashblock when one is being built.
6056
const MAX_WAIT_DURATION: Duration = Duration::from_millis(20);
@@ -123,27 +119,46 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
123119
self.inner.flashblock_rx.as_ref().map(|rx| rx.resubscribe())
124120
}
125121

122+
/// Checks if a flashblock build is currently in progress.
123+
fn is_flashblock_building(&self) -> bool {
124+
self.inner.build_state_rx.as_ref().map(|rx| *rx.borrow()).unwrap_or(false)
125+
}
126+
127+
/// Extracts pending block if it matches the expected parent hash.
128+
fn extract_matching_block(
129+
&self,
130+
block: Option<&PendingFlashBlock<N::Primitives>>,
131+
parent_hash: B256,
132+
) -> Option<PendingBlock<N::Primitives>> {
133+
block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
134+
}
135+
126136
/// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
127137
pub const fn builder() -> OpEthApiBuilder<Rpc> {
128138
OpEthApiBuilder::new()
129139
}
130140

131-
/// Checks if we should wait for a fresher flashblock based on build state and current block
132-
/// freshness.
133-
pub fn check_in_progress_flashblock(&self, flashblock_index: u64) -> bool {
134-
// No build state tracking available
135-
let Some(build_state_rx) = self.inner.build_state_rx.as_ref() else {
136-
return false;
137-
};
141+
/// Awaits a fresh flashblock if one is being built, otherwise returns current.
142+
async fn flashblock(
143+
&self,
144+
parent_hash: B256,
145+
) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
146+
let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
147+
148+
if self.is_flashblock_building() {
149+
let mut rx_clone = rx.clone();
138150

139-
// No build currently in progress
140-
if !*build_state_rx.borrow() {
141-
return false;
151+
// Wait up to 20ms for a new flashblock to arrive
152+
if tokio::time::timeout(MAX_WAIT_DURATION, rx_clone.changed()).await.is_ok() {
153+
let fresh = rx_clone.borrow();
154+
if let Some(block) = self.extract_matching_block(fresh.as_ref(), parent_hash) {
155+
return Ok(Some(block));
156+
}
157+
}
142158
}
143159

144-
// It is not necessary to wait if it is the end of the 10-flashblock sequence,
145-
// ie. after 9 blocks.
146-
flashblock_index < MAX_FLASHBLOCK_INDEX
160+
// Fall back to current block
161+
Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
147162
}
148163

149164
/// Returns a [`PendingBlock`] that is built out of flashblocks.
@@ -162,48 +177,7 @@ impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
162177
PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
163178
};
164179

165-
let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
166-
167-
let (_expires_at, last_flashblock_index, _parent_hash, pending) = {
168-
let pending_block = rx.borrow();
169-
let Some(pending_block) = pending_block.as_ref() else { return Ok(None) };
170-
171-
let now = Instant::now();
172-
173-
// Is the pending block not expired and latest is its parent?
174-
if pending.evm_env.block_env.number != U256::from(pending_block.block().number()) ||
175-
parent.hash() != pending_block.block().parent_hash() ||
176-
now > pending_block.expires_at
177-
{
178-
return Ok(None);
179-
}
180-
181-
(
182-
pending_block.expires_at,
183-
pending_block.last_flashblock_index,
184-
pending_block.block().parent_hash(),
185-
pending_block.pending.clone(),
186-
)
187-
};
188-
189-
// Check if there is a current block in progress
190-
if self.check_in_progress_flashblock(last_flashblock_index) {
191-
debug!("Waiting for fresh flashblock");
192-
let mut rx_clone = rx.clone();
193-
194-
// Wait up to 20ms for a new flashblock to arrive
195-
if tokio::time::timeout(MAX_WAIT_DURATION, rx_clone.changed()).await.is_ok() {
196-
debug!("Got fresh flashblock within timeout!");
197-
let fresh = rx_clone.borrow();
198-
if let Some(fresh_block) = fresh.as_ref() &&
199-
fresh_block.block().parent_hash() == parent.hash()
200-
{
201-
return Ok(Some(fresh_block.pending.clone()));
202-
}
203-
}
204-
}
205-
206-
Ok(Some(pending))
180+
self.flashblock(parent.hash()).await
207181
}
208182
}
209183

0 commit comments

Comments
 (0)