merge impl-v2: server

This commit is contained in:
Lucien Greathouse
2018-06-10 22:59:04 -07:00
parent e30545c132
commit ec1f9bd706
35 changed files with 1643 additions and 1207 deletions

View File

@@ -1,225 +1,212 @@
use std::io::Read;
use std::sync::{Arc, Mutex};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::{mpsc, RwLock, Arc};
use rouille;
use serde;
use serde_json;
use rouille::{self, Request, Response};
use id::Id;
use message_session::{MessageSession, Message};
use project::Project;
use vfs::{VfsSession, VfsChange};
use rbx::RbxInstance;
use plugin::PluginChain;
static MAX_BODY_SIZE: usize = 25 * 1024 * 1024; // 25 MiB
use rbx_session::RbxSession;
use session::Session;
/// The set of configuration the web server needs to start.
pub struct WebConfig {
pub port: u64,
pub verbose: bool,
pub project: Project,
pub server_id: u64,
pub rbx_session: Arc<RwLock<RbxSession>>,
pub message_session: MessageSession,
}
#[derive(Debug, Serialize)]
impl WebConfig {
pub fn from_session(server_id: u64, port: u64, session: &Session) -> WebConfig {
WebConfig {
port,
server_id,
project: session.project.clone(),
rbx_session: session.get_rbx_session(),
message_session: session.get_message_session(),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ServerInfo<'a> {
pub struct ServerInfoResponse<'a> {
pub server_id: &'a str,
pub server_version: &'a str,
pub protocol_version: u64,
pub partitions: HashMap<String, Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadAllResponse<'a> {
pub server_id: &'a str,
pub message_cursor: i32,
pub instances: Cow<'a, HashMap<Id, RbxInstance>>,
pub partition_instances: Cow<'a, HashMap<String, Id>>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadResponse<'a> {
pub server_id: &'a str,
pub message_cursor: i32,
pub instances: HashMap<Id, Cow<'a, RbxInstance>>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeResponse<'a> {
pub server_id: &'a str,
pub message_cursor: i32,
pub messages: Cow<'a, [Message]>,
}
pub struct Server {
config: WebConfig,
server_version: &'static str,
protocol_version: u64,
server_id: &'a str,
project: &'a Project,
current_time: f64,
server_id: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ReadResult<'a> {
items: Vec<Option<RbxInstance>>,
server_id: &'a str,
current_time: f64,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ChangesResult<'a> {
changes: &'a [VfsChange],
server_id: &'a str,
current_time: f64,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct WriteSpecifier {
route: String,
item: RbxInstance,
}
fn json<T: serde::Serialize>(value: T) -> rouille::Response {
let data = serde_json::to_string(&value).unwrap();
rouille::Response::from_data("application/json", data)
}
/// Pulls text that may be JSON out of a Rouille Request object.
///
/// Doesn't do any actual parsing -- all this method does is verify the content
/// type of the request and read the request's body.
fn read_json_text(request: &rouille::Request) -> Option<String> {
// Bail out if the request body isn't marked as JSON
match request.header("Content-Type") {
Some(header) => if !header.starts_with("application/json") {
return None;
},
None => return None,
impl Server {
pub fn new(config: WebConfig) -> Server {
Server {
server_version: env!("CARGO_PKG_VERSION"),
server_id: config.server_id.to_string(),
config,
}
}
let body = match request.data() {
Some(v) => v,
None => return None,
};
// Allocate a buffer and read up to MAX_BODY_SIZE+1 bytes into it.
let mut out = Vec::new();
match body.take(MAX_BODY_SIZE.saturating_add(1) as u64).read_to_end(&mut out) {
Ok(_) => {},
Err(_) => return None,
}
// If the body was too big (MAX_BODY_SIZE+1), we abort instead of trying to
// process it.
if out.len() > MAX_BODY_SIZE {
return None;
}
let parsed = match String::from_utf8(out) {
Ok(v) => v,
Err(_) => return None,
};
Some(parsed)
}
/// Reads the body out of a Rouille Request and attempts to turn it into JSON.
fn read_json<T>(request: &rouille::Request) -> Option<T>
where
T: serde::de::DeserializeOwned,
{
let body = match read_json_text(&request) {
Some(v) => v,
None => return None,
};
let parsed = match serde_json::from_str(&body) {
Ok(v) => v,
Err(_) => return None,
};
// TODO: Change return type to some sort of Result
Some(parsed)
}
/// Start the Rojo web server and park our current thread.
pub fn start(config: WebConfig, project: Project, plugin_chain: &'static PluginChain, vfs: Arc<Mutex<VfsSession>>) {
let address = format!("localhost:{}", config.port);
let server_id = config.server_id.to_string();
rouille::start_server(address, move |request| {
pub fn handle_request(&self, request: &Request) -> Response {
router!(request,
(GET) (/) => {
Response::text("Rojo up and running!")
},
(GET) (/api/rojo) => {
// Get a summary of information about the server.
let current_time = {
let vfs = vfs.lock().unwrap();
let mut partitions = HashMap::new();
vfs.current_time()
};
json(ServerInfo {
server_version: env!("CARGO_PKG_VERSION"),
protocol_version: 1,
server_id: &server_id,
project: &project,
current_time,
})
},
(GET) (/changes/{ last_time: f64 }) => {
// Get the list of changes since the given time.
let vfs = vfs.lock().unwrap();
let current_time = vfs.current_time();
let changes = vfs.changes_since(last_time);
json(ChangesResult {
changes,
server_id: &server_id,
current_time,
})
},
(POST) (/read) => {
// Read some instances from the server according to a JSON
// format body.
let read_request: Vec<Vec<String>> = match read_json(&request) {
Some(v) => v,
None => return rouille::Response::empty_400(),
};
// Read the files off of the filesystem that the client
// requested.
let (items, current_time) = {
let vfs = vfs.lock().unwrap();
let current_time = vfs.current_time();
let mut items = Vec::new();
for route in &read_request {
match vfs.read(&route) {
Ok(v) => items.push(Some(v)),
Err(_) => items.push(None),
}
}
(items, current_time)
};
// Transform all of our VfsItem objects into Roblox instances
// the client can use.
let rbx_items = items
.iter()
.map(|item| {
match *item {
Some(ref item) => plugin_chain.transform_file(item),
None => None,
}
})
.collect::<Vec<_>>();
if config.verbose {
println!("Got read request: {:?}", read_request);
println!("Responding with:\n\t{:?}", rbx_items);
for partition in self.config.project.partitions.values() {
partitions.insert(partition.name.clone(), partition.target.clone());
}
json(ReadResult {
server_id: &server_id,
items: rbx_items,
current_time,
Response::json(&ServerInfoResponse {
server_version: self.server_version,
protocol_version: 2,
server_id: &self.server_id,
partitions: partitions,
})
},
(POST) (/write) => {
// Not yet implemented.
(GET) (/api/subscribe/{ cursor: i32 }) => {
// Retrieve any messages past the given cursor index, and if
// there weren't any, subscribe to receive any new messages.
let _write_request: Vec<WriteSpecifier> = match read_json(&request) {
Some(v) => v,
None => return rouille::Response::empty_400(),
};
// Did the client miss any messages since the last subscribe?
{
let messages = self.config.message_session.messages.read().unwrap();
rouille::Response::empty_404()
if cursor > messages.len() as i32 {
return Response::json(&SubscribeResponse {
server_id: &self.server_id,
messages: Cow::Borrowed(&[]),
message_cursor: messages.len() as i32 - 1,
});
}
if cursor < messages.len() as i32 - 1 {
let new_messages = &messages[(cursor + 1) as usize..];
let new_cursor = cursor + new_messages.len() as i32;
return Response::json(&SubscribeResponse {
server_id: &self.server_id,
messages: Cow::Borrowed(new_messages),
message_cursor: new_cursor,
});
}
}
let (tx, rx) = mpsc::channel();
let sender_id = self.config.message_session.subscribe(tx);
match rx.recv() {
Ok(_) => (),
Err(_) => return Response::text("error!").with_status_code(500),
}
self.config.message_session.unsubscribe(sender_id);
{
let messages = self.config.message_session.messages.read().unwrap();
let new_messages = &messages[(cursor + 1) as usize..];
let new_cursor = cursor + new_messages.len() as i32;
Response::json(&SubscribeResponse {
server_id: &self.server_id,
messages: Cow::Borrowed(new_messages),
message_cursor: new_cursor,
})
}
},
_ => rouille::Response::empty_404()
(GET) (/api/read_all) => {
let rbx_session = self.config.rbx_session.read().unwrap();
let message_cursor = self.config.message_session.get_message_cursor();
Response::json(&ReadAllResponse {
server_id: &self.server_id,
message_cursor,
instances: Cow::Borrowed(rbx_session.tree.get_all_instances()),
partition_instances: Cow::Borrowed(&rbx_session.partition_instances),
})
},
(GET) (/api/read/{ id_list: String }) => {
let requested_ids = id_list
.split(",")
.map(str::parse::<Id>)
.collect::<Result<Vec<Id>, _>>();
let requested_ids = match requested_ids {
Ok(v) => v,
Err(_) => return rouille::Response::text("Malformed ID list").with_status_code(400),
};
let rbx_session = self.config.rbx_session.read().unwrap();
let message_cursor = self.config.message_session.get_message_cursor();
let mut instances = HashMap::new();
for requested_id in &requested_ids {
rbx_session.tree.get_instance(*requested_id, &mut instances);
}
Response::json(&ReadResponse {
server_id: &self.server_id,
message_cursor,
instances,
})
},
_ => Response::empty_404()
)
});
}
}
/// Start the Rojo web server, taking over the current thread.
#[allow(unreachable_code)]
pub fn start(config: WebConfig) {
let address = format!("localhost:{}", config.port);
let server = Server::new(config);
rouille::start_server(address, move |request| server.handle_request(request));
}