Skip to content

Commit 8146e2a

Browse files
committed
refactor(notifier): apply CR changes
1 parent 7155676 commit 8146e2a

File tree

8 files changed

+67
-149
lines changed

8 files changed

+67
-149
lines changed

core/consensus/src/consensus.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ use lightning_interfaces::signer::{SignerInterface, SubmitTxSocket};
1515
use lightning_interfaces::types::{Epoch, EpochInfo, UpdateMethod};
1616
use lightning_interfaces::{
1717
ApplicationInterface,
18-
BlockNotifierEmitter,
1918
BroadcastInterface,
20-
EpochNotifierEmitter,
19+
Emitter,
2120
IndexSocket,
2221
NotifierInterface,
2322
PubSub,
@@ -55,8 +54,7 @@ pub struct Consensus<C: Collection> {
5554
EpochState<
5655
c![C::ApplicationInterface::SyncExecutor],
5756
c![C::BroadcastInterface::PubSub<PubSubMsg>],
58-
c![C::NotifierInterface::EpochEmitter],
59-
c![C::NotifierInterface::BlockEmitter],
57+
c![C::NotifierInterface::Emitter],
6058
>,
6159
>,
6260
>,
@@ -74,12 +72,7 @@ pub struct Consensus<C: Collection> {
7472
}
7573

7674
/// This struct contains mutable state only for the current epoch.
77-
struct EpochState<
78-
Q: SyncQueryRunnerInterface,
79-
P: PubSub<PubSubMsg> + 'static,
80-
EN: EpochNotifierEmitter,
81-
BN: BlockNotifierEmitter,
82-
> {
75+
struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter> {
8376
/// The node public key of the node.
8477
node_public_key: NodePublicKey,
8578
/// The consensus public key of the node.
@@ -93,7 +86,7 @@ struct EpochState<
9386
/// Path to the database used by the narwhal implementation
9487
pub store_path: ResolvedPathBuf,
9588
/// Narwhal execution state.
96-
execution_state: Arc<Execution<Q, EN, BN>>,
89+
execution_state: Arc<Execution<Q, NE>>,
9790
/// Used to send transactions to consensus
9891
/// We still use this socket on consensus struct because a node is not always on the committee,
9992
/// so its not always sending a transaction to its own mempool. The signer interface
@@ -108,20 +101,16 @@ struct EpochState<
108101
}
109102

110103
#[allow(clippy::too_many_arguments)]
111-
impl<
112-
Q: SyncQueryRunnerInterface,
113-
P: PubSub<PubSubMsg> + 'static,
114-
EN: EpochNotifierEmitter,
115-
BN: BlockNotifierEmitter,
116-
> EpochState<Q, P, EN, BN>
104+
impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter>
105+
EpochState<Q, P, NE>
117106
{
118107
fn new(
119108
node_public_key: NodePublicKey,
120109
consensus_public_key: ConsensusPublicKey,
121110
query_runner: Q,
122111
narwhal_args: NarwhalArgs,
123112
store_path: ResolvedPathBuf,
124-
execution_state: Arc<Execution<Q, EN, BN>>,
113+
execution_state: Arc<Execution<Q, NE>>,
125114
txn_socket: SubmitTxSocket,
126115
pub_sub: P,
127116
rx_narwhal_batches: mpsc::Receiver<(AuthenticStampedParcel, bool)>,
@@ -426,8 +415,7 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
426415
tx_narwhal_batches,
427416
query_runner.clone(),
428417
indexer_socket,
429-
notifier.new_epoch_emitter(),
430-
notifier.new_block_emitter(),
418+
notifier.get_emitter(),
431419
));
432420

433421
let shutdown_notify = Arc::new(Notify::new());

core/consensus/src/edge_node/consensus.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ use std::time::Duration;
55
use fleek_crypto::NodePublicKey;
66
use lightning_interfaces::types::Epoch;
77
use lightning_interfaces::{
8-
BlockNotifierEmitter,
98
BroadcastEventInterface,
10-
EpochNotifierEmitter,
9+
Emitter,
1110
PubSub,
1211
SyncQueryRunnerInterface,
1312
ToDigest,
@@ -31,14 +30,9 @@ pub struct EdgeConsensus {
3130
}
3231

3332
impl EdgeConsensus {
34-
pub fn spawn<
35-
P: PubSub<PubSubMsg> + 'static,
36-
Q: SyncQueryRunnerInterface,
37-
EN: EpochNotifierEmitter,
38-
BN: BlockNotifierEmitter,
39-
>(
33+
pub fn spawn<P: PubSub<PubSubMsg> + 'static, Q: SyncQueryRunnerInterface, NE: Emitter>(
4034
pub_sub: P,
41-
execution: Arc<Execution<Q, EN, BN>>,
35+
execution: Arc<Execution<Q, NE>>,
4236
query_runner: Q,
4337
node_public_key: NodePublicKey,
4438
rx_narwhal_batches: mpsc::Receiver<(AuthenticStampedParcel, bool)>,
@@ -74,15 +68,10 @@ impl EdgeConsensus {
7468

7569
/// Creates and event loop which consumes messages from pubsub and sends them to the
7670
/// right destination.
77-
async fn message_receiver_worker<
78-
P: PubSub<PubSubMsg>,
79-
Q: SyncQueryRunnerInterface,
80-
EN: EpochNotifierEmitter,
81-
BN: BlockNotifierEmitter,
82-
>(
71+
async fn message_receiver_worker<P: PubSub<PubSubMsg>, Q: SyncQueryRunnerInterface, NE: Emitter>(
8372
mut pub_sub: P,
8473
shutdown_notify: Arc<Notify>,
85-
execution: Arc<Execution<Q, EN, BN>>,
74+
execution: Arc<Execution<Q, NE>>,
8675
query_runner: Q,
8776
node_public_key: NodePublicKey,
8877
mut rx_narwhal_batch: mpsc::Receiver<(AuthenticStampedParcel, bool)>,

core/consensus/src/edge_node/transaction_store.rs

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use std::time::{Duration, SystemTime};
44

55
use lightning_interfaces::types::{Digest as BroadcastDigest, NodeIndex};
6-
use lightning_interfaces::{BlockNotifierEmitter, EpochNotifierEmitter, SyncQueryRunnerInterface};
6+
use lightning_interfaces::{Emitter, SyncQueryRunnerInterface};
77
use lightning_utils::application::QueryRunnerExt;
88

99
use super::ring_buffer::RingBuffer;
@@ -107,16 +107,12 @@ impl TransactionStore {
107107

108108
// Threshold should be 2f + 1 of the committee
109109
// Returns true if the epoch has changed
110-
pub async fn try_execute<
111-
Q: SyncQueryRunnerInterface,
112-
EN: EpochNotifierEmitter,
113-
BN: BlockNotifierEmitter,
114-
>(
110+
pub async fn try_execute<Q: SyncQueryRunnerInterface, NE: Emitter>(
115111
&mut self,
116112
digest: Digest,
117113
threshold: usize,
118114
query_runner: &Q,
119-
execution: &Arc<Execution<Q, EN, BN>>,
115+
execution: &Arc<Execution<Q, NE>>,
120116
) -> Result<bool, NotExecuted> {
121117
// get the current chain head
122118
let head = query_runner.get_last_block();
@@ -145,15 +141,11 @@ impl TransactionStore {
145141
Ok(epoch_changed)
146142
}
147143

148-
async fn try_execute_internal<
149-
Q: SyncQueryRunnerInterface,
150-
EN: EpochNotifierEmitter,
151-
BN: BlockNotifierEmitter,
152-
>(
144+
async fn try_execute_internal<Q: SyncQueryRunnerInterface, NE: Emitter>(
153145
&mut self,
154146
digest: Digest,
155147
threshold: usize,
156-
execution: &Arc<Execution<Q, EN, BN>>,
148+
execution: &Arc<Execution<Q, NE>>,
157149
head: Digest,
158150
) -> Result<bool, NotExecuted> {
159151
if self.executed.contains(&digest) {
@@ -170,14 +162,10 @@ impl TransactionStore {
170162
Err(NotExecuted::MissingAttestations(digest))
171163
}
172164

173-
async fn try_execute_chain<
174-
Q: SyncQueryRunnerInterface,
175-
EN: EpochNotifierEmitter,
176-
BN: BlockNotifierEmitter,
177-
>(
165+
async fn try_execute_chain<Q: SyncQueryRunnerInterface, NE: Emitter>(
178166
&mut self,
179167
digest: Digest,
180-
execution: &Arc<Execution<Q, EN, BN>>,
168+
execution: &Arc<Execution<Q, NE>>,
181169
head: Digest,
182170
) -> Result<bool, NotExecuted> {
183171
let mut txn_chain = VecDeque::new();

core/consensus/src/execution.rs

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ use fastcrypto::hash::HashFunction;
55
use fleek_blake3 as blake3;
66
use lightning_interfaces::types::{Block, Epoch, IndexRequest, NodeIndex, TransactionRequest};
77
use lightning_interfaces::{
8-
BlockNotifierEmitter,
9-
EpochNotifierEmitter,
8+
Emitter,
109
ExecutionEngineSocket,
1110
IndexSocket,
1211
SyncQueryRunnerInterface,
@@ -60,11 +59,7 @@ pub struct CommitteeAttestation {
6059
pub epoch: Epoch,
6160
}
6261

63-
pub struct Execution<
64-
Q: SyncQueryRunnerInterface,
65-
EN: EpochNotifierEmitter,
66-
BN: BlockNotifierEmitter,
67-
> {
62+
pub struct Execution<Q: SyncQueryRunnerInterface, NE: Emitter> {
6863
/// Managing certificates generated by narwhal.
6964
executor: ExecutionEngineSocket,
7065
/// Used to signal internal consensus processes that it is time to reconfigure for a new epoch
@@ -77,32 +72,26 @@ pub struct Execution<
7772
/// If this socket is present it means the node is in archive node and should send all blocks
7873
/// and transactions it executes to the archiver to be indexed
7974
index_socket: Option<IndexSocket>,
80-
/// Notifier that notifies every time an epoch is changed
81-
epoch_change_notifier: EN,
82-
/// Notifier that notifies every time a block is executed on application state
83-
new_block_notifier: BN,
75+
/// Notifications emitter
76+
notifier: NE,
8477
}
8578

86-
impl<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter, BN: BlockNotifierEmitter>
87-
Execution<Q, EN, BN>
88-
{
79+
impl<Q: SyncQueryRunnerInterface, NE: Emitter> Execution<Q, NE> {
8980
pub fn new(
9081
executor: ExecutionEngineSocket,
9182
reconfigure_notify: Arc<Notify>,
9283
tx_narwhal_batches: mpsc::Sender<(AuthenticStampedParcel, bool)>,
9384
query_runner: Q,
9485
index_socket: Option<IndexSocket>,
95-
epoch_change_notifier: EN,
96-
new_block_notifier: BN,
86+
notifier: NE,
9787
) -> Self {
9888
Self {
9989
executor,
10090
reconfigure_notify,
10191
tx_narwhal_batches,
10292
query_runner,
10393
index_socket,
104-
epoch_change_notifier,
105-
new_block_notifier,
94+
notifier,
10695
}
10796
}
10897

@@ -136,7 +125,7 @@ impl<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter, BN: BlockNotifierEmi
136125

137126
if results.change_epoch {
138127
change_epoch = true;
139-
self.epoch_change_notifier.epoch_changed();
128+
self.notifier.epoch_changed();
140129
}
141130

142131
// If we have the archive socket that means our node is in archive node and we should send
@@ -152,16 +141,14 @@ impl<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter, BN: BlockNotifierEmi
152141
error!("We could not send a message to the archiver: {e}");
153142
}
154143
}
155-
self.new_block_notifier.new_block();
144+
self.notifier.new_block();
156145

157146
change_epoch
158147
}
159148
}
160149

161150
#[async_trait]
162-
impl<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter, BN: BlockNotifierEmitter> ExecutionState
163-
for Execution<Q, EN, BN>
164-
{
151+
impl<Q: SyncQueryRunnerInterface, NE: Emitter> ExecutionState for Execution<Q, NE> {
165152
async fn handle_consensus_output(&self, consensus_output: ConsensusOutput) {
166153
for (cert, batches) in consensus_output.batches {
167154
let current_epoch = self.query_runner.get_current_epoch();

core/interfaces/src/notifier.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,38 +15,27 @@ pub enum Notification {
1515

1616
#[infusion::service]
1717
pub trait NotifierInterface<C: Collection>: Sync + Send + Clone {
18-
type EpochEmitter: EpochNotifierEmitter;
19-
type BlockEmitter: BlockNotifierEmitter;
18+
type Emitter: Emitter;
2019

2120
fn _init(app: ::ApplicationInterface) {
2221
ok!(Self::init(app))
2322
}
24-
2523
fn init(app: &c!(C::ApplicationInterface)) -> Self;
26-
27-
/// Returns a reference to the emitter end of this notifier. Should only be used if we are
28-
/// interested (and responsible) for triggering a notification around new block.
29-
fn new_block_emitter(&self) -> Self::BlockEmitter;
30-
3124
/// Returns a reference to the emitter end of this notifier. Should only be used if we are
3225
/// interested (and responsible) for triggering a notification around new epoch.
33-
fn new_epoch_emitter(&self) -> Self::EpochEmitter;
26+
fn get_emitter(&self) -> Self::Emitter;
3427

3528
fn notify_on_new_block(&self, tx: mpsc::Sender<Notification>);
3629

3730
fn notify_on_new_epoch(&self, tx: mpsc::Sender<Notification>);
3831

3932
fn notify_before_epoch_change(&self, duration: Duration, tx: mpsc::Sender<Notification>);
4033
}
41-
4234
#[infusion::blank]
43-
pub trait EpochNotifierEmitter: Clone + Send + Sync + 'static {
35+
pub trait Emitter: Clone + Send + Sync + 'static {
4436
/// Notify the waiters about epoch change.
4537
fn epoch_changed(&self);
46-
}
4738

48-
#[infusion::blank]
49-
pub trait BlockNotifierEmitter: Clone + Send + Sync + 'static {
5039
/// Notify the waiters about new block.
5140
fn new_block(&self);
5241
}

core/mock/src/consensus.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use lightning_interfaces::infu_collection::{c, Collection};
1212
use lightning_interfaces::types::{Block, TransactionRequest};
1313
use lightning_interfaces::{
1414
ApplicationInterface,
15-
BlockNotifierEmitter,
1615
BroadcastInterface,
1716
ConfigConsumer,
1817
ConsensusInterface,
18+
Emitter,
1919
ExecutionEngineSocket,
2020
IndexSocket,
2121
MempoolSocket,
@@ -160,11 +160,7 @@ impl<C: Collection> ConsensusInterface<C> for MockConsensus<C> {
160160
notifier: &c!(C::NotifierInterface),
161161
) -> anyhow::Result<Self> {
162162
let (tx, rx) = mpsc::channel(128);
163-
tokio::spawn(run_consensus::<C>(
164-
rx,
165-
executor,
166-
notifier.new_block_emitter(),
167-
));
163+
tokio::spawn(run_consensus::<C>(rx, executor, notifier.get_emitter()));
168164

169165
let mempool = TokioSpawn::spawn_async(MempoolSocketWorker {
170166
sender: tx.clone(),
@@ -199,7 +195,7 @@ impl<C: Collection> ConfigConsumer for MockConsensus<C> {
199195
async fn run_consensus<C: Collection>(
200196
mut rx: mpsc::Receiver<TransactionRequest>,
201197
exec: ExecutionEngineSocket,
202-
new_block_notifier: c!(C::NotifierInterface::BlockEmitter),
198+
notifier: c!(C::NotifierInterface::Emitter),
203199
) {
204200
let mut ticker = tokio::time::interval(Duration::from_millis(1));
205201
let mut buffer = Vec::<TransactionRequest>::with_capacity(32);
@@ -219,7 +215,7 @@ async fn run_consensus<C: Collection>(
219215

220216
let _ = exec.run(block).await;
221217

222-
new_block_notifier.new_block();
218+
notifier.new_block();
223219
}
224220
tmp = rx.recv() => {
225221
if let Some(tx) = tmp {

0 commit comments

Comments
 (0)