From a398338c028eaf74cd4598d4d47d474199beddf0 Mon Sep 17 00:00:00 2001 From: Lucien Greathouse Date: Fri, 20 Dec 2019 14:24:28 -0800 Subject: [PATCH] Two way sync V0 (#282) * Unfinished two-way sync API * In-memory two-way sync complete * Move PatchSet application into ChangeProcessor thread, where it can be synchronous * Stop InstanceMap's signals when a ServeSession terminates * Apply patch in ChangeProcessor * Feature flag * Fix error in ChangeProcessor due to wrong drop order --- Cargo.toml | 3 ++ plugin/src/ApiContext.lua | 19 +++++++++--- plugin/src/DevSettings.lua | 4 +-- plugin/src/InstanceMap.lua | 14 ++++++++- plugin/src/ServeSession.lua | 1 + src/change_processor.rs | 22 ++++++++++++-- src/serve_session.rs | 38 +++++++++++++++++++----- src/web/api.rs | 58 +++++++++++++++++++++++++++++++++++-- src/web/interface.rs | 17 +++++++++++ 9 files changed, 156 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 41b0e456..601fecc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,9 @@ exclude = [ [features] default = [] +# Turn on the server half of Rojo's unstable two-way sync feature. +unstable_two_way_sync = [] + # Enable this feature to live-reload assets from the web UI. dev-live-assets = [] diff --git a/plugin/src/ApiContext.lua b/plugin/src/ApiContext.lua index 6e5b36e0..c6e89a93 100644 --- a/plugin/src/ApiContext.lua +++ b/plugin/src/ApiContext.lua @@ -156,11 +156,22 @@ function ApiContext:read(ids) end function ApiContext:write(patch) - local url = ("%s/write"):format(self.__baseUrl) - local body = Http.jsonEncode({ + local url = ("%s/api/write"):format(self.__baseUrl) + + local body = { sessionId = self.__sessionId, - patch = patch, - }) + removed = patch.removed, + updated = patch.updated, + } + + -- Only add the 'added' field if the table is non-empty, or else Roblox's + -- JSON implementation will turn the table into an array instead of an + -- object, causing API validation to fail. + if next(patch.added) ~= nil then + body.added = patch.added + end + + body = Http.jsonEncode(body) return Http.post(url, body) :andThen(rejectFailedRequests) diff --git a/plugin/src/DevSettings.lua b/plugin/src/DevSettings.lua index 182769da..c859f8f0 100644 --- a/plugin/src/DevSettings.lua +++ b/plugin/src/DevSettings.lua @@ -25,7 +25,7 @@ local VALUES = { [Environment.Test] = true, }, }, - ExperimentalTwoWaySync = { + UnstableTwoWaySync = { type = "BoolValue", values = { [Environment.User] = false, @@ -141,7 +141,7 @@ function DevSettings:shouldTypecheck() end function DevSettings:twoWaySyncEnabled() - return getValue("ExperimentalTwoWaySync") + return getValue("UnstableTwoWaySync") end function _G.ROJO_DEV_CREATE() diff --git a/plugin/src/InstanceMap.lua b/plugin/src/InstanceMap.lua index d1a6a1a7..1396ed6c 100644 --- a/plugin/src/InstanceMap.lua +++ b/plugin/src/InstanceMap.lua @@ -20,6 +20,16 @@ function InstanceMap.new(onInstanceChanged) return setmetatable(self, InstanceMap) end +--[[ + Disconnect all connections and release all instance references. +]] +function InstanceMap:stop() + -- I think this is safe. + for instance in pairs(self.fromInstances) do + self:removeInstance(instance) + end +end + function InstanceMap:__fmtDebug(output) output:writeLine("InstanceMap {{") output:indent() @@ -145,7 +155,9 @@ function InstanceMap:__disconnectSignals(instance) local signals = self.instancesToSignal[instance] if signals ~= nil then - -- In the general case, we avoid + -- In most cases, we only have a single signal, so we avoid keeping + -- around the extra table. ValueBase objects force us to use multiple + -- signals to emulate the Instance.Changed event, however. if typeof(signals) == "table" then for _, signal in ipairs(signals) do signal:Disconnect() diff --git a/plugin/src/ServeSession.lua b/plugin/src/ServeSession.lua index e6cdbb2f..41580f66 100644 --- a/plugin/src/ServeSession.lua +++ b/plugin/src/ServeSession.lua @@ -189,6 +189,7 @@ end function ServeSession:__stopInternal(err) self:__setStatus(Status.Disconnected, err) self.__apiContext:disconnect() + self.__instanceMap:stop() end function ServeSession:__setStatus(status, detail) diff --git a/src/change_processor.rs b/src/change_processor.rs index a131d325..ebf307fd 100644 --- a/src/change_processor.rs +++ b/src/change_processor.rs @@ -1,6 +1,6 @@ use std::sync::{Arc, Mutex}; -use crossbeam_channel::{select, Sender}; +use crossbeam_channel::{select, Receiver, Sender}; use jod_thread::JoinHandle; use rbx_dom_weak::RbxId; @@ -42,8 +42,9 @@ impl ChangeProcessor { /// outbound message queue. pub fn start( tree: Arc>, - message_queue: Arc>, vfs: Arc>, + message_queue: Arc>, + tree_mutation_receiver: Receiver, ) -> Self { let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1); let vfs_receiver = vfs.change_receiver(); @@ -72,6 +73,9 @@ impl ChangeProcessor { recv(vfs_receiver) -> event => { task.handle_vfs_event(event.unwrap()); }, + recv(tree_mutation_receiver) -> patch_set => { + task.handle_tree_event(patch_set.unwrap()); + }, recv(shutdown_receiver) -> _ => { log::trace!("ChangeProcessor shutdown signal received..."); break; @@ -168,6 +172,20 @@ impl JobThreadContext { // just made. self.message_queue.push_messages(&applied_patches); } + + fn handle_tree_event(&self, patch_set: PatchSet) { + log::trace!("Applying PatchSet from client: {:#?}", patch_set); + + // TODO: Calculate a corresponding VFS patch and apply that instead? + + let applied_patch = { + let mut tree = self.tree.lock().unwrap(); + + apply_patch_set(&mut tree, patch_set) + }; + + self.message_queue.push_messages(&[applied_patch]); + } } fn compute_and_apply_changes( diff --git a/src/serve_session.rs b/src/serve_session.rs index 7a90d6b0..34f49ffa 100644 --- a/src/serve_session.rs +++ b/src/serve_session.rs @@ -5,13 +5,15 @@ use std::{ time::Instant, }; +use crossbeam_channel::Sender; + use crate::{ change_processor::ChangeProcessor, common_setup, message_queue::MessageQueue, project::Project, session_id::SessionId, - snapshot::{AppliedPatchSet, RojoTree}, + snapshot::{AppliedPatchSet, PatchSet, RojoTree}, vfs::{Vfs, VfsFetcher}, }; @@ -23,6 +25,20 @@ use crate::{ /// future. `ServeSession` would be roughly the right interface to expose for /// those cases. pub struct ServeSession { + /// The object responsible for listening to changes from the in-memory + /// filesystem, applying them, updating the Roblox instance tree, and + /// routing messages through the session's message queue to any connected + /// clients. + /// + /// SHOULD BE DROPPED FIRST! ServeSession and ChangeProcessor communicate + /// with eachother via channels. If ServeSession hangs up those channels + /// before dropping the ChangeProcessor, its thread will panic with a + /// RecvError, causing the main thread to panic on drop. + /// + /// Allowed to be unused because it has side effects when dropped. + #[allow(unused)] + change_processor: ChangeProcessor, + /// When the serve session was started. Used only for user-facing /// diagnostics. start_time: Instant, @@ -61,11 +77,9 @@ pub struct ServeSession { /// to be applied. message_queue: Arc>, - /// The object responsible for listening to changes from the in-memory - /// filesystem, applying them, updating the Roblox instance tree, and - /// routing messages through the session's message queue to any connected - /// clients. - _change_processor: ChangeProcessor, + /// A channel to send mutation requests on. These will be handled by the + /// ChangeProcessor and trigger changes in the tree. + tree_mutation_sender: Sender, } /// Methods that need thread-safety bounds on VfsFetcher are limited to this @@ -94,21 +108,25 @@ impl ServeSession { let message_queue = Arc::new(message_queue); let vfs = Arc::new(vfs); + let (tree_mutation_sender, tree_mutation_receiver) = crossbeam_channel::unbounded(); + log::trace!("Starting ChangeProcessor"); let change_processor = ChangeProcessor::start( Arc::clone(&tree), - Arc::clone(&message_queue), Arc::clone(&vfs), + Arc::clone(&message_queue), + tree_mutation_receiver, ); Self { + change_processor, start_time, session_id, root_project, tree, message_queue, + tree_mutation_sender, vfs, - _change_processor: change_processor, } } } @@ -122,6 +140,10 @@ impl ServeSession { self.tree.lock().unwrap() } + pub fn tree_mutation_sender(&self) -> Sender { + self.tree_mutation_sender.clone() + } + pub fn vfs(&self) -> &Vfs { &self.vfs } diff --git a/src/web/api.rs b/src/web/api.rs index 9a81ef00..0ddb21d6 100644 --- a/src/web/api.rs +++ b/src/web/api.rs @@ -3,19 +3,20 @@ use std::{collections::HashMap, sync::Arc}; -use futures::Future; +use futures::{Future, Stream}; use hyper::{service::Service, Body, Method, Request, StatusCode}; use rbx_dom_weak::RbxId; use crate::{ serve_session::ServeSession, + snapshot::{PatchSet, PatchUpdate}, vfs::VfsFetcher, web::{ interface::{ ErrorResponse, Instance, InstanceMetadata as WebInstanceMetadata, InstanceUpdate, - ReadResponse, ServerInfoResponse, SubscribeMessage, SubscribeResponse, - PROTOCOL_VERSION, SERVER_VERSION, + ReadResponse, ServerInfoResponse, SubscribeMessage, SubscribeResponse, WriteRequest, + WriteResponse, PROTOCOL_VERSION, SERVER_VERSION, }, util::{json, json_ok}, }, @@ -39,6 +40,11 @@ impl Service for ApiService { (&Method::GET, path) if path.starts_with("/api/subscribe/") => { self.handle_api_subscribe(request) } + + (&Method::POST, "/api/write") if cfg!(feature = "unstable_two_way_sync") => { + self.handle_api_write(request) + } + (_method, path) => json( ErrorResponse::not_found(format!("Route not found: {}", path)), StatusCode::NOT_FOUND, @@ -145,6 +151,52 @@ impl ApiService { })) } + fn handle_api_write(&self, request: Request) -> ::Future { + let session_id = self.serve_session.session_id(); + let tree_mutation_sender = self.serve_session.tree_mutation_sender(); + + Box::new(request.into_body().concat2().and_then(move |body| { + let request: WriteRequest = match serde_json::from_slice(&body) { + Ok(request) => request, + Err(err) => { + return json( + ErrorResponse::bad_request(format!("Invalid body: {}", err)), + StatusCode::BAD_REQUEST, + ); + } + }; + + if request.session_id != session_id { + return json( + ErrorResponse::bad_request("Wrong session ID"), + StatusCode::BAD_REQUEST, + ); + } + + let updated_instances = request + .updated + .into_iter() + .map(|update| PatchUpdate { + id: update.id, + changed_class_name: update.changed_class_name, + changed_name: update.changed_name, + changed_properties: update.changed_properties, + changed_metadata: None, + }) + .collect(); + + tree_mutation_sender + .send(PatchSet { + removed_instances: Vec::new(), + added_instances: Vec::new(), + updated_instances, + }) + .unwrap(); + + json_ok(&WriteResponse { session_id }) + })) + } + fn handle_api_read(&self, request: Request) -> ::Future { let argument = &request.uri().path()["/api/read/".len()..]; let requested_ids: Option> = argument.split(',').map(RbxId::parse_str).collect(); diff --git a/src/web/interface.rs b/src/web/interface.rs index 01cb62c3..dc315ca7 100644 --- a/src/web/interface.rs +++ b/src/web/interface.rs @@ -100,6 +100,23 @@ pub struct ReadResponse<'a> { pub instances: HashMap>, } +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WriteRequest { + pub session_id: SessionId, + pub removed: Vec, + + #[serde(default)] + pub added: HashMap, + pub updated: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WriteResponse { + pub session_id: SessionId, +} + /// Response body from /api/subscribe/{cursor} #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")]