From 3c0fe4d6840b1674cb018e6a064c7653d49cafcd Mon Sep 17 00:00:00 2001 From: Lucien Greathouse Date: Mon, 28 Jan 2019 17:11:01 -0800 Subject: [PATCH] Reduce number of threads needed for FsWatcher --- server/src/fs_watcher.rs | 120 +++++++++++++++++++++---------------- server/src/imfs.rs | 12 ++++ server/src/live_session.rs | 2 +- 3 files changed, 81 insertions(+), 53 deletions(-) diff --git a/server/src/fs_watcher.rs b/server/src/fs_watcher.rs index 178994d2..33159a55 100644 --- a/server/src/fs_watcher.rs +++ b/server/src/fs_watcher.rs @@ -1,10 +1,12 @@ use std::{ sync::{mpsc, Arc, Mutex}, time::Duration, + path::Path, + ops::Deref, thread, }; -use log::trace; +use log::{warn, trace}; use notify::{ self, DebouncedEvent, @@ -20,7 +22,66 @@ use crate::{ const WATCH_TIMEOUT: Duration = Duration::from_millis(100); -fn handle_event(imfs: &Mutex, rbx_session: &Mutex, event: DebouncedEvent) { +/// Watches for changes on the filesystem and links together the in-memory +/// filesystem and in-memory Roblox tree. +pub struct FsWatcher { + watcher: RecommendedWatcher, +} + +impl FsWatcher { + /// Start a new FS watcher, watching all of the roots currently attached to + /// the given Imfs. + /// + /// `rbx_session` is optional to make testing easier. If it isn't `None`, + /// events will be passed to it after they're given to the Imfs. + pub fn start(imfs: Arc>, rbx_session: Option>>) -> FsWatcher { + let (watch_tx, watch_rx) = mpsc::channel(); + + let mut watcher = notify::watcher(watch_tx, WATCH_TIMEOUT) + .expect("Could not create filesystem watcher"); + + { + let imfs = imfs.lock().unwrap(); + + for root_path in imfs.get_roots() { + watcher.watch(root_path, RecursiveMode::Recursive) + .expect("Could not watch directory"); + } + } + + { + let imfs = Arc::clone(&imfs); + let rbx_session = rbx_session.as_ref().map(Arc::clone); + + thread::spawn(move || { + trace!("Watcher thread started"); + while let Ok(event) = watch_rx.recv() { + // handle_fs_event expects an Option<&Mutex>, but we have + // an Option>>, so we coerce with Deref. + let session_ref = rbx_session.as_ref().map(Deref::deref); + + handle_fs_event(&imfs, session_ref, event); + } + trace!("Watcher thread stopped"); + }); + } + + FsWatcher { + watcher, + } + } + + pub fn stop_watching_path(&mut self, path: &Path) { + match self.watcher.unwatch(path) { + Ok(_) => {}, + Err(e) => { + warn!("Could not unwatch path {}: {}", path.display(), e); + }, + } + } +} + +fn handle_fs_event(imfs: &Mutex, rbx_session: Option<&Mutex>, event: DebouncedEvent) { match event { DebouncedEvent::Create(path) => { trace!("Path created: {}", path.display()); @@ -30,7 +91,7 @@ fn handle_event(imfs: &Mutex, rbx_session: &Mutex, event: Debo imfs.path_created(&path).unwrap(); } - { + if let Some(rbx_session) = rbx_session { let mut rbx_session = rbx_session.lock().unwrap(); rbx_session.path_created(&path); } @@ -43,7 +104,7 @@ fn handle_event(imfs: &Mutex, rbx_session: &Mutex, event: Debo imfs.path_updated(&path).unwrap(); } - { + if let Some(rbx_session) = rbx_session { let mut rbx_session = rbx_session.lock().unwrap(); rbx_session.path_updated(&path); } @@ -56,20 +117,20 @@ fn handle_event(imfs: &Mutex, rbx_session: &Mutex, event: Debo imfs.path_removed(&path).unwrap(); } - { + if let Some(rbx_session) = rbx_session { let mut rbx_session = rbx_session.lock().unwrap(); rbx_session.path_removed(&path); } }, DebouncedEvent::Rename(from_path, to_path) => { - trace!("Path rename: {} to {}", from_path.display(), to_path.display()); + trace!("Path renamed: {} to {}", from_path.display(), to_path.display()); { let mut imfs = imfs.lock().unwrap(); imfs.path_moved(&from_path, &to_path).unwrap(); } - { + if let Some(rbx_session) = rbx_session { let mut rbx_session = rbx_session.lock().unwrap(); rbx_session.path_renamed(&from_path, &to_path); } @@ -78,49 +139,4 @@ fn handle_event(imfs: &Mutex, rbx_session: &Mutex, event: Debo trace!("Unhandled FS event: {:?}", other); }, } -} - -/// Watches for changes on the filesystem and links together the in-memory -/// filesystem and in-memory Roblox tree. -pub struct FsWatcher { - #[allow(unused)] - watchers: Vec, -} - -impl FsWatcher { - pub fn start(imfs: Arc>, rbx_session: Arc>) -> FsWatcher { - let mut watchers = Vec::new(); - - { - let imfs_temp = imfs.lock().unwrap(); - - for root_path in imfs_temp.get_roots() { - let (watch_tx, watch_rx) = mpsc::channel(); - - let mut watcher = notify::watcher(watch_tx, WATCH_TIMEOUT) - .expect("Could not create `notify` watcher"); - - watcher.watch(root_path, RecursiveMode::Recursive) - .expect("Could not watch directory"); - - watchers.push(watcher); - - let imfs = Arc::clone(&imfs); - let rbx_session = Arc::clone(&rbx_session); - let root_path = root_path.clone(); - - thread::spawn(move || { - trace!("Watcher thread ({}) started", root_path.display()); - while let Ok(event) = watch_rx.recv() { - handle_event(&imfs, &rbx_session, event); - } - trace!("Watcher thread ({}) stopped", root_path.display()); - }); - } - } - - FsWatcher { - watchers, - } - } } \ No newline at end of file diff --git a/server/src/imfs.rs b/server/src/imfs.rs index 26ccc4db..dec13807 100644 --- a/server/src/imfs.rs +++ b/server/src/imfs.rs @@ -72,6 +72,18 @@ impl Imfs { self.descend_and_read_from_disk(path) } + pub fn remove_root(&mut self, path: &Path) { + debug_assert!(path.is_absolute()); + + if self.roots.get(path).is_some() { + self.remove_item(path); + + if let Some(parent_path) = path.parent() { + self.unlink_child(parent_path, path); + } + } + } + pub fn path_created(&mut self, path: &Path) -> io::Result<()> { debug_assert!(path.is_absolute()); debug_assert!(self.is_within_roots(path)); diff --git a/server/src/live_session.rs b/server/src/live_session.rs index 6d0b9485..1f6cac87 100644 --- a/server/src/live_session.rs +++ b/server/src/live_session.rs @@ -41,7 +41,7 @@ impl LiveSession { let fs_watcher = FsWatcher::start( Arc::clone(&imfs), - Arc::clone(&rbx_session), + Some(Arc::clone(&rbx_session)), ); let session_id = SessionId::new();