Skip to content

Commit e81017e

Browse files
committed
refactor(consensus): get rid of injection of notifications into the Affair socket
1 parent 6482209 commit e81017e

File tree

29 files changed

+169
-58
lines changed

29 files changed

+169
-58
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4533,6 +4533,7 @@ dependencies = [
45334533
"lightning-application",
45344534
"lightning-indexer",
45354535
"lightning-interfaces",
4536+
"lightning-notifier",
45364537
"lightning-rep-collector",
45374538
"lightning-signer",
45384539
"lightning-test-utils",
@@ -4692,6 +4693,7 @@ dependencies = [
46924693
"infusion",
46934694
"lightning-application",
46944695
"lightning-interfaces",
4696+
"lightning-notifier",
46954697
"lightning-signer",
46964698
"lightning-test-utils",
46974699
"queue-file",
@@ -4839,6 +4841,7 @@ dependencies = [
48394841
"infusion",
48404842
"lightning-application",
48414843
"lightning-interfaces",
4844+
"lightning-notifier",
48424845
"lightning-rep-collector",
48434846
"lightning-signer",
48444847
"lightning-test-utils",
@@ -4980,6 +4983,7 @@ dependencies = [
49804983
"lightning-blockstore",
49814984
"lightning-indexer",
49824985
"lightning-interfaces",
4986+
"lightning-notifier",
49834987
"lightning-origin-http",
49844988
"lightning-origin-ipfs",
49854989
"lightning-signer",
@@ -5009,6 +5013,7 @@ dependencies = [
50095013
"lightning-blockstore",
50105014
"lightning-indexer",
50115015
"lightning-interfaces",
5016+
"lightning-notifier",
50125017
"lightning-signer",
50135018
"lightning-test-utils",
50145019
"reqwest",
@@ -5034,6 +5039,7 @@ dependencies = [
50345039
"lightning-blockstore",
50355040
"lightning-indexer",
50365041
"lightning-interfaces",
5042+
"lightning-notifier",
50375043
"lightning-signer",
50385044
"lightning-test-utils",
50395045
"multihash 0.19.1",
@@ -5151,6 +5157,7 @@ dependencies = [
51515157
"lightning-application",
51525158
"lightning-broadcast",
51535159
"lightning-interfaces",
5160+
"lightning-notifier",
51545161
"lightning-pool",
51555162
"lightning-rep-collector",
51565163
"lightning-signer",
@@ -5271,6 +5278,7 @@ dependencies = [
52715278
"infusion",
52725279
"lightning-application",
52735280
"lightning-interfaces",
5281+
"lightning-notifier",
52745282
"lightning-test-utils",
52755283
"lightning-utils",
52765284
"resolved-pathbuf",

core/blockstore/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ lightning-signer = { path="../signer" }
3131
lightning-application = { path="../application", features = ["test"] }
3232
lightning-rep-collector = { path="../rep-collector" }
3333
lightning-indexer = { path = "../indexer" }
34+
lightning-notifier = { path="../notifier" }

core/blockstore/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ mod tests {
2525
ConsensusInterface,
2626
IncrementalPutInterface,
2727
IndexerInterface,
28+
NotifierInterface,
2829
SignerInterface,
2930
SyncQueryRunnerInterface,
3031
WithStartAndShutdown,
3132
};
33+
use lightning_notifier::Notifier;
3234
use lightning_signer::{Config as SignerConfig, Signer};
3335
use lightning_test_utils::consensus::{Config as ConsensusConfig, MockConsensus};
3436
use tokio::test;
@@ -58,6 +60,7 @@ mod tests {
5860
SignerInterface = Signer<Self>;
5961
ConsensusInterface = MockConsensus<Self>;
6062
IndexerInterface = Indexer<Self>;
63+
NotifierInterface = Notifier<Self>;
6164
});
6265

6366
fn create_content() -> Vec<u8> {
@@ -169,6 +172,8 @@ mod tests {
169172

170173
let mut signer = Signer::<TestBinding>::init(signer_config, query_runner.clone()).unwrap();
171174

175+
let notifier = Notifier::<TestBinding>::init(&app);
176+
172177
let consensus_config = ConsensusConfig {
173178
min_ordering_time: 0,
174179
max_ordering_time: 1,
@@ -183,6 +188,7 @@ mod tests {
183188
query_runner.clone(),
184189
infusion::Blank::default(),
185190
None,
191+
&notifier,
186192
)
187193
.unwrap();
188194

core/consensus/src/consensus.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use lightning_interfaces::types::{Epoch, EpochInfo, UpdateMethod};
1616
use lightning_interfaces::{
1717
ApplicationInterface,
1818
BroadcastInterface,
19+
EpochNotifierEmitter,
1920
IndexSocket,
21+
NotifierInterface,
2022
PubSub,
2123
SyncQueryRunnerInterface,
2224
};
@@ -52,6 +54,7 @@ pub struct Consensus<C: Collection> {
5254
EpochState<
5355
c![C::ApplicationInterface::SyncExecutor],
5456
c![C::BroadcastInterface::PubSub<PubSubMsg>],
57+
c![C::NotifierInterface::EpochEmitter],
5558
>,
5659
>,
5760
>,
@@ -71,7 +74,11 @@ pub struct Consensus<C: Collection> {
7174
}
7275

7376
/// This struct contains mutable state only for the current epoch.
74-
struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static> {
77+
struct EpochState<
78+
Q: SyncQueryRunnerInterface,
79+
P: PubSub<PubSubMsg> + 'static,
80+
EN: EpochNotifierEmitter,
81+
> {
7582
/// The node public key of the node.
7683
node_public_key: NodePublicKey,
7784
/// The consensus public key of the node.
@@ -85,7 +92,7 @@ struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static> {
8592
/// Path to the database used by the narwhal implementation
8693
pub store_path: ResolvedPathBuf,
8794
/// Narwhal execution state.
88-
execution_state: Arc<Execution<Q>>,
95+
execution_state: Arc<Execution<Q, EN>>,
8996
/// Used to send transactions to consensus
9097
/// We still use this socket on consensus struct because a node is not always on the committee,
9198
/// so its not always sending a transaction to its own mempool. The signer interface
@@ -100,14 +107,16 @@ struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static> {
100107
}
101108

102109
#[allow(clippy::too_many_arguments)]
103-
impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static> EpochState<Q, P> {
110+
impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, EN: EpochNotifierEmitter>
111+
EpochState<Q, P, EN>
112+
{
104113
fn new(
105114
node_public_key: NodePublicKey,
106115
consensus_public_key: ConsensusPublicKey,
107116
query_runner: Q,
108117
narwhal_args: NarwhalArgs,
109118
store_path: ResolvedPathBuf,
110-
execution_state: Arc<Execution<Q>>,
119+
execution_state: Arc<Execution<Q, EN>>,
111120
txn_socket: SubmitTxSocket,
112121
pub_sub: P,
113122
rx_narwhal_batches: mpsc::Receiver<(AuthenticStampedParcel, bool)>,
@@ -384,6 +393,7 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
384393
query_runner: c!(C::ApplicationInterface::SyncExecutor),
385394
pubsub: c!(C::BroadcastInterface::PubSub<Self::Certificate>),
386395
indexer_socket: Option<IndexSocket>,
396+
notifier: &c!(C::NotifierInterface),
387397
) -> anyhow::Result<Self> {
388398
// Spawn the registry for narwhal
389399
let registry = Registry::new();
@@ -413,6 +423,7 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
413423
tx_narwhal_batches,
414424
query_runner.clone(),
415425
indexer_socket,
426+
notifier.epoch_emitter(),
416427
));
417428

418429
let shutdown_notify = Arc::new(Notify::new());

core/consensus/src/edge_node/consensus.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ use std::time::Duration;
44

55
use fleek_crypto::NodePublicKey;
66
use lightning_interfaces::types::Epoch;
7-
use lightning_interfaces::{BroadcastEventInterface, PubSub, SyncQueryRunnerInterface, ToDigest};
7+
use lightning_interfaces::{
8+
BroadcastEventInterface,
9+
EpochNotifierEmitter,
10+
PubSub,
11+
SyncQueryRunnerInterface,
12+
ToDigest,
13+
};
814
use lightning_utils::application::QueryRunnerExt;
915
use quick_cache::unsync::Cache;
1016
use tokio::pin;
@@ -24,9 +30,13 @@ pub struct EdgeConsensus {
2430
}
2531

2632
impl EdgeConsensus {
27-
pub fn spawn<P: PubSub<PubSubMsg> + 'static, Q: SyncQueryRunnerInterface>(
33+
pub fn spawn<
34+
P: PubSub<PubSubMsg> + 'static,
35+
Q: SyncQueryRunnerInterface,
36+
EN: EpochNotifierEmitter,
37+
>(
2838
pub_sub: P,
29-
execution: Arc<Execution<Q>>,
39+
execution: Arc<Execution<Q, EN>>,
3040
query_runner: Q,
3141
node_public_key: NodePublicKey,
3242
rx_narwhal_batches: mpsc::Receiver<(AuthenticStampedParcel, bool)>,
@@ -62,10 +72,14 @@ impl EdgeConsensus {
6272

6373
/// Creates and event loop which consumes messages from pubsub and sends them to the
6474
/// right destination.
65-
async fn message_receiver_worker<P: PubSub<PubSubMsg>, Q: SyncQueryRunnerInterface>(
75+
async fn message_receiver_worker<
76+
P: PubSub<PubSubMsg>,
77+
Q: SyncQueryRunnerInterface,
78+
EN: EpochNotifierEmitter,
79+
>(
6680
mut pub_sub: P,
6781
shutdown_notify: Arc<Notify>,
68-
execution: Arc<Execution<Q>>,
82+
execution: Arc<Execution<Q, EN>>,
6983
query_runner: Q,
7084
node_public_key: NodePublicKey,
7185
mut rx_narwhal_batch: mpsc::Receiver<(AuthenticStampedParcel, bool)>,

core/consensus/src/edge_node/transaction_store.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use std::time::{Duration, SystemTime};
44

55
use lightning_interfaces::types::{Digest as BroadcastDigest, NodeIndex};
6-
use lightning_interfaces::SyncQueryRunnerInterface;
6+
use lightning_interfaces::{EpochNotifierEmitter, SyncQueryRunnerInterface};
77
use lightning_utils::application::QueryRunnerExt;
88

99
use super::ring_buffer::RingBuffer;
@@ -107,12 +107,12 @@ impl TransactionStore {
107107

108108
// Threshold should be 2f + 1 of the committee
109109
// Returns true if the epoch has changed
110-
pub async fn try_execute<Q: SyncQueryRunnerInterface>(
110+
pub async fn try_execute<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter>(
111111
&mut self,
112112
digest: Digest,
113113
threshold: usize,
114114
query_runner: &Q,
115-
execution: &Arc<Execution<Q>>,
115+
execution: &Arc<Execution<Q, EN>>,
116116
) -> Result<bool, NotExecuted> {
117117
// get the current chain head
118118
let head = query_runner.get_last_block();
@@ -141,11 +141,11 @@ impl TransactionStore {
141141
Ok(epoch_changed)
142142
}
143143

144-
async fn try_execute_internal<Q: SyncQueryRunnerInterface>(
144+
async fn try_execute_internal<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter>(
145145
&mut self,
146146
digest: Digest,
147147
threshold: usize,
148-
execution: &Arc<Execution<Q>>,
148+
execution: &Arc<Execution<Q, EN>>,
149149
head: Digest,
150150
) -> Result<bool, NotExecuted> {
151151
if self.executed.contains(&digest) {
@@ -162,10 +162,10 @@ impl TransactionStore {
162162
Err(NotExecuted::MissingAttestations(digest))
163163
}
164164

165-
async fn try_execute_chain<Q: SyncQueryRunnerInterface>(
165+
async fn try_execute_chain<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter>(
166166
&mut self,
167167
digest: Digest,
168-
execution: &Arc<Execution<Q>>,
168+
execution: &Arc<Execution<Q, EN>>,
169169
head: Digest,
170170
) -> Result<bool, NotExecuted> {
171171
let mut txn_chain = VecDeque::new();

core/consensus/src/execution.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use fastcrypto::hash::HashFunction;
55
use fleek_blake3 as blake3;
66
use lightning_interfaces::types::{Block, Epoch, IndexRequest, NodeIndex, TransactionRequest};
77
use lightning_interfaces::{
8+
EpochNotifierEmitter,
89
ExecutionEngineSocket,
910
IndexSocket,
1011
SyncQueryRunnerInterface,
@@ -58,7 +59,7 @@ pub struct CommitteeAttestation {
5859
pub epoch: Epoch,
5960
}
6061

61-
pub struct Execution<Q: SyncQueryRunnerInterface> {
62+
pub struct Execution<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter> {
6263
/// Managing certificates generated by narwhal.
6364
executor: ExecutionEngineSocket,
6465
/// Used to signal internal consensus processes that it is time to reconfigure for a new epoch
@@ -73,16 +74,18 @@ pub struct Execution<Q: SyncQueryRunnerInterface> {
7374
/// If this socket is present it means the node is in archive node and should send all blocks
7475
/// and transactions it executes to the archiver to be indexed
7576
index_socket: Option<IndexSocket>,
77+
epoch_change_notifier: EN,
7678
}
7779

78-
impl<Q: SyncQueryRunnerInterface> Execution<Q> {
80+
impl<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter> Execution<Q, EN> {
7981
pub fn new(
8082
executor: ExecutionEngineSocket,
8183
reconfigure_notify: Arc<Notify>,
8284
new_block_notify: Arc<Notify>,
8385
tx_narwhal_batches: mpsc::Sender<(AuthenticStampedParcel, bool)>,
8486
query_runner: Q,
8587
index_socket: Option<IndexSocket>,
88+
epoch_change_notifier: EN,
8689
) -> Self {
8790
Self {
8891
executor,
@@ -91,6 +94,7 @@ impl<Q: SyncQueryRunnerInterface> Execution<Q> {
9194
tx_narwhal_batches,
9295
query_runner,
9396
index_socket,
97+
epoch_change_notifier,
9498
}
9599
}
96100

@@ -124,6 +128,7 @@ impl<Q: SyncQueryRunnerInterface> Execution<Q> {
124128

125129
if results.change_epoch {
126130
change_epoch = true;
131+
self.epoch_change_notifier.epoch_changed();
127132
}
128133

129134
// If we have the archive socket that means our node is in archive node and we should send
@@ -147,7 +152,7 @@ impl<Q: SyncQueryRunnerInterface> Execution<Q> {
147152
}
148153

149154
#[async_trait]
150-
impl<Q: SyncQueryRunnerInterface> ExecutionState for Execution<Q> {
155+
impl<Q: SyncQueryRunnerInterface, EN: EpochNotifierEmitter> ExecutionState for Execution<Q, EN> {
151156
async fn handle_consensus_output(&self, consensus_output: ConsensusOutput) {
152157
for (cert, batches) in consensus_output.batches {
153158
let current_epoch = self.query_runner.get_current_epoch();

core/dack-aggregator/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
lightning-interfaces = {path="../interfaces"}
9+
lightning-interfaces = { path="../interfaces" }
1010
anyhow.workspace = true
1111
serde.workspace = true
1212
bincode.workspace = true
@@ -17,8 +17,9 @@ resolved-pathbuf.workspace = true
1717
queue-file = "1.4.10"
1818

1919
[dev-dependencies]
20-
lightning-test-utils = {path="../test-utils"}
21-
lightning-signer = {path="../signer"}
22-
lightning-application = {path="../application", features = ["test"]}
20+
lightning-test-utils = { path="../test-utils" }
21+
lightning-signer = { path = "../signer" }
22+
lightning-application = { path = "../application", features = ["test"] }
23+
lightning-notifier = { path = "../notifier" }
2324
infusion.workspace = true
2425
fleek-crypto.workspace = true

0 commit comments

Comments
 (0)