Two way sync V0 (#282)

* Unfinished two-way sync API

* In-memory two-way sync complete

* Move PatchSet application into ChangeProcessor thread, where it can be synchronous

* Stop InstanceMap's signals when a ServeSession terminates

* Apply patch in ChangeProcessor

* Feature flag

* Fix error in ChangeProcessor due to wrong drop order
This commit is contained in:
Lucien Greathouse
2019-12-20 14:24:28 -08:00
committed by GitHub
parent 26e2e81188
commit a398338c02
9 changed files with 156 additions and 20 deletions

View File

@@ -16,6 +16,9 @@ exclude = [
[features]
default = []
# Turn on the server half of Rojo's unstable two-way sync feature.
unstable_two_way_sync = []
# Enable this feature to live-reload assets from the web UI.
dev-live-assets = []

View File

@@ -156,11 +156,22 @@ function ApiContext:read(ids)
end
function ApiContext:write(patch)
local url = ("%s/write"):format(self.__baseUrl)
local body = Http.jsonEncode({
local url = ("%s/api/write"):format(self.__baseUrl)
local body = {
sessionId = self.__sessionId,
patch = patch,
})
removed = patch.removed,
updated = patch.updated,
}
-- Only add the 'added' field if the table is non-empty, or else Roblox's
-- JSON implementation will turn the table into an array instead of an
-- object, causing API validation to fail.
if next(patch.added) ~= nil then
body.added = patch.added
end
body = Http.jsonEncode(body)
return Http.post(url, body)
:andThen(rejectFailedRequests)

View File

@@ -25,7 +25,7 @@ local VALUES = {
[Environment.Test] = true,
},
},
ExperimentalTwoWaySync = {
UnstableTwoWaySync = {
type = "BoolValue",
values = {
[Environment.User] = false,
@@ -141,7 +141,7 @@ function DevSettings:shouldTypecheck()
end
function DevSettings:twoWaySyncEnabled()
return getValue("ExperimentalTwoWaySync")
return getValue("UnstableTwoWaySync")
end
function _G.ROJO_DEV_CREATE()

View File

@@ -20,6 +20,16 @@ function InstanceMap.new(onInstanceChanged)
return setmetatable(self, InstanceMap)
end
--[[
Disconnect all connections and release all instance references.
]]
function InstanceMap:stop()
-- I think this is safe.
for instance in pairs(self.fromInstances) do
self:removeInstance(instance)
end
end
function InstanceMap:__fmtDebug(output)
output:writeLine("InstanceMap {{")
output:indent()
@@ -145,7 +155,9 @@ function InstanceMap:__disconnectSignals(instance)
local signals = self.instancesToSignal[instance]
if signals ~= nil then
-- In the general case, we avoid
-- In most cases, we only have a single signal, so we avoid keeping
-- around the extra table. ValueBase objects force us to use multiple
-- signals to emulate the Instance.Changed event, however.
if typeof(signals) == "table" then
for _, signal in ipairs(signals) do
signal:Disconnect()

View File

@@ -189,6 +189,7 @@ end
function ServeSession:__stopInternal(err)
self:__setStatus(Status.Disconnected, err)
self.__apiContext:disconnect()
self.__instanceMap:stop()
end
function ServeSession:__setStatus(status, detail)

View File

@@ -1,6 +1,6 @@
use std::sync::{Arc, Mutex};
use crossbeam_channel::{select, Sender};
use crossbeam_channel::{select, Receiver, Sender};
use jod_thread::JoinHandle;
use rbx_dom_weak::RbxId;
@@ -42,8 +42,9 @@ impl ChangeProcessor {
/// outbound message queue.
pub fn start<F: VfsFetcher + Send + Sync + 'static>(
tree: Arc<Mutex<RojoTree>>,
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
vfs: Arc<Vfs<F>>,
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
tree_mutation_receiver: Receiver<PatchSet>,
) -> Self {
let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1);
let vfs_receiver = vfs.change_receiver();
@@ -72,6 +73,9 @@ impl ChangeProcessor {
recv(vfs_receiver) -> event => {
task.handle_vfs_event(event.unwrap());
},
recv(tree_mutation_receiver) -> patch_set => {
task.handle_tree_event(patch_set.unwrap());
},
recv(shutdown_receiver) -> _ => {
log::trace!("ChangeProcessor shutdown signal received...");
break;
@@ -168,6 +172,20 @@ impl<F: VfsFetcher> JobThreadContext<F> {
// just made.
self.message_queue.push_messages(&applied_patches);
}
fn handle_tree_event(&self, patch_set: PatchSet) {
log::trace!("Applying PatchSet from client: {:#?}", patch_set);
// TODO: Calculate a corresponding VFS patch and apply that instead?
let applied_patch = {
let mut tree = self.tree.lock().unwrap();
apply_patch_set(&mut tree, patch_set)
};
self.message_queue.push_messages(&[applied_patch]);
}
}
fn compute_and_apply_changes<F: VfsFetcher>(

View File

@@ -5,13 +5,15 @@ use std::{
time::Instant,
};
use crossbeam_channel::Sender;
use crate::{
change_processor::ChangeProcessor,
common_setup,
message_queue::MessageQueue,
project::Project,
session_id::SessionId,
snapshot::{AppliedPatchSet, RojoTree},
snapshot::{AppliedPatchSet, PatchSet, RojoTree},
vfs::{Vfs, VfsFetcher},
};
@@ -23,6 +25,20 @@ use crate::{
/// future. `ServeSession` would be roughly the right interface to expose for
/// those cases.
pub struct ServeSession<F> {
/// The object responsible for listening to changes from the in-memory
/// filesystem, applying them, updating the Roblox instance tree, and
/// routing messages through the session's message queue to any connected
/// clients.
///
/// SHOULD BE DROPPED FIRST! ServeSession and ChangeProcessor communicate
/// with eachother via channels. If ServeSession hangs up those channels
/// before dropping the ChangeProcessor, its thread will panic with a
/// RecvError, causing the main thread to panic on drop.
///
/// Allowed to be unused because it has side effects when dropped.
#[allow(unused)]
change_processor: ChangeProcessor,
/// When the serve session was started. Used only for user-facing
/// diagnostics.
start_time: Instant,
@@ -61,11 +77,9 @@ pub struct ServeSession<F> {
/// to be applied.
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
/// The object responsible for listening to changes from the in-memory
/// filesystem, applying them, updating the Roblox instance tree, and
/// routing messages through the session's message queue to any connected
/// clients.
_change_processor: ChangeProcessor,
/// A channel to send mutation requests on. These will be handled by the
/// ChangeProcessor and trigger changes in the tree.
tree_mutation_sender: Sender<PatchSet>,
}
/// Methods that need thread-safety bounds on VfsFetcher are limited to this
@@ -94,21 +108,25 @@ impl<F: VfsFetcher + Send + Sync + 'static> ServeSession<F> {
let message_queue = Arc::new(message_queue);
let vfs = Arc::new(vfs);
let (tree_mutation_sender, tree_mutation_receiver) = crossbeam_channel::unbounded();
log::trace!("Starting ChangeProcessor");
let change_processor = ChangeProcessor::start(
Arc::clone(&tree),
Arc::clone(&message_queue),
Arc::clone(&vfs),
Arc::clone(&message_queue),
tree_mutation_receiver,
);
Self {
change_processor,
start_time,
session_id,
root_project,
tree,
message_queue,
tree_mutation_sender,
vfs,
_change_processor: change_processor,
}
}
}
@@ -122,6 +140,10 @@ impl<F: VfsFetcher> ServeSession<F> {
self.tree.lock().unwrap()
}
pub fn tree_mutation_sender(&self) -> Sender<PatchSet> {
self.tree_mutation_sender.clone()
}
pub fn vfs(&self) -> &Vfs<F> {
&self.vfs
}

View File

@@ -3,19 +3,20 @@
use std::{collections::HashMap, sync::Arc};
use futures::Future;
use futures::{Future, Stream};
use hyper::{service::Service, Body, Method, Request, StatusCode};
use rbx_dom_weak::RbxId;
use crate::{
serve_session::ServeSession,
snapshot::{PatchSet, PatchUpdate},
vfs::VfsFetcher,
web::{
interface::{
ErrorResponse, Instance, InstanceMetadata as WebInstanceMetadata, InstanceUpdate,
ReadResponse, ServerInfoResponse, SubscribeMessage, SubscribeResponse,
PROTOCOL_VERSION, SERVER_VERSION,
ReadResponse, ServerInfoResponse, SubscribeMessage, SubscribeResponse, WriteRequest,
WriteResponse, PROTOCOL_VERSION, SERVER_VERSION,
},
util::{json, json_ok},
},
@@ -39,6 +40,11 @@ impl<F: VfsFetcher> Service for ApiService<F> {
(&Method::GET, path) if path.starts_with("/api/subscribe/") => {
self.handle_api_subscribe(request)
}
(&Method::POST, "/api/write") if cfg!(feature = "unstable_two_way_sync") => {
self.handle_api_write(request)
}
(_method, path) => json(
ErrorResponse::not_found(format!("Route not found: {}", path)),
StatusCode::NOT_FOUND,
@@ -145,6 +151,52 @@ impl<F: VfsFetcher> ApiService<F> {
}))
}
fn handle_api_write(&self, request: Request<Body>) -> <Self as Service>::Future {
let session_id = self.serve_session.session_id();
let tree_mutation_sender = self.serve_session.tree_mutation_sender();
Box::new(request.into_body().concat2().and_then(move |body| {
let request: WriteRequest = match serde_json::from_slice(&body) {
Ok(request) => request,
Err(err) => {
return json(
ErrorResponse::bad_request(format!("Invalid body: {}", err)),
StatusCode::BAD_REQUEST,
);
}
};
if request.session_id != session_id {
return json(
ErrorResponse::bad_request("Wrong session ID"),
StatusCode::BAD_REQUEST,
);
}
let updated_instances = request
.updated
.into_iter()
.map(|update| PatchUpdate {
id: update.id,
changed_class_name: update.changed_class_name,
changed_name: update.changed_name,
changed_properties: update.changed_properties,
changed_metadata: None,
})
.collect();
tree_mutation_sender
.send(PatchSet {
removed_instances: Vec::new(),
added_instances: Vec::new(),
updated_instances,
})
.unwrap();
json_ok(&WriteResponse { session_id })
}))
}
fn handle_api_read(&self, request: Request<Body>) -> <Self as Service>::Future {
let argument = &request.uri().path()["/api/read/".len()..];
let requested_ids: Option<Vec<RbxId>> = argument.split(',').map(RbxId::parse_str).collect();

View File

@@ -100,6 +100,23 @@ pub struct ReadResponse<'a> {
pub instances: HashMap<RbxId, Instance<'a>>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WriteRequest {
pub session_id: SessionId,
pub removed: Vec<RbxId>,
#[serde(default)]
pub added: HashMap<RbxId, ()>,
pub updated: Vec<InstanceUpdate>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WriteResponse {
pub session_id: SessionId,
}
/// Response body from /api/subscribe/{cursor}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]