Refactor ChangeProcessor to be easier to follow

This commit is contained in:
Lucien Greathouse
2019-12-19 17:41:35 -08:00
parent 46d7bba87d
commit 26e2e81188

View File

@@ -1,11 +1,6 @@
//! Defines the process by which changes are pulled from the Vfs, filtered, and
//! used to mutate Rojo's tree during a live session.
//!
//! This object is owned by a ServeSession.
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crossbeam_channel::{select, Receiver, Sender}; use crossbeam_channel::{select, Sender};
use jod_thread::JoinHandle; use jod_thread::JoinHandle;
use rbx_dom_weak::RbxId; use rbx_dom_weak::RbxId;
@@ -18,41 +13,50 @@ use crate::{
vfs::{FsResultExt, Vfs, VfsEvent, VfsFetcher}, vfs::{FsResultExt, Vfs, VfsEvent, VfsFetcher},
}; };
/// Owns the connection between Rojo's VFS and its DOM by holding onto another
/// thread that processes messages.
///
/// Consumers of ChangeProcessor, like ServeSession, are intended to communicate
/// with this object via channels.
///
/// ChangeProcessor expects to be the only writer to the RojoTree and Vfs
/// objects passed to it.
pub struct ChangeProcessor { pub struct ChangeProcessor {
/// Controls the runtime of the processor thread. When signaled, the job
/// thread will finish its current work and terminate.
///
/// This channel should be signaled before dropping ChangeProcessor or we'll
/// hang forever waiting for the message processing loop to terminate.
shutdown_sender: Sender<()>, shutdown_sender: Sender<()>,
_thread_handle: JoinHandle<()>,
/// A handle to the message processing thread. When dropped, we'll block
/// until it's done.
///
/// Allowed to be unused because dropping this value has side effects.
#[allow(unused)]
job_thread: JoinHandle<()>,
} }
impl ChangeProcessor { impl ChangeProcessor {
/// Spin up the ChangeProcessor, connecting it to the given tree, VFS, and
/// outbound message queue.
pub fn start<F: VfsFetcher + Send + Sync + 'static>( pub fn start<F: VfsFetcher + Send + Sync + 'static>(
tree: Arc<Mutex<RojoTree>>, tree: Arc<Mutex<RojoTree>>,
message_queue: Arc<MessageQueue<AppliedPatchSet>>, message_queue: Arc<MessageQueue<AppliedPatchSet>>,
vfs: Arc<Vfs<F>>, vfs: Arc<Vfs<F>>,
) -> Self { ) -> Self {
let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1); let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1);
let vfs_receiver = vfs.change_receiver();
let task = JobThreadContext {
tree,
vfs,
message_queue,
};
let thread_handle = jod_thread::Builder::new() let job_thread = jod_thread::Builder::new()
.name("ChangeProcessor thread".to_owned()) .name("ChangeProcessor thread".to_owned())
.spawn(move || { .spawn(move || {
log::trace!("ChangeProcessor thread started"); log::trace!("ChangeProcessor thread started");
Self::main_task(shutdown_receiver, tree, message_queue, vfs);
log::trace!("ChangeProcessor thread stopped");
})
.expect("Could not start ChangeProcessor thread");
Self {
shutdown_sender,
_thread_handle: thread_handle,
}
}
fn main_task<F: VfsFetcher>(
shutdown_receiver: Receiver<()>,
tree: Arc<Mutex<RojoTree>>,
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
vfs: Arc<Vfs<F>>,
) {
let vfs_receiver = vfs.change_receiver();
#[allow( #[allow(
// Crossbeam's select macro generates code that Clippy doesn't like, // Crossbeam's select macro generates code that Clippy doesn't like,
@@ -66,18 +70,72 @@ impl ChangeProcessor {
loop { loop {
select! { select! {
recv(vfs_receiver) -> event => { recv(vfs_receiver) -> event => {
let event = event.unwrap(); task.handle_vfs_event(event.unwrap());
},
recv(shutdown_receiver) -> _ => {
log::trace!("ChangeProcessor shutdown signal received...");
break;
},
}
}
log::trace!("ChangeProcessor thread stopped");
})
.expect("Could not start ChangeProcessor thread");
Self {
shutdown_sender,
job_thread,
}
}
}
impl Drop for ChangeProcessor {
fn drop(&mut self) {
// Signal the job thread to start spinning down. Without this we'll hang
// forever waiting for the thread to finish its infinite loop.
let _ = self.shutdown_sender.send(());
// After this function ends, the job thread will be joined. It might
// block for a small period of time while it processes its last work.
}
}
/// Contains all of the state needed to synchronize the DOM and VFS.
struct JobThreadContext<F> {
/// A handle to the DOM we're managing.
tree: Arc<Mutex<RojoTree>>,
/// A handle to the VFS we're managing.
vfs: Arc<Vfs<F>>,
/// Whenever changes are applied to the DOM, we should push those changes
/// into this message queue to inform any connected clients.
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
}
impl<F: VfsFetcher> JobThreadContext<F> {
fn handle_vfs_event(&self, event: VfsEvent) {
log::trace!("Vfs event: {:?}", event); log::trace!("Vfs event: {:?}", event);
let applied_patches = { // Update the VFS immediately with the event.
vfs.commit_change(&event).expect("Error applying VFS change"); self.vfs
.commit_change(&event)
.expect("Error applying VFS change");
let mut tree = tree.lock().unwrap(); // For a given VFS event, we might have many changes to different parts
// of the tree. Calculate and apply all of these changes.
let applied_patches = {
let mut tree = self.tree.lock().unwrap();
let mut applied_patches = Vec::new(); let mut applied_patches = Vec::new();
match event { match event {
VfsEvent::Created(path) | VfsEvent::Modified(path) | VfsEvent::Removed(path) => { VfsEvent::Created(path) | VfsEvent::Modified(path) | VfsEvent::Removed(path) => {
// Find the nearest ancestor to this path that has
// associated instances in the tree. This helps make sure
// that we handle additions correctly, especially if we
// receive events for descendants of a large tree being
// created all at once.
let mut current_path = path.as_path(); let mut current_path = path.as_path();
let affected_ids = loop { let affected_ids = loop {
let ids = tree.get_ids_at_path(&current_path); let ids = tree.get_ids_at_path(&current_path);
@@ -95,39 +153,28 @@ impl ChangeProcessor {
} }
}; };
update_affected_instances(&mut tree, &vfs, &mut applied_patches, &affected_ids) for id in affected_ids {
if let Some(patch) = compute_and_apply_changes(&mut tree, &self.vfs, id) {
applied_patches.push(patch);
}
}
} }
} }
applied_patches applied_patches
}; };
{ // Notify anyone listening to the message queue about the changes we
message_queue.push_messages(&applied_patches); // just made.
} self.message_queue.push_messages(&applied_patches);
},
recv(shutdown_receiver) -> _ => {
log::trace!("ChangeProcessor shutdown signal received...");
break;
},
}
}
} }
} }
impl Drop for ChangeProcessor { fn compute_and_apply_changes<F: VfsFetcher>(
fn drop(&mut self) {
let _ = self.shutdown_sender.send(());
}
}
fn update_affected_instances<F: VfsFetcher>(
tree: &mut RojoTree, tree: &mut RojoTree,
vfs: &Vfs<F>, vfs: &Vfs<F>,
applied_patches: &mut Vec<AppliedPatchSet>, id: RbxId,
affected_ids: &[RbxId], ) -> Option<AppliedPatchSet> {
) {
for &id in affected_ids {
let metadata = tree let metadata = tree
.get_metadata(id) .get_metadata(id)
.expect("metadata missing for instance present in tree"); .expect("metadata missing for instance present in tree");
@@ -135,9 +182,13 @@ fn update_affected_instances<F: VfsFetcher>(
let instigating_source = match &metadata.instigating_source { let instigating_source = match &metadata.instigating_source {
Some(path) => path, Some(path) => path,
None => { None => {
log::warn!("Instance {} did not have an instigating source, but was considered for an update.", id); log::warn!(
"Instance {} did not have an instigating source, but was considered for an update.",
id
);
log::warn!("This is a Rojo bug. Please file an issue!"); log::warn!("This is a Rojo bug. Please file an issue!");
continue;
return None;
} }
}; };
@@ -198,6 +249,5 @@ fn update_affected_instances<F: VfsFetcher>(
} }
}; };
applied_patches.push(applied_patch_set); Some(applied_patch_set)
}
} }