-
Notifications
You must be signed in to change notification settings - Fork 46
Take ExecutionEngineSocket from executor instead of cloning it + Notifier refactor #13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
daltoncoder
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good start there is a couple changes i listed and we should sync on moving away from injection into the socket like i described below
core/application/src/app.rs
Outdated
| use crate::query_runner::QueryRunner; | ||
| pub struct Application<C: Collection> { | ||
| update_socket: ExecutionEngineSocket, | ||
| update_socket: Arc<Mutex<Option<ExecutionEngineSocket>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| update_socket: Arc<Mutex<Option<ExecutionEngineSocket>>>, | |
| update_socket: Mutex<Option<ExecutionEngineSocket>>, |
No need for the Arc here
core/application/src/app.rs
Outdated
| Ok(Self { | ||
| query_runner: env.query_runner(), | ||
| update_socket: TokioSpawn::spawn_async(UpdateWorker::<C>::new(env, blockstore)), | ||
| update_socket: Arc::new(Mutex::new(Some(TokioSpawn::spawn_async( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| update_socket: Arc::new(Mutex::new(Some(TokioSpawn::spawn_async( | |
| update_socket: Mutex::new(Some(TokioSpawn::spawn_async( |
core/application/src/app.rs
Outdated
| /// See the safety document for the [`ExecutionEngineSocket`]. | ||
| fn transaction_executor(&self) -> ExecutionEngineSocket { | ||
| self.update_socket.clone() | ||
| fn transaction_executor(&self) -> Option<ExecutionEngineSocket> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| fn transaction_executor(&self) -> Option<ExecutionEngineSocket> { | |
| fn transaction_executor(&self) -> ExecutionEngineSocket> { |
Lets keep the return on the interface the same as it was. We should unwrap here in this function before returning. The safety comments on this function in the interface should reflect that this is going to panic if a process besides consensus calls this
core/interfaces/src/consensus.rs
Outdated
| executor.inject(move |res| { | ||
| if res.change_epoch { | ||
| epoch_change_notifier.epoch_changed(); | ||
| } | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Part of the reasoning we want to move to this new notifier flow is because this injection we do into the Affair socket is overhead that we really dont want. So we want to pass this notifier to consensus and be notifying in submit_batch when we know the epoch has changed. Ill sync with you and give you some more color on what im describing off of github.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, let's get rid of this flow here and instead trigger the event from the consensus.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I see your point!
qti3e
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good overall, but let's not return an Option as dalton already pointed out and this way any changes to the test files should be not needed anymore.
core/interfaces/src/consensus.rs
Outdated
| executor.inject(move |res| { | ||
| if res.change_epoch { | ||
| epoch_change_notifier.epoch_changed(); | ||
| } | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, let's get rid of this flow here and instead trigger the event from the consensus.
| #[derive(Default, Clone)] | ||
| pub struct EpochChangeNotificationsEmitter { | ||
| emitter: Arc<Notify>, | ||
| } | ||
|
|
||
| impl EpochNotifierEmitter for EpochChangeNotificationsEmitter { | ||
| fn epoch_changed(&self) { | ||
| self.emitter.notify_waiters() | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following the comment above about getting rid of the injection flow with affair, we should use this event emitter instead to perform the notification.
|
Thanks @daltoncoder & @qti3e for pointing out fields for improvement! |
| #[infusion::service] | ||
| pub trait NotifierInterface<C: Collection>: Sync + Send + Clone { | ||
| type EpochEmitter: EpochNotifierEmitter; | ||
| type BlockEmitter: BlockNotifierEmitter; | ||
|
|
||
| fn _init(app: ::ApplicationInterface) { | ||
| ok!(Self::init(app)) | ||
| } | ||
|
|
||
| fn init(app: &c!(C::ApplicationInterface)) -> Self; | ||
|
|
||
| /// Returns a reference to the emitter end of this notifier. Should only be used if we are | ||
| /// interested (and responsible) for triggering a notification around new block. | ||
| fn new_block_emitter(&self) -> Self::BlockEmitter; | ||
|
|
||
| /// Returns a reference to the emitter end of this notifier. Should only be used if we are | ||
| /// interested (and responsible) for triggering a notification around new epoch. | ||
| fn new_epoch_emitter(&self) -> Self::EpochEmitter; | ||
|
|
||
| fn notify_on_new_block(&self, tx: mpsc::Sender<Notification>); | ||
|
|
||
| fn notify_on_new_epoch(&self, tx: mpsc::Sender<Notification>); | ||
|
|
||
| fn notify_before_epoch_change(&self, duration: Duration, tx: mpsc::Sender<Notification>); | ||
| } | ||
|
|
||
| #[infusion::blank] | ||
| pub trait EpochNotifierEmitter: Clone + Send + Sync + 'static { | ||
| /// Notify the waiters about epoch change. | ||
| fn epoch_changed(&self); | ||
| } | ||
|
|
||
| #[infusion::blank] | ||
| pub trait BlockNotifierEmitter: Clone + Send + Sync + 'static { | ||
| /// Notify the waiters about new block. | ||
| fn new_block(&self); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm we should keep the emitters in the same trait and group them together:
| #[infusion::service] | |
| pub trait NotifierInterface<C: Collection>: Sync + Send + Clone { | |
| type EpochEmitter: EpochNotifierEmitter; | |
| type BlockEmitter: BlockNotifierEmitter; | |
| fn _init(app: ::ApplicationInterface) { | |
| ok!(Self::init(app)) | |
| } | |
| fn init(app: &c!(C::ApplicationInterface)) -> Self; | |
| /// Returns a reference to the emitter end of this notifier. Should only be used if we are | |
| /// interested (and responsible) for triggering a notification around new block. | |
| fn new_block_emitter(&self) -> Self::BlockEmitter; | |
| /// Returns a reference to the emitter end of this notifier. Should only be used if we are | |
| /// interested (and responsible) for triggering a notification around new epoch. | |
| fn new_epoch_emitter(&self) -> Self::EpochEmitter; | |
| fn notify_on_new_block(&self, tx: mpsc::Sender<Notification>); | |
| fn notify_on_new_epoch(&self, tx: mpsc::Sender<Notification>); | |
| fn notify_before_epoch_change(&self, duration: Duration, tx: mpsc::Sender<Notification>); | |
| } | |
| #[infusion::blank] | |
| pub trait EpochNotifierEmitter: Clone + Send + Sync + 'static { | |
| /// Notify the waiters about epoch change. | |
| fn epoch_changed(&self); | |
| } | |
| #[infusion::blank] | |
| pub trait BlockNotifierEmitter: Clone + Send + Sync + 'static { | |
| /// Notify the waiters about new block. | |
| fn new_block(&self); | |
| } | |
| #[infusion::service] | |
| pub trait NotifierInterface<C: Collection>: Sync + Send + Clone { | |
| type Emitter: Emitter; | |
| fn _init(app: ::ApplicationInterface) { | |
| ok!(Self::init(app)) | |
| } | |
| fn init(app: &c!(C::ApplicationInterface)) -> Self; | |
| /// Returns a reference to the emitter end of this notifier. Should only be used if we are | |
| /// interested (and responsible) for triggering a notification around new epoch. | |
| fn get_emitter(&self) -> Self::Emitter; | |
| fn notify_on_new_block(&self, tx: mpsc::Sender<Notification>); | |
| fn notify_on_new_epoch(&self, tx: mpsc::Sender<Notification>); | |
| fn notify_before_epoch_change(&self, duration: Duration, tx: mpsc::Sender<Notification>); | |
| } | |
| #[infusion::blank] | |
| pub trait Emitter: Clone + Send + Sync + 'static { | |
| /// Notify the waiters about epoch change. | |
| fn epoch_changed(&self); | |
| /// Notify the waiters about new block. | |
| fn new_block(&self); | |
| } |
daltoncoder
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Closing
Application: Take ExecutionEngineSocket from executor instead of cloning it