Skip to content

Conversation

@RustyCoderX
Copy link

constantly checking for new task now code waits for events by blocking and async receives this reduces CPU usage while keeping everything working the same

// In the future, we hope to fix this issue so that
// the event stream can be properly waited for every time.
match self.handle.recv_timeout(Duration::from_secs(1)) {
match self.handle.recv() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove this timeout? We don't want to block indefinetly in the Drop handler when there is something wrong with the cleanup.

Comment on lines 686 to 695
loop {
match self.drop_stream.try_recv() {
match self.drop_stream.recv() {
Ok(token) => match self.sent_out_shared_memory.remove(&token) {
Some(region) => self.add_to_cache(region),
None => tracing::warn!("received unknown finished drop token `{token:?}`"),
},
Err(flume::TryRecvError::Empty) => break,
Err(flume::TryRecvError::Disconnected) => {
Err(flume::RecvError::Disconnected) => {
bail!("event stream was closed before sending all expected drop tokens")
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to wait here, we just want to handle all buffered events the in the stream. This was not a busy wait loop since we used to break on the first Empty error. So I don't see how this change helps.

}

match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
match self.drop_stream.recv() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, why are we removing the timeout here? We have that as a failsafe in case that something behaves unusual. In general, we don't want to block indefinetly in Drop implementations since the program might not able to exit anymore then.

async fn handle_events(&mut self) -> eyre::Result<()> {
if let Some(events) = &mut self.subscribed_events {
while let Ok(event) = events.try_recv() {
while let Ok(event) = events.recv().await {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to wait for incoming events here. The purpose of this function is to move all events that are buffered in the stream into self.queue. This was not a busy loop since try_recv returns an Err if no events are ready.


[workspace.package]
edition = "2024"
edition = "2021"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for going back to the old 2021 edition?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants