Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};

pub type OperationsBuffer =
Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;
Expand Down Expand Up @@ -97,6 +97,11 @@ impl Debounce {
}
}

pub fn set_on_going_write_duration(&mut self, duration: Duration) {
self.timer.set_on_going_write_duration(duration);
}


fn check_partial_rename(&mut self, path: PathBuf, op: op::Op, cookie: Option<u32>) {
if let Ok(mut op_buf) = self.operations_buffer.lock() {
// the previous event was a rename event, but this one isn't; something went wrong
Expand Down Expand Up @@ -250,6 +255,7 @@ impl Debounce {
// it already was a write event
Some(op::Op::WRITE) => {
restart_timer(timer_id, path.clone(), &mut self.timer);
handle_on_going_write_event(&self.timer, path.clone(), &self.tx);
}

// upgrade to write event
Expand Down Expand Up @@ -507,3 +513,22 @@ fn restart_timer(timer_id: &mut Option<u64>, path: PathBuf, timer: &mut WatchTim
}
*timer_id = Some(timer.schedule(path));
}

fn handle_on_going_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender<DebouncedEvent>) {
let mut on_going_write_event = timer.on_going_write_event.lock().unwrap();
let mut event_details = Option::None;
if let Some(ref i) = *on_going_write_event {
let now = Instant::now();
if i.0 <= now {
//fire event
let _ = tx.send(DebouncedEvent::OnGoingWrite((i.1).clone()));
}
} else {
//schedule event
if let Some(d) = timer.on_going_write_duration {
let fire_at = Instant::now() + d;
event_details = Some((fire_at, path));
}
}
*on_going_write_event = event_details;
}
19 changes: 18 additions & 1 deletion src/debounce/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct ScheduleWorker {
tx: mpsc::Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
stopped: Arc<AtomicBool>,
worker_on_going_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
}

impl ScheduleWorker {
Expand Down Expand Up @@ -56,7 +57,12 @@ impl ScheduleWorker {
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::WRITE) => {
//disable on_going_write
let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap();
*on_going_write_event = None;
Some(DebouncedEvent::Write(path))
},
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
Expand Down Expand Up @@ -116,6 +122,8 @@ pub struct WatchTimer {
delay: Duration,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
stopped: Arc<AtomicBool>,
pub on_going_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
pub on_going_write_duration: Option<Duration>,
}

impl WatchTimer {
Expand All @@ -133,6 +141,8 @@ impl WatchTimer {
let worker_stop_trigger = stop_trigger.clone();
let worker_events = events.clone();
let worker_stopped = stopped.clone();
let on_going_write_event = Arc::new(Mutex::new(None));
let worker_on_going_write_event = on_going_write_event.clone();
thread::spawn(move || {
ScheduleWorker {
new_event_trigger: worker_new_event_trigger,
Expand All @@ -141,6 +151,7 @@ impl WatchTimer {
tx,
operations_buffer,
stopped: worker_stopped,
worker_on_going_write_event: worker_on_going_write_event,
}
.run();
});
Expand All @@ -152,9 +163,15 @@ impl WatchTimer {
delay,
events,
stopped,
on_going_write_event,
on_going_write_duration: None,
}
}

pub fn set_on_going_write_duration(&mut self, duration: Duration) {
self.on_going_write_duration = Some(duration);
}

pub fn schedule(&mut self, path: PathBuf) -> u64 {
self.counter = self.counter.wrapping_add(1);

Expand Down
7 changes: 7 additions & 0 deletions src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ impl Watcher for FsEventWatcher {
let _ = self.run();
result
}

fn set_on_going_write_duration(&self, duration: Duration) {
let mut debounced_event = self.event_tx.lock().unwrap();
if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event {
debounce.set_on_going_write_duration(duration);
}
}
}

impl Drop for FsEventWatcher {
Expand Down
11 changes: 11 additions & 0 deletions src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum EventLoopMsg {
RemoveWatch(PathBuf, Sender<Result<()>>),
Shutdown,
RenameTimeout(u32),
OnGoingWriteDelay(Duration),
}

#[inline]
Expand Down Expand Up @@ -201,6 +202,11 @@ impl EventLoop {
send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
}
}
EventLoopMsg::OnGoingWriteDelay(duration) => {
if let EventTx::Debounced {ref tx,ref mut debounce} = self.event_tx {
debounce.set_on_going_write_duration(duration);
}
}
}
}
}
Expand Down Expand Up @@ -486,6 +492,11 @@ impl Watcher for INotifyWatcher {
self.0.lock().unwrap().send(msg).unwrap();
rx.recv().unwrap()
}

fn set_on_going_write_duration(&self, duration: Duration) {
let msg = EventLoopMsg::OnGoingWriteDelay(duration);
self.0.lock().unwrap().send(msg).unwrap();
}
}

