Major Subsystem Rewrite (Reconciler Mk5) (#217)

This commit is contained in:
Lucien Greathouse
2019-08-27 15:00:37 -07:00
committed by GitHub
parent 8e8291a0bd
commit fea303ac8b
80 changed files with 3843 additions and 5609 deletions

View File

@@ -2,16 +2,15 @@
//! JSON.
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
collections::HashSet,
sync::Arc,
};
use futures::{
future::{self, IntoFuture},
future,
Future,
sync::oneshot,
};
use hyper::{
service::Service,
header,
@@ -22,43 +21,15 @@ use hyper::{
Response,
};
use serde::{Serialize, Deserialize};
use rbx_dom_weak::{RbxId, RbxInstance};
use rbx_dom_weak::RbxId;
use crate::{
live_session::LiveSession,
serve_session::ServeSession,
session_id::SessionId,
snapshot_reconciler::InstanceChanges,
rbx_session::{MetadataPerInstance},
};
/// Contains the instance metadata relevant to Rojo clients.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PublicInstanceMetadata {
ignore_unknown_instances: bool,
}
impl PublicInstanceMetadata {
pub fn from_session_metadata(meta: &MetadataPerInstance) -> PublicInstanceMetadata {
PublicInstanceMetadata {
ignore_unknown_instances: meta.ignore_unknown_instances,
}
}
}
/// Used to attach metadata specific to Rojo to instances, which come from the
/// rbx_dom_weak crate.
///
/// Both fields are wrapped in Cow in order to make owned-vs-borrowed simpler
/// for tests.
#[derive(Debug, Serialize, Deserialize)]
pub struct InstanceWithMetadata<'a> {
#[serde(flatten)]
pub instance: Cow<'a, RbxInstance>,
#[serde(rename = "Metadata")]
pub metadata: Option<PublicInstanceMetadata>,
}
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: u64 = 3;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -67,23 +38,23 @@ pub struct ServerInfoResponse<'a> {
pub server_version: &'a str,
pub protocol_version: u64,
pub expected_place_ids: Option<HashSet<u64>>,
pub root_instance_id: RbxId,
// pub root_instance_id: RbxId,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadResponse<'a> {
pub struct ReadResponse {
pub session_id: SessionId,
pub message_cursor: u32,
pub instances: HashMap<RbxId, InstanceWithMetadata<'a>>,
// pub message_cursor: u32,
// pub instances: HashMap<RbxId, InstanceWithMetadata<'a>>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeResponse<'a> {
pub struct SubscribeResponse {
pub session_id: SessionId,
pub message_cursor: u32,
pub messages: Cow<'a, [InstanceChanges]>,
// pub message_cursor: u32,
// pub messages: Cow<'a, [InstanceChanges]>,
}
fn response_json<T: serde::Serialize>(value: T) -> Response<Body> {
@@ -91,11 +62,11 @@ fn response_json<T: serde::Serialize>(value: T) -> Response<Body> {
Ok(v) => v,
Err(err) => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(err.to_string()))
.unwrap();
},
}
};
Response::builder()
@@ -105,8 +76,7 @@ fn response_json<T: serde::Serialize>(value: T) -> Response<Body> {
}
pub struct ApiService {
live_session: Arc<LiveSession>,
server_version: &'static str,
serve_session: Arc<ServeSession>,
}
impl Service for ApiService {
@@ -135,24 +105,19 @@ impl Service for ApiService {
}
impl ApiService {
pub fn new(live_session: Arc<LiveSession>) -> ApiService {
pub fn new(serve_session: Arc<ServeSession>) -> ApiService {
ApiService {
live_session,
server_version: env!("CARGO_PKG_VERSION"),
serve_session,
}
}
/// Get a summary of information about the server
fn handle_api_rojo(&self) -> Response<Body> {
let rbx_session = self.live_session.rbx_session.lock().unwrap();
let tree = rbx_session.get_tree();
response_json(&ServerInfoResponse {
server_version: self.server_version,
protocol_version: 2,
session_id: self.live_session.session_id(),
expected_place_ids: self.live_session.serve_place_ids().clone(),
root_instance_id: tree.get_root_id(),
server_version: SERVER_VERSION,
protocol_version: PROTOCOL_VERSION,
session_id: self.serve_session.session_id(),
expected_place_ids: self.serve_session.serve_place_ids().map(Clone::clone),
})
}
@@ -160,7 +125,7 @@ impl ApiService {
/// there weren't any, subscribe to receive any new messages.
fn handle_api_subscribe(&self, request: Request<Body>) -> <ApiService as Service>::Future {
let argument = &request.uri().path()["/api/subscribe/".len()..];
let cursor: u32 = match argument.parse() {
let _cursor: u32 = match argument.parse() {
Ok(v) => v,
Err(err) => {
return Box::new(future::ok(Response::builder()
@@ -171,28 +136,9 @@ impl ApiService {
},
};
let message_queue = Arc::clone(&self.live_session.message_queue);
let session_id = self.live_session.session_id();
let (tx, rx) = oneshot::channel();
message_queue.subscribe(cursor, tx);
let result = rx.into_future()
.and_then(move |(new_cursor, new_messages)| {
Box::new(future::ok(response_json(SubscribeResponse {
session_id: session_id,
messages: Cow::Owned(new_messages),
message_cursor: new_cursor,
})))
})
.or_else(|e| {
Box::new(future::ok(Response::builder()
.status(500)
.body(Body::from(format!("Internal Error: {:?}", e)))
.unwrap()))
});
Box::new(result)
Box::new(future::ok(response_json(SubscribeResponse {
session_id: self.serve_session.session_id(),
})))
}
fn handle_api_read(&self, request: Request<Body>) -> Response<Body> {
@@ -202,9 +148,7 @@ impl ApiService {
.map(RbxId::parse_str)
.collect();
let message_queue = Arc::clone(&self.live_session.message_queue);
let requested_ids = match requested_ids {
let _requested_ids = match requested_ids {
Some(id) => id,
None => {
return Response::builder()
@@ -215,39 +159,8 @@ impl ApiService {
},
};
let rbx_session = self.live_session.rbx_session.lock().unwrap();
let tree = rbx_session.get_tree();
let message_cursor = message_queue.get_message_cursor();
let mut instances = HashMap::new();
for &requested_id in &requested_ids {
if let Some(instance) = tree.get_instance(requested_id) {
let metadata = rbx_session.get_instance_metadata(requested_id)
.map(PublicInstanceMetadata::from_session_metadata);
instances.insert(instance.get_id(), InstanceWithMetadata {
instance: Cow::Borrowed(instance),
metadata,
});
for descendant in tree.descendants(requested_id) {
let descendant_meta = rbx_session.get_instance_metadata(descendant.get_id())
.map(PublicInstanceMetadata::from_session_metadata);
instances.insert(descendant.get_id(), InstanceWithMetadata {
instance: Cow::Borrowed(descendant),
metadata: descendant_meta,
});
}
}
}
response_json(&ReadResponse {
session_id: self.live_session.session_id(),
message_cursor,
instances,
response_json(ReadResponse {
session_id: self.serve_session.session_id(),
})
}
}

View File

@@ -14,16 +14,14 @@ use hyper::{
};
use ritz::html;
use crate::{
live_session::LiveSession,
visualize::{VisualizeRbxSession, VisualizeImfs, graphviz_to_svg},
};
use crate::serve_session::ServeSession;
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
static HOME_CSS: &str = include_str!("../../assets/index.css");
pub struct InterfaceService {
live_session: Arc<LiveSession>,
server_version: &'static str,
#[allow(unused)] // TODO: Fill out interface service
serve_session: Arc<ServeSession>,
}
impl Service for InterfaceService {
@@ -48,10 +46,9 @@ impl Service for InterfaceService {
}
impl InterfaceService {
pub fn new(live_session: Arc<LiveSession>) -> InterfaceService {
pub fn new(serve_session: Arc<ServeSession>) -> InterfaceService {
InterfaceService {
live_session,
server_version: env!("CARGO_PKG_VERSION"),
serve_session,
}
}
@@ -71,7 +68,7 @@ impl InterfaceService {
"Rojo Live Sync is up and running!"
</h1>
<h2 class="subtitle">
"Version " { self.server_version }
"Version " { SERVER_VERSION }
</h2>
<a class="docs" href="https://lpghatguy.github.io/rojo">
"Rojo Documentation"
@@ -88,34 +85,16 @@ impl InterfaceService {
}
fn handle_visualize_rbx(&self) -> Response<Body> {
let rbx_session = self.live_session.rbx_session.lock().unwrap();
let dot_source = format!("{}", VisualizeRbxSession(&rbx_session));
match graphviz_to_svg(&dot_source) {
Some(svg) => Response::builder()
.header(header::CONTENT_TYPE, "image/svg+xml")
.body(Body::from(svg))
.unwrap(),
None => Response::builder()
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(dot_source))
.unwrap(),
}
Response::builder()
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from("TODO: /visualize/rbx"))
.unwrap()
}
fn handle_visualize_imfs(&self) -> Response<Body> {
let imfs = self.live_session.imfs.lock().unwrap();
let dot_source = format!("{}", VisualizeImfs(&imfs));
match graphviz_to_svg(&dot_source) {
Some(svg) => Response::builder()
.header(header::CONTENT_TYPE, "image/svg+xml")
.body(Body::from(svg))
.unwrap(),
None => Response::builder()
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(dot_source))
.unwrap(),
}
Response::builder()
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from("TODO: /visualize/imfs"))
.unwrap()
}
}

View File

@@ -1,6 +1,4 @@
// TODO: This module needs to be public for visualize, we should move
// PublicInstanceMetadata and switch this private!
pub mod api;
mod api;
mod interface;
use std::sync::Arc;
@@ -18,9 +16,7 @@ use hyper::{
Server,
};
use crate::{
live_session::LiveSession,
};
use crate::serve_session::ServeSession;
use self::{
api::ApiService,
@@ -50,22 +46,22 @@ impl Service for RootService {
}
impl RootService {
pub fn new(live_session: Arc<LiveSession>) -> RootService {
pub fn new(serve_session: Arc<ServeSession>) -> RootService {
RootService {
api: ApiService::new(Arc::clone(&live_session)),
interface: InterfaceService::new(Arc::clone(&live_session)),
api: ApiService::new(Arc::clone(&serve_session)),
interface: InterfaceService::new(Arc::clone(&serve_session)),
}
}
}
pub struct LiveServer {
live_session: Arc<LiveSession>,
serve_session: Arc<ServeSession>,
}
impl LiveServer {
pub fn new(live_session: Arc<LiveSession>) -> LiveServer {
pub fn new(serve_session: Arc<ServeSession>) -> LiveServer {
LiveServer {
live_session,
serve_session,
}
}
@@ -75,7 +71,7 @@ impl LiveServer {
let server = Server::bind(&address)
.serve(move || {
let service: FutureResult<_, hyper::Error> =
future::ok(RootService::new(Arc::clone(&self.live_session)));
future::ok(RootService::new(Arc::clone(&self.serve_session)));
service
})
.map_err(|e| eprintln!("Server error: {}", e));