Skip to content

Commit 45a2b31

Browse files
committed
[bifrost] Get a CommitToken back from notify_committed()
1 parent 98d2099 commit 45a2b31

File tree

1 file changed

+8
-11
lines changed

1 file changed

+8
-11
lines changed

crates/bifrost/src/background_appender.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,10 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use std::sync::Arc;
12-
1311
use futures::FutureExt;
1412
use pin_project::pin_project;
1513
use restate_types::logs::Record;
16-
use tokio::sync::{Notify, mpsc, oneshot};
14+
use tokio::sync::{mpsc, oneshot};
1715
use tracing::{trace, warn};
1816

1917
use restate_core::{ShutdownError, TaskCenter, TaskHandle, cancellation_watcher};
@@ -130,8 +128,8 @@ where
130128
batch.push(record);
131129
notif_buffer.push(tx);
132130
}
133-
AppendOperation::Canary(notify) => {
134-
notify.notify_one();
131+
AppendOperation::Canary(tx) => {
132+
notif_buffer.push(tx);
135133
}
136134
AppendOperation::MarkAsPreferred => {
137135
appender.mark_as_preferred();
@@ -357,19 +355,18 @@ impl<T: StorageEncode> LogSender<T> {
357355
///
358356
/// Not cancellation safe. Every call will attempt to acquire capacity on the channel and send
359357
/// a new message to the appender.
360-
pub async fn notify_committed(&self) -> Result<(), EnqueueError<()>> {
358+
pub async fn notify_committed(&self) -> Result<CommitToken, EnqueueError<()>> {
361359
let Ok(permit) = self.tx.reserve().await else {
362360
// channel is closed, this should happen the appender is draining or has been darained
363361
// already
364362
return Err(EnqueueError::Closed(()));
365363
};
366364

367-
let notify = Arc::new(Notify::new());
368-
let canary = AppendOperation::Canary(notify.clone());
365+
let (tx, rx) = oneshot::channel();
366+
let canary = AppendOperation::Canary(tx);
369367
permit.send(canary);
370368

371-
notify.notified().await;
372-
Ok(())
369+
Ok(CommitToken { rx })
373370
}
374371

375372
/// Marks this node as a preferred writer for the underlying log
@@ -422,7 +419,7 @@ enum AppendOperation {
422419
EnqueueWithNotification(Record, oneshot::Sender<()>),
423420
// A message denoting a request to be notified when it's processed by the appender.
424421
// It's used to check if previously enqueued appends have been committed or not
425-
Canary(Arc<Notify>),
422+
Canary(oneshot::Sender<()>),
426423
/// Let's bifrost know that this node is the preferred writer of this log
427424
MarkAsPreferred,
428425
/// Let's bifrost know that this node might not be the preferred writer of this log

0 commit comments

Comments
 (0)