use std::{ fs, sync::{Arc, Mutex}, }; use crossbeam_channel::{select, Receiver, RecvError, Sender}; use jod_thread::JoinHandle; use memofs::{IoResultExt, Vfs, VfsEvent}; use rbx_dom_weak::types::{Ref, Variant}; use crate::{ message_queue::MessageQueue, snapshot::{ apply_patch_set, compute_patch_set, AppliedPatchSet, InstigatingSource, PatchSet, RojoTree, }, snapshot_middleware::{snapshot_from_vfs, snapshot_project_node}, }; /// Processes file change events, updates the DOM, and sends those updates /// through a channel for other stuff to consume. /// /// Owns the connection between Rojo's VFS and its DOM by holding onto another /// thread that processes messages. /// /// Consumers of ChangeProcessor, like ServeSession, are intended to communicate /// with this object via channels. /// /// ChangeProcessor expects to be the only writer to the RojoTree and Vfs /// objects passed to it. pub struct ChangeProcessor { /// Controls the runtime of the processor thread. When signaled, the job /// thread will finish its current work and terminate. /// /// This channel should be signaled before dropping ChangeProcessor or we'll /// hang forever waiting for the message processing loop to terminate. shutdown_sender: Sender<()>, /// A handle to the message processing thread. When dropped, we'll block /// until it's done. /// /// Allowed to be unused because dropping this value has side effects. #[allow(unused)] job_thread: JoinHandle>, } impl ChangeProcessor { /// Spin up the ChangeProcessor, connecting it to the given tree, VFS, and /// outbound message queue. pub fn start( tree: Arc>, vfs: Arc, message_queue: Arc>, tree_mutation_receiver: Receiver, ) -> Self { let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1); let vfs_receiver = vfs.event_receiver(); let task = JobThreadContext { tree, vfs, message_queue, }; let job_thread = jod_thread::Builder::new() .name("ChangeProcessor thread".to_owned()) .spawn(move || { log::trace!("ChangeProcessor thread started"); loop { select! { recv(vfs_receiver) -> event => { task.handle_vfs_event(event?); }, recv(tree_mutation_receiver) -> patch_set => { task.handle_tree_event(patch_set?); }, recv(shutdown_receiver) -> _ => { log::trace!("ChangeProcessor shutdown signal received..."); return Ok(()); }, } } }) .expect("Could not start ChangeProcessor thread"); Self { shutdown_sender, job_thread, } } } impl Drop for ChangeProcessor { fn drop(&mut self) { // Signal the job thread to start spinning down. Without this we'll hang // forever waiting for the thread to finish its infinite loop. let _ = self.shutdown_sender.send(()); // After this function ends, the job thread will be joined. It might // block for a small period of time while it processes its last work. } } /// Contains all of the state needed to synchronize the DOM and VFS. struct JobThreadContext { /// A handle to the DOM we're managing. tree: Arc>, /// A handle to the VFS we're managing. vfs: Arc, /// Whenever changes are applied to the DOM, we should push those changes /// into this message queue to inform any connected clients. message_queue: Arc>, } impl JobThreadContext { fn handle_vfs_event(&self, event: VfsEvent) { log::trace!("Vfs event: {:?}", event); // Update the VFS immediately with the event. self.vfs .commit_event(&event) .expect("Error applying VFS change"); // For a given VFS event, we might have many changes to different parts // of the tree. Calculate and apply all of these changes. let applied_patches = match event { VfsEvent::Create(path) | VfsEvent::Remove(path) | VfsEvent::Write(path) => { let mut tree = self.tree.lock().unwrap(); let mut applied_patches = Vec::new(); // Find the nearest ancestor to this path that has // associated instances in the tree. This helps make sure // that we handle additions correctly, especially if we // receive events for descendants of a large tree being // created all at once. let mut current_path = path.as_path(); let affected_ids = loop { let ids = tree.get_ids_at_path(current_path); log::trace!("Path {} affects IDs {:?}", current_path.display(), ids); if !ids.is_empty() { break ids.to_vec(); } log::trace!("Trying parent path..."); match current_path.parent() { Some(parent) => current_path = parent, None => break Vec::new(), } }; for id in affected_ids { if let Some(patch) = compute_and_apply_changes(&mut tree, &self.vfs, id) { if !patch.is_empty() { applied_patches.push(patch); } } } applied_patches } _ => { log::warn!("Unhandled VFS event: {:?}", event); Vec::new() } }; // Notify anyone listening to the message queue about the changes we // just made. self.message_queue.push_messages(&applied_patches); } fn handle_tree_event(&self, patch_set: PatchSet) { log::trace!("Applying PatchSet from client: {:#?}", patch_set); let applied_patch = { let mut tree = self.tree.lock().unwrap(); for &id in &patch_set.removed_instances { if let Some(instance) = tree.get_instance(id) { if let Some(instigating_source) = &instance.metadata().instigating_source { match instigating_source { InstigatingSource::Path(path) => fs::remove_file(path).unwrap(), InstigatingSource::ProjectNode(_, _, _, _) => { log::warn!( "Cannot remove instance {:?}, it's from a project file", id ); } } } else { // TODO log::warn!( "Cannot remove instance {:?}, it is not an instigating source.", id ); } } else { log::warn!("Cannot remove instance {:?}, it does not exist.", id); } } for update in &patch_set.updated_instances { let id = update.id; if let Some(instance) = tree.get_instance(id) { if update.changed_name.is_some() { log::warn!("Cannot rename instances yet."); } if update.changed_class_name.is_some() { log::warn!("Cannot change ClassName yet."); } if update.changed_metadata.is_some() { log::warn!("Cannot change metadata yet."); } for (key, changed_value) in &update.changed_properties { if key == "Source" { if let Some(instigating_source) = &instance.metadata().instigating_source { match instigating_source { InstigatingSource::Path(path) => { if let Some(Variant::String(value)) = changed_value { fs::write(path, value).unwrap(); } else { log::warn!("Cannot change Source to non-string value."); } } InstigatingSource::ProjectNode(_, _, _, _) => { log::warn!( "Cannot remove instance {:?}, it's from a project file", id ); } } } else { log::warn!( "Cannot update instance {:?}, it is not an instigating source.", id ); } } else { log::warn!("Cannot change properties besides BaseScript.Source."); } } } else { log::warn!("Cannot update instance {:?}, it does not exist.", id); } } apply_patch_set(&mut tree, patch_set) }; if !applied_patch.is_empty() { self.message_queue.push_messages(&[applied_patch]); } } } fn compute_and_apply_changes(tree: &mut RojoTree, vfs: &Vfs, id: Ref) -> Option { let metadata = tree .get_metadata(id) .expect("metadata missing for instance present in tree"); let instigating_source = match &metadata.instigating_source { Some(path) => path, None => { log::error!( "Instance {:?} did not have an instigating source, but was considered for an update.", id ); log::error!("This is a bug. Please file an issue!"); return None; } }; // How we process a file change event depends on what created this // file/folder in the first place. let applied_patch_set = match instigating_source { InstigatingSource::Path(path) => match vfs.metadata(path).with_not_found() { Ok(Some(_)) => { // Our instance was previously created from a path and that // path still exists. We can generate a snapshot starting at // that path and use it as the source for our patch. let snapshot = match snapshot_from_vfs(&metadata.context, vfs, path) { Ok(snapshot) => snapshot, Err(err) => { log::error!("Snapshot error: {:?}", err); return None; } }; let patch_set = compute_patch_set(snapshot, tree, id); apply_patch_set(tree, patch_set) } Ok(None) => { // Our instance was previously created from a path, but that // path no longer exists. // // We associate deleting the instigating file for an // instance with deleting that instance. let mut patch_set = PatchSet::new(); patch_set.removed_instances.push(id); apply_patch_set(tree, patch_set) } Err(err) => { log::error!("Error processing filesystem change: {:?}", err); return None; } }, InstigatingSource::ProjectNode(project_path, instance_name, project_node, parent_class) => { // This instance is the direct subject of a project node. Since // there might be information associated with our instance from // the project file, we snapshot the entire project node again. let snapshot_result = snapshot_project_node( &metadata.context, project_path, instance_name, project_node, vfs, parent_class.as_ref().map(|name| name.as_str()), ); let snapshot = match snapshot_result { Ok(snapshot) => snapshot, Err(err) => { log::error!("{:?}", err); return None; } }; let patch_set = compute_patch_set(snapshot, tree, id); apply_patch_set(tree, patch_set) } }; Some(applied_patch_set) }