Fix broken file watcher implementation

This one took a little bit of tracking down; the VfsWatcher used to spawn a new
thread and then stall/park forever. With one of the recent changes to get rid of
the extra thread, VfsWatcher started getting dropped, which in turn dropped the
watchers created by the notify crate.

Because the threads only tie back to the VfsWatcher was a cloned
Arc<Mutex<VfsSession>>, everything was fine, except that their mpsc::Receiver
objects were no longer receiving events.

This manifested itself as the file watcher magically not watching any files.
Oops.
This commit is contained in:
Lucien Greathouse
2018-01-08 12:33:36 -08:00
parent 1318842c36
commit c08a598d3f
2 changed files with 84 additions and 59 deletions

View File

@@ -1,6 +1,7 @@
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::process;
use std::sync::{Arc, Mutex};
use std::thread;
use rand;
@@ -87,6 +88,12 @@ pub fn serve(project_path: &PathBuf, verbose: bool, port: Option<u64>) {
println!("Server listening on port {}", web_config.port);
VfsWatcher::new(vfs.clone()).start();
{
let vfs = vfs.clone();
thread::spawn(move || {
VfsWatcher::new(vfs).start();
});
}
web::start(web_config, project.clone(), &PLUGIN_CHAIN, vfs.clone());
}

View File

@@ -1,3 +1,4 @@
use std::path::PathBuf;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
@@ -11,77 +12,94 @@ use vfs::VfsSession;
/// changes to the virtual filesystem layer.
pub struct VfsWatcher {
vfs: Arc<Mutex<VfsSession>>,
watchers: Vec<RecommendedWatcher>,
}
impl VfsWatcher {
pub fn new(vfs: Arc<Mutex<VfsSession>>) -> VfsWatcher {
VfsWatcher {
vfs,
watchers: Vec::new(),
}
}
pub fn start(mut self) {
let outer_vfs = self.vfs.lock().unwrap();
fn start_watcher(
vfs: Arc<Mutex<VfsSession>>,
rx: mpsc::Receiver<DebouncedEvent>,
partition_name: String,
root_path: PathBuf,
) {
loop {
let event = rx.recv().unwrap();
for (partition_name, root_path) in outer_vfs.get_partitions() {
let (tx, rx) = mpsc::channel();
let partition_name = partition_name.clone();
let root_path = root_path.clone();
let mut vfs = vfs.lock().unwrap();
let current_time = vfs.current_time();
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1))
.expect("Unable to create watcher!");
match event {
DebouncedEvent::Write(ref change_path) |
DebouncedEvent::Create(ref change_path) |
DebouncedEvent::Remove(ref change_path) => {
if let Some(mut route) = path_to_route(&root_path, change_path) {
route.insert(0, partition_name.clone());
watcher
.watch(&root_path, RecursiveMode::Recursive)
.expect("Unable to watch path!");
self.watchers.push(watcher);
{
let vfs = self.vfs.clone();
thread::spawn(move || {
loop {
let event = rx.recv().unwrap();
let mut vfs = vfs.lock().unwrap();
let current_time = vfs.current_time();
match event {
DebouncedEvent::Write(ref change_path) |
DebouncedEvent::Create(ref change_path) |
DebouncedEvent::Remove(ref change_path) => {
if let Some(mut route) = path_to_route(&root_path, change_path) {
route.insert(0, partition_name.clone());
vfs.add_change(current_time, route);
} else {
println!("Failed to get route from {}", change_path.display());
}
},
DebouncedEvent::Rename(ref from_change, ref to_change) => {
if let Some(mut route) = path_to_route(&root_path, from_change) {
route.insert(0, partition_name.clone());
vfs.add_change(current_time, route);
} else {
println!("Failed to get route from {}", from_change.display());
}
if let Some(mut route) = path_to_route(&root_path, to_change) {
route.insert(0, partition_name.clone());
vfs.add_change(current_time, route);
} else {
println!("Failed to get route from {}", to_change.display());
}
},
_ => {},
}
vfs.add_change(current_time, route);
} else {
eprintln!("Failed to get route from {}", change_path.display());
}
});
},
DebouncedEvent::Rename(ref from_change, ref to_change) => {
if let Some(mut route) = path_to_route(&root_path, from_change) {
route.insert(0, partition_name.clone());
vfs.add_change(current_time, route);
} else {
eprintln!("Failed to get route from {}", from_change.display());
}
if let Some(mut route) = path_to_route(&root_path, to_change) {
route.insert(0, partition_name.clone());
vfs.add_change(current_time, route);
} else {
eprintln!("Failed to get route from {}", to_change.display());
}
},
_ => {},
}
}
}
pub fn start(self) {
let mut watchers = Vec::new();
// Create an extra scope so that `vfs` gets dropped and unlocked
{
let vfs = self.vfs.lock().unwrap();
for (ref partition_name, ref root_path) in vfs.get_partitions() {
let (tx, rx) = mpsc::channel();
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1))
.expect("Unable to create watcher!");
watcher
.watch(&root_path, RecursiveMode::Recursive)
.expect("Unable to watch path!");
watchers.push(watcher);
{
let partition_name = partition_name.to_string();
let root_path = root_path.to_path_buf();
let vfs = self.vfs.clone();
thread::spawn(move || {
Self::start_watcher(vfs, rx, partition_name, root_path);
});
}
}
}
loop {
thread::park();
}
}
}