impl Drop for INotifyWatcher {
Expand Down
26 changes: 19 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,19 +310,20 @@ pub mod op {
/// Multiple actions may be delivered in a single event.
pub struct Op: u32 {
/// Attributes changed
const CHMOD = 0b0000001;
const CHMOD = 0b00000001;
/// Created
const CREATE = 0b0000010;
const CREATE = 0b00000010;
/// Removed
const REMOVE = 0b0000100;
const REMOVE = 0b00000100;
/// Renamed
const RENAME = 0b0001000;
const RENAME = 0b00001000;
/// Written
const WRITE = 0b0010000;
const WRITE = 0b00010000;
/// File opened for writing was closed
const CLOSE_WRITE = 0b0100000;
const CLOSE_WRITE = 0b00100000;
/// Directories need to be rescanned
const RESCAN = 0b1000000;
const RESCAN = 0b01000000;
const ON_GOING_WRITE = 0b10000000;
}
}

Expand All @@ -333,6 +334,7 @@ pub mod op {
pub const WRITE: Op = Op::WRITE;
pub const CLOSE_WRITE: Op = Op::CLOSE_WRITE;
pub const RESCAN: Op = Op::RESCAN;
pub const ON_GOING_WRITE: Op = Op::ON_GOING_WRITE;
}

#[cfg(test)]
Expand All @@ -348,6 +350,7 @@ mod op_test {
fn new_bitflags_form() {
let op = super::op::Op::CHMOD | super::op::Op::WRITE;
assert!(op.contains(super::op::Op::WRITE));
assert!(op.contains(super::op::Op::CHMOD));
}

#[test]
Expand Down Expand Up @@ -446,6 +449,9 @@ pub enum DebouncedEvent {
///
/// This event may contain a path for which the error was detected.
Error(Error, Option<PathBuf>),

/// Event emitted when a file being watched is to be tailed.
OnGoingWrite(PathBuf),
}

impl PartialEq for DebouncedEvent {
Expand Down Expand Up @@ -611,6 +617,12 @@ pub trait Watcher: Sized {
/// Returns an error in the case that `path` has not been watched or if removing the watch
/// fails.
fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()>;

/// Sets the duration for DebouncedEvent::OnGoingWrite. When set, OnGoingWrite event will be
/// fired every "duration" units.
fn set_on_going_write_duration(&self, duration: Duration) {
// null and poll watchers are not required to implement this.
}
}

/// The recommended `Watcher` implementation for the current platform
Expand Down
11 changes: 11 additions & 0 deletions src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum Action {
Watch(PathBuf, RecursiveMode),
Unwatch(PathBuf),
Stop,
SetOnGoingWriteEventDuration(Duration),
}

pub enum MetaEvent {
Expand Down Expand Up @@ -118,6 +119,12 @@ impl ReadDirectoryChangesServer {
stop_watch(ws, &self.meta_tx);
}
break;
},
Action::SetOnGoingWriteEventDuration(duration) => {
let mut debounced_event = self.event_tx.lock().unwrap();
if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event {
debounce.set_on_going_write_duration(duration);
}
}
}
}
Expand Down Expand Up @@ -562,6 +569,10 @@ impl Watcher for ReadDirectoryChangesWatcher {
self.wakeup_server();
res
}

fn set_on_going_write_duration(&self, duration: Duration) {
self.tx.send(Action::SetOnGoingWriteEventDuration(duration));
}
}

impl Drop for ReadDirectoryChangesWatcher {
Expand Down