From 2440d9fc48f5af76fc45c35c3d11cbf939d594e0 Mon Sep 17 00:00:00 2001 From: Lucien Greathouse Date: Mon, 23 Sep 2019 16:09:58 -0700 Subject: [PATCH] Create ChangeProcessor for routing events from imfs to tree to message queue --- src/change_processor.rs | 84 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/serve_session.rs | 78 ++++++++++++++++++++++++++++++++------ 3 files changed, 152 insertions(+), 11 deletions(-) create mode 100644 src/change_processor.rs diff --git a/src/change_processor.rs b/src/change_processor.rs new file mode 100644 index 00000000..0220025f --- /dev/null +++ b/src/change_processor.rs @@ -0,0 +1,84 @@ +//! Defines the process by which changes are pulled from the Imfs, filtered, and +//! used to mutate Rojo's tree during a live session. +//! +//! This object is owned by a ServeSession. + +use std::sync::{Arc, Mutex}; + +use crossbeam_channel::{select, Sender}; +use jod_thread::JoinHandle; + +use crate::{ + imfs::{Imfs, ImfsFetcher}, + message_queue::MessageQueue, + snapshot::{AppliedPatchSet, RojoTree}, +}; + +pub struct ChangeProcessor { + shutdown_sender: Sender<()>, + thread_handle: JoinHandle<()>, +} + +impl ChangeProcessor { + pub fn start( + tree: Arc>, + message_queue: Arc>, + imfs: Arc>>, + ) -> Self { + let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1); + let imfs_receiver = { + let imfs = imfs.lock().unwrap(); + imfs.change_receiver() + }; + + let thread_handle = jod_thread::Builder::new() + .name("ChangeProcessor thread".to_owned()) + .spawn(move || { + log::trace!("ChangeProcessor thread started"); + + loop { + select! { + recv(imfs_receiver) -> event => { + let event = event.unwrap(); + + log::trace!("Imfs event: {:?}", event); + + { + let mut imfs = imfs.lock().unwrap(); + imfs.commit_change(&event).expect("Error applying IMFS change"); + } + + let patch_set = { + let _tree = tree.lock().unwrap(); + + // TODO: Apply changes to tree based on IMFS and + // calculate applied patch set from it. + AppliedPatchSet::new() + }; + + { + message_queue.push_messages(&[patch_set]); + } + }, + recv(shutdown_receiver) -> _ => { + break; + }, + } + } + + log::trace!("ChangeProcessor thread stopping"); + }) + .expect("Could not start ChangeProcessor thread"); + + Self { + shutdown_sender, + thread_handle, + } + } +} + +impl Drop for ChangeProcessor { + fn drop(&mut self) { + let _ = self.shutdown_sender.send(()); + } +} diff --git a/src/lib.rs b/src/lib.rs index 1c7603ef..f190593c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod commands; pub mod project; mod auth_cookie; +mod change_processor; mod imfs; mod message_queue; mod multimap; diff --git a/src/serve_session.rs b/src/serve_session.rs index 327b66c7..c1e37aeb 100644 --- a/src/serve_session.rs +++ b/src/serve_session.rs @@ -1,46 +1,102 @@ use std::{ collections::HashSet, - sync::{Mutex, MutexGuard}, + sync::{Arc, Mutex, MutexGuard}, }; use crate::{ + change_processor::ChangeProcessor, imfs::{Imfs, ImfsFetcher}, message_queue::MessageQueue, project::Project, session_id::SessionId, - snapshot::AppliedPatchSet, - snapshot::RojoTree, + snapshot::{AppliedPatchSet, RojoTree}, }; /// Contains all of the state for a Rojo serve session. +/// +/// Nothing here is specific to any Rojo interface. Though the primary way to +/// interact with a serve session is Rojo's HTTP right now, there's no reason +/// why Rojo couldn't expose an IPC or channels-based API for embedding in the +/// future. `ServeSession` would be roughly the right interface to expose for +/// those cases. pub struct ServeSession { + /// The root project for the serve session, if there was one defined. + /// + /// This will be defined if a folder with a `default.project.json` file was + /// used for starting the serve session, or if the user specified a full + /// path to a `.project.json` file. + /// + /// If `root_project` is None, values from the project should be treated as + /// their defaults. root_project: Option, + + /// A randomly generated ID for this serve session. It's used to ensure that + /// a client doesn't begin connecting to a different server part way through + /// an operation that needs to be atomic. session_id: SessionId, - tree: Mutex, - message_queue: MessageQueue, - imfs: Imfs, + + /// The tree of Roblox instances associated with this session that will be + /// updated in real-time. This is derived from the session's IMFS and will + /// eventually be mutable to connected clients. + tree: Arc>, + + /// An in-memory filesystem containing all of the files relevant for this + /// live session. + /// + /// The main use for accessing it from the session is for debugging issues + /// with Rojo's live-sync protocol. + imfs: Arc>>, + + /// A queue of changes that have been applied to `tree` that affect clients. + /// + /// Clients to the serve session will subscribe to this queue either + /// directly or through the HTTP API to be notified of mutations that need + /// to be applied. + message_queue: Arc>, + + /// The object responsible for listening to changes from the in-memory + /// filesystem, applying them, updating the Roblox instance tree, and + /// routing messages through the session's message queue to any connected + /// clients. + change_processor: ChangeProcessor, } -impl ServeSession { +/// Methods that need thread-safety bounds on ImfsFetcher are limited to this +/// block to prevent needing to spread Send + Sync + 'static into everything +/// that handles ServeSession. +impl ServeSession { pub fn new(imfs: Imfs, tree: RojoTree, root_project: Option) -> Self { let session_id = SessionId::new(); let message_queue = MessageQueue::new(); - ServeSession { + let tree = Arc::new(Mutex::new(tree)); + let message_queue = Arc::new(message_queue); + let imfs = Arc::new(Mutex::new(imfs)); + + let change_processor = ChangeProcessor::start( + Arc::clone(&tree), + Arc::clone(&message_queue), + Arc::clone(&imfs), + ); + + Self { session_id, root_project, - tree: Mutex::new(tree), + tree, message_queue, imfs, + change_processor, } } +} +impl ServeSession { pub fn tree(&self) -> MutexGuard<'_, RojoTree> { self.tree.lock().unwrap() } - pub fn imfs(&self) -> &Imfs { - &self.imfs + pub fn imfs(&self) -> MutexGuard<'_, Imfs> { + self.imfs.lock().unwrap() } pub fn message_queue(&self) -> &MessageQueue {