From c08a598d3fc9e45de1f3020f6c50e153ef0073e3 Mon Sep 17 00:00:00 2001 From: Lucien Greathouse Date: Mon, 8 Jan 2018 12:33:36 -0800 Subject: [PATCH] Fix broken file watcher implementation This one took a little bit of tracking down; the VfsWatcher used to spawn a new thread and then stall/park forever. With one of the recent changes to get rid of the extra thread, VfsWatcher started getting dropped, which in turn dropped the watchers created by the notify crate. Because the threads only tie back to the VfsWatcher was a cloned Arc>, everything was fine, except that their mpsc::Receiver objects were no longer receiving events. This manifested itself as the file watcher magically not watching any files. Oops. --- src/commands/serve.rs | 11 +++- src/vfs/vfs_watcher.rs | 132 +++++++++++++++++++++++------------------ 2 files changed, 84 insertions(+), 59 deletions(-) diff --git a/src/commands/serve.rs b/src/commands/serve.rs index e8cb2b82..998001ba 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -1,6 +1,7 @@ use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; use std::process; +use std::sync::{Arc, Mutex}; +use std::thread; use rand; @@ -87,6 +88,12 @@ pub fn serve(project_path: &PathBuf, verbose: bool, port: Option) { println!("Server listening on port {}", web_config.port); - VfsWatcher::new(vfs.clone()).start(); + { + let vfs = vfs.clone(); + thread::spawn(move || { + VfsWatcher::new(vfs).start(); + }); + } + web::start(web_config, project.clone(), &PLUGIN_CHAIN, vfs.clone()); } diff --git a/src/vfs/vfs_watcher.rs b/src/vfs/vfs_watcher.rs index b1db63b7..f5d50174 100644 --- a/src/vfs/vfs_watcher.rs +++ b/src/vfs/vfs_watcher.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::Duration; @@ -11,77 +12,94 @@ use vfs::VfsSession; /// changes to the virtual filesystem layer. pub struct VfsWatcher { vfs: Arc>, - watchers: Vec, } impl VfsWatcher { pub fn new(vfs: Arc>) -> VfsWatcher { VfsWatcher { vfs, - watchers: Vec::new(), } } - pub fn start(mut self) { - let outer_vfs = self.vfs.lock().unwrap(); + fn start_watcher( + vfs: Arc>, + rx: mpsc::Receiver, + partition_name: String, + root_path: PathBuf, + ) { + loop { + let event = rx.recv().unwrap(); - for (partition_name, root_path) in outer_vfs.get_partitions() { - let (tx, rx) = mpsc::channel(); - let partition_name = partition_name.clone(); - let root_path = root_path.clone(); + let mut vfs = vfs.lock().unwrap(); + let current_time = vfs.current_time(); - let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1)) - .expect("Unable to create watcher!"); + match event { + DebouncedEvent::Write(ref change_path) | + DebouncedEvent::Create(ref change_path) | + DebouncedEvent::Remove(ref change_path) => { + if let Some(mut route) = path_to_route(&root_path, change_path) { + route.insert(0, partition_name.clone()); - watcher - .watch(&root_path, RecursiveMode::Recursive) - .expect("Unable to watch path!"); - - self.watchers.push(watcher); - - { - let vfs = self.vfs.clone(); - - thread::spawn(move || { - loop { - let event = rx.recv().unwrap(); - let mut vfs = vfs.lock().unwrap(); - let current_time = vfs.current_time(); - - match event { - DebouncedEvent::Write(ref change_path) | - DebouncedEvent::Create(ref change_path) | - DebouncedEvent::Remove(ref change_path) => { - if let Some(mut route) = path_to_route(&root_path, change_path) { - route.insert(0, partition_name.clone()); - - vfs.add_change(current_time, route); - } else { - println!("Failed to get route from {}", change_path.display()); - } - }, - DebouncedEvent::Rename(ref from_change, ref to_change) => { - if let Some(mut route) = path_to_route(&root_path, from_change) { - route.insert(0, partition_name.clone()); - - vfs.add_change(current_time, route); - } else { - println!("Failed to get route from {}", from_change.display()); - } - - if let Some(mut route) = path_to_route(&root_path, to_change) { - route.insert(0, partition_name.clone()); - - vfs.add_change(current_time, route); - } else { - println!("Failed to get route from {}", to_change.display()); - } - }, - _ => {}, - } + vfs.add_change(current_time, route); + } else { + eprintln!("Failed to get route from {}", change_path.display()); } - }); + }, + DebouncedEvent::Rename(ref from_change, ref to_change) => { + if let Some(mut route) = path_to_route(&root_path, from_change) { + route.insert(0, partition_name.clone()); + + vfs.add_change(current_time, route); + } else { + eprintln!("Failed to get route from {}", from_change.display()); + } + + if let Some(mut route) = path_to_route(&root_path, to_change) { + route.insert(0, partition_name.clone()); + + vfs.add_change(current_time, route); + } else { + eprintln!("Failed to get route from {}", to_change.display()); + } + }, + _ => {}, } } } + + pub fn start(self) { + let mut watchers = Vec::new(); + + // Create an extra scope so that `vfs` gets dropped and unlocked + { + let vfs = self.vfs.lock().unwrap(); + + for (ref partition_name, ref root_path) in vfs.get_partitions() { + let (tx, rx) = mpsc::channel(); + + let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1)) + .expect("Unable to create watcher!"); + + watcher + .watch(&root_path, RecursiveMode::Recursive) + .expect("Unable to watch path!"); + + watchers.push(watcher); + + { + let partition_name = partition_name.to_string(); + let root_path = root_path.to_path_buf(); + let vfs = self.vfs.clone(); + + thread::spawn(move || { + Self::start_watcher(vfs, rx, partition_name, root_path); + }); + } + } + } + + loop { + thread::park(); + } + } }