Skip to content

Commit 0a740eb

Browse files
authored
[debounce] Introduce OngoingWrite event (#183)
2 parents 59d8666 + 8c482a0 commit 0a740eb

6 files changed

Lines changed: 109 additions & 6 deletions

File tree

src/debounce/mod.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22

33
mod timer;
44

5-
use super::{op, DebouncedEvent, RawEvent};
5+
use super::{op, DebouncedEvent, RawEvent, Config};
66

77
use self::timer::WatchTimer;
88

99
use std::collections::HashMap;
1010
use std::path::PathBuf;
1111
use std::sync::mpsc;
1212
use std::sync::{Arc, Mutex};
13-
use std::time::Duration;
13+
use std::time::{Duration, Instant};
1414

1515
pub type OperationsBuffer =
1616
Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;
@@ -97,6 +97,15 @@ impl Debounce {
9797
}
9898
}
9999

100+
pub fn configure_debounced_mode(&mut self, config: Config) {
101+
match config {
102+
Config::OngoingWrites(c) => {
103+
self.timer.set_ongoing_write_duration(c);
104+
}
105+
}
106+
}
107+
108+
100109
fn check_partial_rename(&mut self, path: PathBuf, op: op::Op, cookie: Option<u32>) {
101110
if let Ok(mut op_buf) = self.operations_buffer.lock() {
102111
// the previous event was a rename event, but this one isn't; something went wrong
@@ -250,6 +259,7 @@ impl Debounce {
250259
// it already was a write event
251260
Some(op::Op::WRITE) => {
252261
restart_timer(timer_id, path.clone(), &mut self.timer);
262+
handle_ongoing_write_event(&self.timer, path.clone(), &self.tx);
253263
}
254264

255265
// upgrade to write event
@@ -507,3 +517,24 @@ fn restart_timer(timer_id: &mut Option<u64>, path: PathBuf, timer: &mut WatchTim
507517
}
508518
*timer_id = Some(timer.schedule(path));
509519
}
520+
521+
fn handle_ongoing_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender<DebouncedEvent>) {
522+
let mut ongoing_write_event = timer.ongoing_write_event.lock().unwrap();
523+
let mut event_details = Option::None;
524+
if let Some(ref i) = *ongoing_write_event {
525+
let now = Instant::now();
526+
if i.0 <= now {
527+
//fire event
528+
let _ = tx.send(DebouncedEvent::OnGoingWrite((i.1).clone()));
529+
} else {
530+
event_details = Some((i.0, i.1.clone()));
531+
}
532+
} else {
533+
//schedule event
534+
if let Some(d) = timer.ongoing_write_duration {
535+
let fire_at = Instant::now() + d;
536+
event_details = Some((fire_at, path));
537+
}
538+
}
539+
*ongoing_write_event = event_details;
540+
}

src/debounce/timer.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ struct ScheduleWorker {
2626
tx: mpsc::Sender<DebouncedEvent>,
2727
operations_buffer: OperationsBuffer,
2828
stopped: Arc<AtomicBool>,
29+
worker_ongoing_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
2930
}
3031

3132
impl ScheduleWorker {
@@ -56,7 +57,12 @@ impl ScheduleWorker {
5657
}
5758
let message = match op {
5859
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
59-
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
60+
Some(op::Op::WRITE) => {
61+
//disable ongoing_write
62+
let mut ongoing_write_event = self.worker_ongoing_write_event.lock().unwrap();
63+
*ongoing_write_event = None;
64+
Some(DebouncedEvent::Write(path))
65+
},
6066
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
6167
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
6268
Some(op::Op::RENAME) if is_partial_rename => {
@@ -116,6 +122,8 @@ pub struct WatchTimer {
116122
delay: Duration,
117123
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
118124
stopped: Arc<AtomicBool>,
125+
pub ongoing_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
126+
pub ongoing_write_duration: Option<Duration>,
119127
}
120128

121129
impl WatchTimer {
@@ -133,6 +141,8 @@ impl WatchTimer {
133141
let worker_stop_trigger = stop_trigger.clone();
134142
let worker_events = events.clone();
135143
let worker_stopped = stopped.clone();
144+
let ongoing_write_event = Arc::new(Mutex::new(None));
145+
let worker_ongoing_write_event = ongoing_write_event.clone();
136146
thread::spawn(move || {
137147
ScheduleWorker {
138148
new_event_trigger: worker_new_event_trigger,
@@ -141,6 +151,7 @@ impl WatchTimer {
141151
tx,
142152
operations_buffer,
143153
stopped: worker_stopped,
154+
worker_ongoing_write_event,
144155
}
145156
.run();
146157
});
@@ -152,9 +163,15 @@ impl WatchTimer {
152163
delay,
153164
events,
154165
stopped,
166+
ongoing_write_event,
167+
ongoing_write_duration: None,
155168
}
156169
}
157170

171+
pub fn set_ongoing_write_duration(&mut self, duration: Option<Duration>) {
172+
self.ongoing_write_duration = duration;
173+
}
174+
158175
pub fn schedule(&mut self, path: PathBuf) -> u64 {
159176
self.counter = self.counter.wrapping_add(1);
160177

src/fsevent.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
extern crate fsevent as fse;
1515

1616
use super::debounce::{Debounce, EventTx};
17-
use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher};
17+
use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher, Config};
1818
use fsevent_sys::core_foundation as cf;
1919
use fsevent_sys::fsevent as fs;
2020
use libc;
@@ -394,6 +394,14 @@ impl Watcher for FsEventWatcher {
394394
let _ = self.run();
395395
result
396396
}
397+
398+
fn configure(&self, config: Config) -> Result<()> {
399+
let mut debounced_event = self.event_tx.lock().unwrap();
400+
if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event {
401+
debounce.configure_debounced_mode(config);
402+
}
403+
Ok(())
404+
}
397405
}
398406

399407
impl Drop for FsEventWatcher {

src/inotify.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ extern crate walkdir;
1111
use self::inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
1212
use self::walkdir::WalkDir;
1313
use super::debounce::{Debounce, EventTx};
14-
use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher};
14+
use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher, Config};
1515
use mio;
1616
use mio_extras;
1717
use std::collections::HashMap;
@@ -54,6 +54,7 @@ enum EventLoopMsg {
5454
RemoveWatch(PathBuf, Sender<Result<()>>),
5555
Shutdown,
5656
RenameTimeout(u32),
57+
Configure(Config),
5758
}
5859

5960
#[inline]
@@ -201,6 +202,11 @@ impl EventLoop {
201202
send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
202203
}
203204
}
205+
EventLoopMsg::Configure(config) => {
206+
if let EventTx::Debounced {ref tx,ref mut debounce} = self.event_tx {
207+
debounce.configure_debounced_mode(config);
208+
}
209+
}
204210
}
205211
}
206212
}
@@ -486,6 +492,12 @@ impl Watcher for INotifyWatcher {
486492
self.0.lock().unwrap().send(msg).unwrap();
487493
rx.recv().unwrap()
488494
}
495+
496+
fn configure(&self, config: Config) -> Result<()> {
497+
let msg = EventLoopMsg::Configure(config);
498+
self.0.lock().unwrap().send(msg).unwrap();
499+
Ok(())
500+
}
489501
}
490502

