WIP: Epiphany Refactor (#85)

This commit is contained in:
Lucien Greathouse
2018-08-26 01:03:53 -07:00
committed by GitHub
parent 80b9b7594b
commit 72bc77f1d5
52 changed files with 1145 additions and 2157 deletions

View File

@@ -1,197 +1,152 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::{mpsc, RwLock, Arc};
use std::{
borrow::Cow,
collections::HashMap,
sync::{mpsc, Arc},
};
use rouille::{self, Request, Response};
use id::Id;
use message_session::{MessageSession, Message};
use project::Project;
use rbx::RbxInstance;
use rbx_session::RbxSession;
use session::Session;
/// The set of configuration the web server needs to start.
pub struct WebConfig {
pub port: u64,
pub project: Project,
pub server_id: u64,
pub rbx_session: Arc<RwLock<RbxSession>>,
pub message_session: MessageSession,
}
impl WebConfig {
pub fn from_session(server_id: u64, port: u64, session: &Session) -> WebConfig {
WebConfig {
port,
server_id,
project: session.project.clone(),
rbx_session: session.get_rbx_session(),
message_session: session.get_message_session(),
}
}
}
use ::{
id::Id,
message_queue::Message,
project::Project,
rbx::RbxInstance,
session::Session,
};
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ServerInfoResponse<'a> {
pub server_id: &'a str,
pub session_id: &'a str,
pub server_version: &'a str,
pub protocol_version: u64,
pub partitions: HashMap<String, Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadAllResponse<'a> {
pub server_id: &'a str,
pub message_cursor: i32,
pub instances: Cow<'a, HashMap<Id, RbxInstance>>,
pub partition_instances: Cow<'a, HashMap<String, Id>>,
pub root_instance_id: Id,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReadResponse<'a> {
pub server_id: &'a str,
pub message_cursor: i32,
pub session_id: &'a str,
pub message_cursor: u32,
pub instances: HashMap<Id, Cow<'a, RbxInstance>>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeResponse<'a> {
pub server_id: &'a str,
pub message_cursor: i32,
pub session_id: &'a str,
pub message_cursor: u32,
pub messages: Cow<'a, [Message]>,
}
pub struct Server {
config: WebConfig,
session: Arc<Session>,
server_version: &'static str,
server_id: String,
}
impl Server {
pub fn new(config: WebConfig) -> Server {
pub fn new(session: Arc<Session>) -> Server {
Server {
session: session,
server_version: env!("CARGO_PKG_VERSION"),
server_id: config.server_id.to_string(),
config,
}
}
#[allow(unreachable_code)]
pub fn handle_request(&self, request: &Request) -> Response {
router!(request,
(GET) (/) => {
Response::text("Rojo up and running!")
Response::text("Rojo is up and running!")
},
(GET) (/api/rojo) => {
// Get a summary of information about the server.
let mut partitions = HashMap::new();
for partition in self.config.project.partitions.values() {
partitions.insert(partition.name.clone(), partition.target.clone());
}
let tree = self.session.tree.read().unwrap();
Response::json(&ServerInfoResponse {
server_version: self.server_version,
protocol_version: 2,
server_id: &self.server_id,
partitions: partitions,
session_id: &self.session.session_id,
root_instance_id: tree.root_instance_id,
})
},
(GET) (/api/subscribe/{ cursor: i32 }) => {
(GET) (/api/subscribe/{ cursor: u32 }) => {
// Retrieve any messages past the given cursor index, and if
// there weren't any, subscribe to receive any new messages.
let message_queue = Arc::clone(&self.session.message_queue);
// Did the client miss any messages since the last subscribe?
{
let messages = self.config.message_session.messages.read().unwrap();
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);
if cursor > messages.len() as i32 {
if new_messages.len() > 0 {
return Response::json(&SubscribeResponse {
server_id: &self.server_id,
session_id: &self.session.session_id,
messages: Cow::Borrowed(&[]),
message_cursor: messages.len() as i32 - 1,
});
}
if cursor < messages.len() as i32 - 1 {
let new_messages = &messages[(cursor + 1) as usize..];
let new_cursor = cursor + new_messages.len() as i32;
return Response::json(&SubscribeResponse {
server_id: &self.server_id,
messages: Cow::Borrowed(new_messages),
message_cursor: new_cursor,
});
})
}
}
let (tx, rx) = mpsc::channel();
let sender_id = self.config.message_session.subscribe(tx);
let sender_id = message_queue.subscribe(tx);
match rx.recv() {
Ok(_) => (),
Err(_) => return Response::text("error!").with_status_code(500),
}
self.config.message_session.unsubscribe(sender_id);
message_queue.unsubscribe(sender_id);
{
let messages = self.config.message_session.messages.read().unwrap();
let new_messages = &messages[(cursor + 1) as usize..];
let new_cursor = cursor + new_messages.len() as i32;
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);
Response::json(&SubscribeResponse {
server_id: &self.server_id,
messages: Cow::Borrowed(new_messages),
return Response::json(&SubscribeResponse {
session_id: &self.session.session_id,
messages: Cow::Owned(new_messages),
message_cursor: new_cursor,
})
}
},
(GET) (/api/read_all) => {
let rbx_session = self.config.rbx_session.read().unwrap();
let message_cursor = self.config.message_session.get_message_cursor();
Response::json(&ReadAllResponse {
server_id: &self.server_id,
message_cursor,
instances: Cow::Borrowed(rbx_session.tree.get_all_instances()),
partition_instances: Cow::Borrowed(&rbx_session.partition_instances),
})
},
(GET) (/api/read/{ id_list: String }) => {
let requested_ids = id_list
let message_queue = Arc::clone(&self.session.message_queue);
let requested_ids: Result<Vec<Id>, _> = id_list
.split(",")
.map(str::parse::<Id>)
.collect::<Result<Vec<Id>, _>>();
.map(str::parse)
.collect();
let requested_ids = match requested_ids {
Ok(v) => v,
Ok(id) => id,
Err(_) => return rouille::Response::text("Malformed ID list").with_status_code(400),
};
let rbx_session = self.config.rbx_session.read().unwrap();
let tree = self.session.tree.read().unwrap();
let message_cursor = self.config.message_session.get_message_cursor();
let message_cursor = message_queue.get_message_cursor();
let mut instances = HashMap::new();
for requested_id in &requested_ids {
rbx_session.tree.get_instance_and_descendants(*requested_id, &mut instances);
for &requested_id in &requested_ids {
match tree.get_instance(requested_id) {
Some(instance) => {
instances.insert(instance.get_id(), Cow::Borrowed(instance));
for descendant in tree.iter_descendants(requested_id) {
instances.insert(descendant.get_id(), Cow::Borrowed(descendant));
}
},
None => {},
}
}
Response::json(&ReadResponse {
server_id: &self.server_id,
session_id: &self.session.session_id,
message_cursor,
instances,
})
@@ -200,13 +155,10 @@ impl Server {
_ => Response::empty_404()
)
}
}
/// Start the Rojo web server, taking over the current thread.
#[allow(unreachable_code)]
pub fn start(config: WebConfig) {
let address = format!("localhost:{}", config.port);
let server = Server::new(config);
pub fn listen(self, port: u64) {
let address = format!("localhost:{}", port);
rouille::start_server(address, move |request| server.handle_request(request));
}
rouille::start_server(address, move |request| self.handle_request(request));
}
}