Skip to content

Commit a841550

Browse files
committed
chore(application): take ExecutionEngineSocket instead of cloning it
1 parent 3d39d89 commit a841550

File tree

19 files changed

+67
-67
lines changed

19 files changed

+67
-67
lines changed

core/application/src/app.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::marker::PhantomData;
2+
use std::sync::{Arc, Mutex};
23

34
use affair::{Executor, TokioSpawn};
45
use anyhow::{anyhow, Result};
@@ -15,7 +16,7 @@ use crate::config::{Config, StorageConfig};
1516
use crate::env::{Env, UpdateWorker};
1617
use crate::query_runner::QueryRunner;
1718
pub struct Application<C: Collection> {
18-
update_socket: ExecutionEngineSocket,
19+
update_socket: Arc<Mutex<Option<ExecutionEngineSocket>>>,
1920
query_runner: QueryRunner,
2021
collection: PhantomData<C>,
2122
}
@@ -63,7 +64,9 @@ impl<C: Collection> ApplicationInterface<C> for Application<C> {
6364

6465
Ok(Self {
6566
query_runner: env.query_runner(),
66-
update_socket: TokioSpawn::spawn_async(UpdateWorker::<C>::new(env, blockstore)),
67+
update_socket: Arc::new(Mutex::new(Some(TokioSpawn::spawn_async(
68+
UpdateWorker::<C>::new(env, blockstore),
69+
)))),
6770
collection: PhantomData,
6871
})
6972
}
@@ -74,8 +77,8 @@ impl<C: Collection> ApplicationInterface<C> for Application<C> {
7477
/// # Safety
7578
///
7679
/// See the safety document for the [`ExecutionEngineSocket`].
77-
fn transaction_executor(&self) -> ExecutionEngineSocket {
78-
self.update_socket.clone()
80+
fn transaction_executor(&self) -> Option<ExecutionEngineSocket> {
81+
self.update_socket.lock().unwrap().take()
7982
}
8083

8184
/// Returns the instance of a sync query runner which can be used to run queries without

core/application/src/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ fn init_app(config: Option<Config>) -> (ExecutionEngineSocket, QueryRunner) {
570570
fn do_init_app(config: Config) -> (ExecutionEngineSocket, QueryRunner) {
571571
let app = Application::<TestBinding>::init(config, Default::default()).unwrap();
572572

573-
(app.transaction_executor(), app.sync_query())
573+
(app.transaction_executor().unwrap(), app.sync_query())
574574
}
575575

576576
/// Initialize application with provided committee.

core/blockstore/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ mod tests {
179179
let consensus = MockConsensus::<TestBinding>::init(
180180
consensus_config,
181181
&signer,
182-
update_socket,
182+
update_socket.unwrap(),
183183
query_runner.clone(),
184184
infusion::Blank::default(),
185185
None,

core/consensus/src/consensus.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::narwhal::{NarwhalArgs, NarwhalService};
4444

4545
pub struct Consensus<C: Collection> {
4646
/// Inner state of the consensus
47-
/// todo(dalton): We can probably get a little more effecient then a mutex here
47+
/// todo(dalton): We can probably get a little more efficient then a mutex here
4848
/// maybe a once box
4949
#[allow(clippy::type_complexity)]
5050
epoch_state: Mutex<
@@ -55,13 +55,13 @@ pub struct Consensus<C: Collection> {
5555
>,
5656
>,
5757
>,
58-
/// This socket recieves signed transactions and forwards them to an active committee member to
58+
/// This socket receives signed transactions and forwards them to an active committee member to
5959
/// be ordered
6060
mempool_socket: MempoolSocket,
6161
/// Timestamp of the narwhal certificate that caused an epoch change
6262
/// is sent through this channel to notify that epoch chould change.
6363
reconfigure_notify: Arc<Notify>,
64-
/// A notifier that is notified everytime a new block is proccessed
64+
/// A notifier that is notified every time a new block is processed
6565
new_block_notify: Arc<Notify>,
6666
/// Called from the shutdown function to notify the start event loop to
6767
/// exit.
@@ -93,7 +93,7 @@ struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static> {
9393
txn_socket: SubmitTxSocket,
9494
/// Interface for sending messages through the gossip layer
9595
pub_sub: P,
96-
/// Narhwal sends payloads ready for broadcast to this reciever
96+
/// Narhwal sends payloads ready for broadcast to this receiver
9797
rx_narwhal_batches: Option<mpsc::Receiver<(AuthenticStampedParcel, bool)>>,
9898
/// To notify when consensus is shutting down
9999
shutdown_notify: Arc<Notify>,

core/consensus/src/execution.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ impl ToDigest for AuthenticStampedParcel {
4747
}
4848

4949
/// A message an authority sends out attest that an Authentic stamp parcel is accurate. When an edge
50-
/// node gets 2f+1 of these it commmits the transactions in the parcel
50+
/// node gets 2f+1 of these it commits the transactions in the parcel
5151
#[derive(Serialize, Deserialize, Debug, Clone)]
5252
pub struct CommitteeAttestation {
5353
/// The digest we are attesting is correct
5454
pub digest: Digest,
55-
/// We send random bytes with this messsage so it gives it a unique hash and differentiates it
55+
/// We send random bytes with this message so it gives it a unique hash and differentiates it
5656
/// from the other committee members attestation broadcasts
5757
pub node_index: NodeIndex,
5858
pub epoch: Epoch,
@@ -61,9 +61,9 @@ pub struct CommitteeAttestation {
6161
pub struct Execution<Q: SyncQueryRunnerInterface> {
6262
/// Managing certificates generated by narwhal.
6363
executor: ExecutionEngineSocket,
64-
/// Used to signal internal consensus proccesses that it is time to reconfigure for a new epoch
64+
/// Used to signal internal consensus processes that it is time to reconfigure for a new epoch
6565
reconfigure_notify: Arc<Notify>,
66-
/// Notifier that notifies everytime a block is executed on application state
66+
/// Notifier that notifies every time a block is executed on application state
6767
new_block_notify: Arc<Notify>,
6868
/// Used to send payloads to the edge node consensus to broadcast out to other nodes
6969
tx_narwhal_batches: mpsc::Sender<(AuthenticStampedParcel, bool)>,
@@ -127,7 +127,7 @@ impl<Q: SyncQueryRunnerInterface> Execution<Q> {
127127
}
128128

129129
// If we have the archive socket that means our node is in archive node and we should send
130-
// the block and the reciept to be indexed
130+
// the block and the receipt to be indexed
131131
if let (Some(block), Some(socket)) = (archive_block, &self.index_socket) {
132132
if let Err(e) = socket
133133
.run(IndexRequest {
@@ -189,7 +189,7 @@ impl<Q: SyncQueryRunnerInterface> ExecutionState for Execution<Q> {
189189
let epoch_changed = self.submit_batch(batch_payload, parcel.to_digest()).await;
190190

191191
if let Err(e) = self.tx_narwhal_batches.send((parcel, epoch_changed)).await {
192-
// This shouldnt ever happen. But if it does there is no critical tasks
192+
// This shouldn't ever happen. But if it does there is no critical tasks
193193
// happening on the other end of this that would require a
194194
// panic
195195
error!("Narwhal failed to send batch payload to edge consensus: {e:?}");

core/dack-aggregator/src/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ async fn init_aggregator(path: PathBuf) -> Node<TestBinding> {
100100
let consensus = MockConsensus::<TestBinding>::init(
101101
consensus_config,
102102
&signer,
103-
update_socket.clone(),
103+
update_socket.unwrap(),
104104
query_runner.clone(),
105105
infusion::Blank::default(),
106106
None,

core/fetcher/src/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,10 @@ async fn get_fetchers(
157157
.unwrap();
158158
app.start().await;
159159

160+
let update_socket = app.transaction_executor().unwrap();
160161
let mut peers = Vec::new();
161162
for (i, signer_config) in signers_configs.into_iter().enumerate() {
162-
let (update_socket, query_runner) = (app.transaction_executor(), app.sync_query());
163+
let query_runner = app.sync_query();
163164
let mut signer = Signer::<TestBinding>::init(signer_config, query_runner.clone()).unwrap();
164165
let topology = Topology::<TestBinding>::init(
165166
TopologyConfig::default(),

core/indexer/src/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ async fn test_submission() {
121121
let consensus = MockConsensus::<TestBinding>::init(
122122
consensus_config,
123123
&signer,
124-
update_socket,
124+
update_socket.unwrap(),
125125
query_runner.clone(),
126126
infusion::Blank::default(),
127127
None,

core/interfaces/src/application.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub trait ApplicationInterface<C: Collection>:
6666
/// # Safety
6767
///
6868
/// See the safety document for the [`ExecutionEngineSocket`].
69-
fn transaction_executor(&self) -> ExecutionEngineSocket;
69+
fn transaction_executor(&self) -> Option<ExecutionEngineSocket>;
7070

7171
/// Returns the instance of a sync query runner which can be used to run queries without
7272
/// blocking or awaiting. A naive (& blocking) implementation can achieve this by simply
@@ -111,7 +111,7 @@ pub trait SyncQueryRunnerInterface: Clone + Send + Sync + 'static {
111111
/// Query Pub Key to Node Index Table
112112
fn pubkey_to_index(&self, pub_key: &NodePublicKey) -> Option<NodeIndex>;
113113

114-
/// Query Committe Table
114+
/// Query Committee Table
115115
fn get_committe_info<V>(
116116
&self,
117117
epoch: &Epoch,

core/interfaces/src/consensus.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::{
1717
BroadcastInterface,
1818
ConfigProviderInterface,
1919
IndexSocket,
20+
NotifierInterface,
2021
};
2122

2223
/// A socket that gives services and other sub-systems the required functionality to
@@ -39,8 +40,17 @@ pub trait ConsensusInterface<C: Collection>:
3940
app: ::ApplicationInterface,
4041
broadcast: ::BroadcastInterface,
4142
archive: ::ArchiveInterface,
43+
notifier: ::NotifierInterface,
4244
) {
43-
let executor = app.transaction_executor();
45+
let executor = app
46+
.transaction_executor()
47+
.expect("ConsensusInferface::_init - Update Socket should be available");
48+
let epoch_change_notifier = notifier.emitters();
49+
executor.inject(move |res| {
50+
if res.change_epoch {
51+
epoch_change_notifier.notify_waiters();
52+
}
53+
});
4454
let sqr = app.sync_query();
4555
let pubsub = broadcast.get_pubsub(crate::types::Topic::Consensus);
4656
Self::init(
@@ -71,7 +81,7 @@ pub trait ConsensusInterface<C: Collection>:
7181
#[blank = Socket::raw_bounded(64).0]
7282
fn mempool(&self) -> MempoolSocket;
7383

74-
/// Returns a tokio Notifier that notifies everytime a new block is finished being processed
84+
/// Returns a tokio Notifier that notifies every time a new block is finished being processed
7585
#[blank = Default::default()]
7686
fn new_block_notifier(&self) -> Arc<Notify>;
7787
}

0 commit comments

Comments
 (0)