491503
impl Drop for INotifyWatcher {

src/lib.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ mod op_test {
348348
fn new_bitflags_form() {
349349
let op = super::op::Op::CHMOD | super::op::Op::WRITE;
350350
assert!(op.contains(super::op::Op::WRITE));
351+
assert!(op.contains(super::op::Op::CHMOD));
351352
}
352353

353354
#[test]
@@ -446,6 +447,9 @@ pub enum DebouncedEvent {
446447
///
447448
/// This event may contain a path for which the error was detected.
448449
Error(Error, Option<PathBuf>),
450+
451+
/// Event emitted when a file being watched is to be tailed.
452+
OnGoingWrite(PathBuf),
449453
}
450454

451455
impl PartialEq for DebouncedEvent {
@@ -611,6 +615,25 @@ pub trait Watcher: Sized {
611615
/// Returns an error in the case that `path` has not been watched or if removing the watch
612616
/// fails.
613617
fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()>;
618+
619+
/// Configure notify with Configs.
620+
fn configure(&self, option: Config) -> Result<()> {
621+
// Default implementation because null and poll watcher are not configurable (but can be in future)
622+
Ok(())
623+
}
624+
}
625+
626+
/// Configurations that can be used when watching a file/directory.
627+
pub enum Config {
628+
/// In debounced mode a WRITE event is fired every X unit of time if no WRITE occurs before X.
629+
/// But in some scenarios (like when tailing a file) we would never receive the WRITE event
630+
/// because the watchee is being written to every Y unit of time where Y < X.
631+
/// Use this config to let notify emit DebouncedEvent::OnGoingWrite event before emitting a
632+
/// WRITE event. Once a WRITE event is emitted notify will cancel OnGoingWrite (but still emit
633+
/// OnGoingWrite in the future)
634+
/// Hence the Duration of this config should be less than watchers delay.
635+
/// To stop emitting OnGoingWrite, pass this config with None.
636+
OngoingWrites(Option<Duration>),
614637
}
615638

616639
/// The recommended `Watcher` implementation for the current platform

src/windows.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use winapi::um::winbase::{self, INFINITE, WAIT_OBJECT_0};
1616
use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE};
1717

1818
use super::debounce::{Debounce, EventTx};
19-
use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher};
19+
use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher, Config};
2020
use std::collections::HashMap;
2121
use std::env;
2222
use std::ffi::OsString;
@@ -54,6 +54,7 @@ enum Action {
5454
Watch(PathBuf, RecursiveMode),
5555
Unwatch(PathBuf),
5656
Stop,
57+
Configure(Config),
5758
}
5859

5960
pub enum MetaEvent {
@@ -118,6 +119,12 @@ impl ReadDirectoryChangesServer {
118119
stop_watch(ws, &self.meta_tx);
119120
}
120121
break;
122+
},
123+
Action::Configure(config) => {
124+
let mut debounced_event = self.event_tx.lock().unwrap();
125+
if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event {
126+
debounce.configure_debounced_mode(config);
127+
}
121128
}
122129
}
123130
}
@@ -562,6 +569,11 @@ impl Watcher for ReadDirectoryChangesWatcher {
562569
self.wakeup_server();
563570
res
564571
}
572+
573+
fn configure(&self, config: Config) -> Result<()> {
574+
self.tx.send(Action::Configure(config));
575+
Ok(())
576+
}
565577
}
566578

567579
impl Drop for ReadDirectoryChangesWatcher {

0 commit comments

Comments
 (0)