Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4541,6 +4541,7 @@ dependencies = [
"lightning-application",
"lightning-indexer",
"lightning-interfaces",
"lightning-notifier",
"lightning-rep-collector",
"lightning-signer",
"lightning-test-utils",
Expand Down Expand Up @@ -4700,6 +4701,7 @@ dependencies = [
"infusion",
"lightning-application",
"lightning-interfaces",
"lightning-notifier",
"lightning-signer",
"lightning-test-utils",
"queue-file",
Expand Down Expand Up @@ -4847,6 +4849,7 @@ dependencies = [
"infusion",
"lightning-application",
"lightning-interfaces",
"lightning-notifier",
"lightning-rep-collector",
"lightning-signer",
"lightning-test-utils",
Expand Down Expand Up @@ -4988,6 +4991,7 @@ dependencies = [
"lightning-blockstore",
"lightning-indexer",
"lightning-interfaces",
"lightning-notifier",
"lightning-origin-http",
"lightning-origin-ipfs",
"lightning-signer",
Expand Down Expand Up @@ -5018,6 +5022,7 @@ dependencies = [
"lightning-blockstore",
"lightning-indexer",
"lightning-interfaces",
"lightning-notifier",
"lightning-signer",
"lightning-test-utils",
"reqwest",
Expand All @@ -5043,6 +5048,7 @@ dependencies = [
"lightning-blockstore",
"lightning-indexer",
"lightning-interfaces",
"lightning-notifier",
"lightning-signer",
"lightning-test-utils",
"multihash 0.19.1",
Expand Down Expand Up @@ -5160,6 +5166,7 @@ dependencies = [
"lightning-application",
"lightning-broadcast",
"lightning-interfaces",
"lightning-notifier",
"lightning-pool",
"lightning-rep-collector",
"lightning-signer",
Expand Down Expand Up @@ -5281,6 +5288,7 @@ dependencies = [
"infusion",
"lightning-application",
"lightning-interfaces",
"lightning-notifier",
"lightning-test-utils",
"lightning-utils",
"resolved-pathbuf",
Expand Down
13 changes: 10 additions & 3 deletions core/application/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::marker::PhantomData;
use std::sync::Mutex;

use affair::{Executor, TokioSpawn};
use anyhow::{anyhow, Result};
Expand All @@ -15,7 +16,7 @@ use crate::config::{Config, StorageConfig};
use crate::env::{Env, UpdateWorker};
use crate::query_runner::QueryRunner;
pub struct Application<C: Collection> {
update_socket: ExecutionEngineSocket,
update_socket: Mutex<Option<ExecutionEngineSocket>>,
query_runner: QueryRunner,
collection: PhantomData<C>,
}
Expand Down Expand Up @@ -63,7 +64,9 @@ impl<C: Collection> ApplicationInterface<C> for Application<C> {

Ok(Self {
query_runner: env.query_runner(),
update_socket: TokioSpawn::spawn_async(UpdateWorker::<C>::new(env, blockstore)),
update_socket: Mutex::new(Some(TokioSpawn::spawn_async(UpdateWorker::<C>::new(
env, blockstore,
)))),
collection: PhantomData,
})
}
Expand All @@ -75,7 +78,11 @@ impl<C: Collection> ApplicationInterface<C> for Application<C> {
///
/// See the safety document for the [`ExecutionEngineSocket`].
fn transaction_executor(&self) -> ExecutionEngineSocket {
self.update_socket.clone()
self.update_socket
.lock()
.unwrap()
.take()
.expect("Execution Engine Socket has already been taken")
}

/// Returns the instance of a sync query runner which can be used to run queries without
Expand Down
2 changes: 1 addition & 1 deletion core/blockstore-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async fn get_peers(

let mut peers = Vec::new();
for (i, signer_config) in signers_configs.into_iter().enumerate() {
let (_, query_runner) = (app.transaction_executor(), app.sync_query());
let query_runner = app.sync_query();
let signer = Signer::<TestBinding>::init(signer_config, query_runner.clone()).unwrap();
let notifier = Notifier::<TestBinding>::init(&app);
let topology = Topology::<TestBinding>::init(
Expand Down
1 change: 1 addition & 0 deletions core/blockstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ lightning-signer = { path="../signer" }
lightning-application = { path="../application", features = ["test"] }
lightning-rep-collector = { path="../rep-collector" }
lightning-indexer = { path = "../indexer" }
lightning-notifier = { path="../notifier" }
13 changes: 12 additions & 1 deletion core/blockstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ mod tests {
ConsensusInterface,
IncrementalPutInterface,
IndexerInterface,
NotifierInterface,
SignerInterface,
SyncQueryRunnerInterface,
WithStartAndShutdown,
};
use lightning_notifier::Notifier;
use lightning_signer::{Config as SignerConfig, Signer};
use lightning_test_utils::consensus::{Config as ConsensusConfig, MockConsensus};
use tokio::sync::mpsc;
use tokio::test;

use crate::blockstore::{Blockstore, BLOCK_SIZE};
Expand Down Expand Up @@ -58,6 +61,7 @@ mod tests {
SignerInterface = Signer<Self>;
ConsensusInterface = MockConsensus<Self>;
IndexerInterface = Indexer<Self>;
NotifierInterface = Notifier<Self>;
});

fn create_content() -> Vec<u8> {
Expand Down Expand Up @@ -169,6 +173,8 @@ mod tests {

let mut signer = Signer::<TestBinding>::init(signer_config, query_runner.clone()).unwrap();

let notifier = Notifier::<TestBinding>::init(&app);

let consensus_config = ConsensusConfig {
min_ordering_time: 0,
max_ordering_time: 1,
Expand All @@ -183,11 +189,16 @@ mod tests {
query_runner.clone(),
infusion::Blank::default(),
None,
&notifier,
)
.unwrap();

signer.provide_mempool(consensus.mempool());
signer.provide_new_block_notify(consensus.new_block_notifier());

let (new_block_tx, new_block_rx) = mpsc::channel(10);

signer.provide_new_block_notify(new_block_rx);
notifier.notify_on_new_block(new_block_tx);

let indexer =
Indexer::<TestBinding>::init(Default::default(), query_runner, &signer).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/broadcast/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async fn create_peer(
address: SocketAddr,
in_state: bool,
) -> Peer<TestBinding> {
let (_, query_runner) = (app.transaction_executor(), app.sync_query());
let query_runner = app.sync_query();
let signer = Signer::<TestBinding>::init(signer_config, query_runner.clone()).unwrap();
let notifier = Notifier::<TestBinding>::init(app);
let topology = Topology::<TestBinding>::init(
Expand Down
30 changes: 14 additions & 16 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use lightning_interfaces::types::{Epoch, EpochInfo, UpdateMethod};
use lightning_interfaces::{
ApplicationInterface,
BroadcastInterface,
Emitter,
IndexSocket,
NotifierInterface,
PubSub,
SyncQueryRunnerInterface,
};
Expand Down Expand Up @@ -44,25 +46,24 @@ use crate::narwhal::{NarwhalArgs, NarwhalService};

pub struct Consensus<C: Collection> {
/// Inner state of the consensus
/// todo(dalton): We can probably get a little more effecient then a mutex here
/// todo(dalton): We can probably get a little more efficient then a mutex here
/// maybe a once box
#[allow(clippy::type_complexity)]
epoch_state: Mutex<
Option<
EpochState<
c![C::ApplicationInterface::SyncExecutor],
c![C::BroadcastInterface::PubSub<PubSubMsg>],
c![C::NotifierInterface::Emitter],
>,
>,
>,
/// This socket recieves signed transactions and forwards them to an active committee member to
/// This socket receives signed transactions and forwards them to an active committee member to
/// be ordered
mempool_socket: MempoolSocket,
/// Timestamp of the narwhal certificate that caused an epoch change
/// is sent through this channel to notify that epoch chould change.
reconfigure_notify: Arc<Notify>,
/// A notifier that is notified everytime a new block is proccessed
new_block_notify: Arc<Notify>,
/// Called from the shutdown function to notify the start event loop to
/// exit.
shutdown_notify: Arc<Notify>,
Expand All @@ -71,7 +72,7 @@ pub struct Consensus<C: Collection> {
}

/// This struct contains mutable state only for the current epoch.
struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static> {
struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter> {
/// The node public key of the node.
node_public_key: NodePublicKey,
/// The consensus public key of the node.
Expand All @@ -85,29 +86,31 @@ struct EpochState<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static> {
/// Path to the database used by the narwhal implementation
pub store_path: ResolvedPathBuf,
/// Narwhal execution state.
execution_state: Arc<Execution<Q>>,
execution_state: Arc<Execution<Q, NE>>,
/// Used to send transactions to consensus
/// We still use this socket on consensus struct because a node is not always on the committee,
/// so its not always sending a transaction to its own mempool. The signer interface
/// also takes care of nonce bookkeeping and retry logic
txn_socket: SubmitTxSocket,
/// Interface for sending messages through the gossip layer
pub_sub: P,
/// Narhwal sends payloads ready for broadcast to this reciever
/// Narhwal sends payloads ready for broadcast to this receiver
rx_narwhal_batches: Option<mpsc::Receiver<(AuthenticStampedParcel, bool)>>,
/// To notify when consensus is shutting down
shutdown_notify: Arc<Notify>,
}

#[allow(clippy::too_many_arguments)]
impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static> EpochState<Q, P> {
impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter>
EpochState<Q, P, NE>
{
fn new(
node_public_key: NodePublicKey,
consensus_public_key: ConsensusPublicKey,
query_runner: Q,
narwhal_args: NarwhalArgs,
store_path: ResolvedPathBuf,
execution_state: Arc<Execution<Q>>,
execution_state: Arc<Execution<Q, NE>>,
txn_socket: SubmitTxSocket,
pub_sub: P,
rx_narwhal_batches: mpsc::Receiver<(AuthenticStampedParcel, bool)>,
Expand Down Expand Up @@ -384,6 +387,7 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
query_runner: c!(C::ApplicationInterface::SyncExecutor),
pubsub: c!(C::BroadcastInterface::PubSub<Self::Certificate>),
indexer_socket: Option<IndexSocket>,
notifier: &c!(C::NotifierInterface),
) -> anyhow::Result<Self> {
// Spawn the registry for narwhal
let registry = Registry::new();
Expand All @@ -392,7 +396,6 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {

let (consensus_sk, primary_sk) = signer.get_sk();
let reconfigure_notify = Arc::new(Notify::new());
let new_block_notify = Arc::new(Notify::new());
let networking_keypair = NetworkKeyPair::from(primary_sk);
let primary_keypair = KeyPair::from(consensus_sk);
let forwarder = Forwarder::new(query_runner.clone(), primary_keypair.public().clone());
Expand All @@ -409,10 +412,10 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
let execution_state = Arc::new(Execution::new(
executor,
reconfigure_notify.clone(),
new_block_notify.clone(),
tx_narwhal_batches,
query_runner.clone(),
indexer_socket,
notifier.get_emitter(),
));

let shutdown_notify = Arc::new(Notify::new());
Expand All @@ -434,7 +437,6 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
epoch_state: Mutex::new(Some(epoch_state)),
mempool_socket: TokioSpawn::spawn_async(forwarder),
reconfigure_notify,
new_block_notify,
shutdown_notify,
is_running: AtomicBool::new(false),
})
Expand All @@ -446,10 +448,6 @@ impl<C: Collection> ConsensusInterface<C> for Consensus<C> {
fn mempool(&self) -> MempoolSocket {
self.mempool_socket.clone()
}

fn new_block_notifier(&self) -> Arc<Notify> {
self.new_block_notify.clone()
}
}

#[derive(Debug, Serialize, Deserialize, Clone, IsVariant, From, TryInto)]
Expand Down
16 changes: 11 additions & 5 deletions core/consensus/src/edge_node/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ use std::time::Duration;

use fleek_crypto::NodePublicKey;
use lightning_interfaces::types::Epoch;
use lightning_interfaces::{BroadcastEventInterface, PubSub, SyncQueryRunnerInterface, ToDigest};
use lightning_interfaces::{
BroadcastEventInterface,
Emitter,
PubSub,
SyncQueryRunnerInterface,
ToDigest,
};
use lightning_utils::application::QueryRunnerExt;
use quick_cache::unsync::Cache;
use tokio::pin;
Expand All @@ -24,9 +30,9 @@ pub struct EdgeConsensus {
}

impl EdgeConsensus {
pub fn spawn<P: PubSub<PubSubMsg> + 'static, Q: SyncQueryRunnerInterface>(
pub fn spawn<P: PubSub<PubSubMsg> + 'static, Q: SyncQueryRunnerInterface, NE: Emitter>(
pub_sub: P,
execution: Arc<Execution<Q>>,
execution: Arc<Execution<Q, NE>>,
query_runner: Q,
node_public_key: NodePublicKey,
rx_narwhal_batches: mpsc::Receiver<(AuthenticStampedParcel, bool)>,
Expand Down Expand Up @@ -62,10 +68,10 @@ impl EdgeConsensus {

/// Creates and event loop which consumes messages from pubsub and sends them to the
/// right destination.
async fn message_receiver_worker<P: PubSub<PubSubMsg>, Q: SyncQueryRunnerInterface>(
async fn message_receiver_worker<P: PubSub<PubSubMsg>, Q: SyncQueryRunnerInterface, NE: Emitter>(
mut pub_sub: P,
shutdown_notify: Arc<Notify>,
execution: Arc<Execution<Q>>,
execution: Arc<Execution<Q, NE>>,
query_runner: Q,
node_public_key: NodePublicKey,
mut rx_narwhal_batch: mpsc::Receiver<(AuthenticStampedParcel, bool)>,
Expand Down
14 changes: 7 additions & 7 deletions core/consensus/src/edge_node/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use lightning_interfaces::types::{Digest as BroadcastDigest, NodeIndex};
use lightning_interfaces::SyncQueryRunnerInterface;
use lightning_interfaces::{Emitter, SyncQueryRunnerInterface};
use lightning_utils::application::QueryRunnerExt;

use super::ring_buffer::RingBuffer;
Expand Down Expand Up @@ -107,12 +107,12 @@ impl TransactionStore {

// Threshold should be 2f + 1 of the committee
// Returns true if the epoch has changed
pub async fn try_execute<Q: SyncQueryRunnerInterface>(
pub async fn try_execute<Q: SyncQueryRunnerInterface, NE: Emitter>(
&mut self,
digest: Digest,
threshold: usize,
query_runner: &Q,
execution: &Arc<Execution<Q>>,
execution: &Arc<Execution<Q, NE>>,
) -> Result<bool, NotExecuted> {
// get the current chain head
let head = query_runner.get_last_block();
Expand Down Expand Up @@ -141,11 +141,11 @@ impl TransactionStore {
Ok(epoch_changed)
}

async fn try_execute_internal<Q: SyncQueryRunnerInterface>(
async fn try_execute_internal<Q: SyncQueryRunnerInterface, NE: Emitter>(
&mut self,
digest: Digest,
threshold: usize,
execution: &Arc<Execution<Q>>,
execution: &Arc<Execution<Q, NE>>,
head: Digest,
) -> Result<bool, NotExecuted> {
if self.executed.contains(&digest) {
Expand All @@ -162,10 +162,10 @@ impl TransactionStore {
Err(NotExecuted::MissingAttestations(digest))
}

async fn try_execute_chain<Q: SyncQueryRunnerInterface>(
async fn try_execute_chain<Q: SyncQueryRunnerInterface, NE: Emitter>(
&mut self,
digest: Digest,
execution: &Arc<Execution<Q>>,
execution: &Arc<Execution<Q, NE>>,
head: Digest,
) -> Result<bool, NotExecuted> {
let mut txn_chain = VecDeque::new();
Expand Down
Loading