forked from rojo-rbx/rojo
Create ChangeProcessor for routing events from imfs to tree to message queue
This commit is contained in:
84
src/change_processor.rs
Normal file
84
src/change_processor.rs
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
//! Defines the process by which changes are pulled from the Imfs, filtered, and
|
||||||
|
//! used to mutate Rojo's tree during a live session.
|
||||||
|
//!
|
||||||
|
//! This object is owned by a ServeSession.
|
||||||
|
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use crossbeam_channel::{select, Sender};
|
||||||
|
use jod_thread::JoinHandle;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
imfs::{Imfs, ImfsFetcher},
|
||||||
|
message_queue::MessageQueue,
|
||||||
|
snapshot::{AppliedPatchSet, RojoTree},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct ChangeProcessor {
|
||||||
|
shutdown_sender: Sender<()>,
|
||||||
|
thread_handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChangeProcessor {
|
||||||
|
pub fn start<F: ImfsFetcher + Send + 'static>(
|
||||||
|
tree: Arc<Mutex<RojoTree>>,
|
||||||
|
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
|
||||||
|
imfs: Arc<Mutex<Imfs<F>>>,
|
||||||
|
) -> Self {
|
||||||
|
let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1);
|
||||||
|
let imfs_receiver = {
|
||||||
|
let imfs = imfs.lock().unwrap();
|
||||||
|
imfs.change_receiver()
|
||||||
|
};
|
||||||
|
|
||||||
|
let thread_handle = jod_thread::Builder::new()
|
||||||
|
.name("ChangeProcessor thread".to_owned())
|
||||||
|
.spawn(move || {
|
||||||
|
log::trace!("ChangeProcessor thread started");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
recv(imfs_receiver) -> event => {
|
||||||
|
let event = event.unwrap();
|
||||||
|
|
||||||
|
log::trace!("Imfs event: {:?}", event);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut imfs = imfs.lock().unwrap();
|
||||||
|
imfs.commit_change(&event).expect("Error applying IMFS change");
|
||||||
|
}
|
||||||
|
|
||||||
|
let patch_set = {
|
||||||
|
let _tree = tree.lock().unwrap();
|
||||||
|
|
||||||
|
// TODO: Apply changes to tree based on IMFS and
|
||||||
|
// calculate applied patch set from it.
|
||||||
|
AppliedPatchSet::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
message_queue.push_messages(&[patch_set]);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
recv(shutdown_receiver) -> _ => {
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::trace!("ChangeProcessor thread stopping");
|
||||||
|
})
|
||||||
|
.expect("Could not start ChangeProcessor thread");
|
||||||
|
|
||||||
|
Self {
|
||||||
|
shutdown_sender,
|
||||||
|
thread_handle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for ChangeProcessor {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let _ = self.shutdown_sender.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,6 +13,7 @@ pub mod commands;
|
|||||||
pub mod project;
|
pub mod project;
|
||||||
|
|
||||||
mod auth_cookie;
|
mod auth_cookie;
|
||||||
|
mod change_processor;
|
||||||
mod imfs;
|
mod imfs;
|
||||||
mod message_queue;
|
mod message_queue;
|
||||||
mod multimap;
|
mod multimap;
|
||||||
|
|||||||
@@ -1,46 +1,102 @@
|
|||||||
use std::{
|
use std::{
|
||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
sync::{Mutex, MutexGuard},
|
sync::{Arc, Mutex, MutexGuard},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
change_processor::ChangeProcessor,
|
||||||
imfs::{Imfs, ImfsFetcher},
|
imfs::{Imfs, ImfsFetcher},
|
||||||
message_queue::MessageQueue,
|
message_queue::MessageQueue,
|
||||||
project::Project,
|
project::Project,
|
||||||
session_id::SessionId,
|
session_id::SessionId,
|
||||||
snapshot::AppliedPatchSet,
|
snapshot::{AppliedPatchSet, RojoTree},
|
||||||
snapshot::RojoTree,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Contains all of the state for a Rojo serve session.
|
/// Contains all of the state for a Rojo serve session.
|
||||||
|
///
|
||||||
|
/// Nothing here is specific to any Rojo interface. Though the primary way to
|
||||||
|
/// interact with a serve session is Rojo's HTTP right now, there's no reason
|
||||||
|
/// why Rojo couldn't expose an IPC or channels-based API for embedding in the
|
||||||
|
/// future. `ServeSession` would be roughly the right interface to expose for
|
||||||
|
/// those cases.
|
||||||
pub struct ServeSession<F> {
|
pub struct ServeSession<F> {
|
||||||
|
/// The root project for the serve session, if there was one defined.
|
||||||
|
///
|
||||||
|
/// This will be defined if a folder with a `default.project.json` file was
|
||||||
|
/// used for starting the serve session, or if the user specified a full
|
||||||
|
/// path to a `.project.json` file.
|
||||||
|
///
|
||||||
|
/// If `root_project` is None, values from the project should be treated as
|
||||||
|
/// their defaults.
|
||||||
root_project: Option<Project>,
|
root_project: Option<Project>,
|
||||||
|
|
||||||
|
/// A randomly generated ID for this serve session. It's used to ensure that
|
||||||
|
/// a client doesn't begin connecting to a different server part way through
|
||||||
|
/// an operation that needs to be atomic.
|
||||||
session_id: SessionId,
|
session_id: SessionId,
|
||||||
tree: Mutex<RojoTree>,
|
|
||||||
message_queue: MessageQueue<AppliedPatchSet>,
|
/// The tree of Roblox instances associated with this session that will be
|
||||||
imfs: Imfs<F>,
|
/// updated in real-time. This is derived from the session's IMFS and will
|
||||||
|
/// eventually be mutable to connected clients.
|
||||||
|
tree: Arc<Mutex<RojoTree>>,
|
||||||
|
|
||||||
|
/// An in-memory filesystem containing all of the files relevant for this
|
||||||
|
/// live session.
|
||||||
|
///
|
||||||
|
/// The main use for accessing it from the session is for debugging issues
|
||||||
|
/// with Rojo's live-sync protocol.
|
||||||
|
imfs: Arc<Mutex<Imfs<F>>>,
|
||||||
|
|
||||||
|
/// A queue of changes that have been applied to `tree` that affect clients.
|
||||||
|
///
|
||||||
|
/// Clients to the serve session will subscribe to this queue either
|
||||||
|
/// directly or through the HTTP API to be notified of mutations that need
|
||||||
|
/// to be applied.
|
||||||
|
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
|
||||||
|
|
||||||
|
/// The object responsible for listening to changes from the in-memory
|
||||||
|
/// filesystem, applying them, updating the Roblox instance tree, and
|
||||||
|
/// routing messages through the session's message queue to any connected
|
||||||
|
/// clients.
|
||||||
|
change_processor: ChangeProcessor,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F: ImfsFetcher> ServeSession<F> {
|
/// Methods that need thread-safety bounds on ImfsFetcher are limited to this
|
||||||
|
/// block to prevent needing to spread Send + Sync + 'static into everything
|
||||||
|
/// that handles ServeSession.
|
||||||
|
impl<F: ImfsFetcher + Send + 'static> ServeSession<F> {
|
||||||
pub fn new(imfs: Imfs<F>, tree: RojoTree, root_project: Option<Project>) -> Self {
|
pub fn new(imfs: Imfs<F>, tree: RojoTree, root_project: Option<Project>) -> Self {
|
||||||
let session_id = SessionId::new();
|
let session_id = SessionId::new();
|
||||||
let message_queue = MessageQueue::new();
|
let message_queue = MessageQueue::new();
|
||||||
|
|
||||||
ServeSession {
|
let tree = Arc::new(Mutex::new(tree));
|
||||||
|
let message_queue = Arc::new(message_queue);
|
||||||
|
let imfs = Arc::new(Mutex::new(imfs));
|
||||||
|
|
||||||
|
let change_processor = ChangeProcessor::start(
|
||||||
|
Arc::clone(&tree),
|
||||||
|
Arc::clone(&message_queue),
|
||||||
|
Arc::clone(&imfs),
|
||||||
|
);
|
||||||
|
|
||||||
|
Self {
|
||||||
session_id,
|
session_id,
|
||||||
root_project,
|
root_project,
|
||||||
tree: Mutex::new(tree),
|
tree,
|
||||||
message_queue,
|
message_queue,
|
||||||
imfs,
|
imfs,
|
||||||
|
change_processor,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F: ImfsFetcher> ServeSession<F> {
|
||||||
pub fn tree(&self) -> MutexGuard<'_, RojoTree> {
|
pub fn tree(&self) -> MutexGuard<'_, RojoTree> {
|
||||||
self.tree.lock().unwrap()
|
self.tree.lock().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn imfs(&self) -> &Imfs<F> {
|
pub fn imfs(&self) -> MutexGuard<'_, Imfs<F>> {
|
||||||
&self.imfs
|
self.imfs.lock().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn message_queue(&self) -> &MessageQueue<AppliedPatchSet> {
|
pub fn message_queue(&self) -> &MessageQueue<AppliedPatchSet> {
|
||||||
|
|||||||
Reference in New Issue
Block a user