Remove Rouille and port everything to Hyper

This commit is contained in:
Lucien Greathouse
2019-02-22 15:11:27 -08:00
parent 105d8aeb6b
commit c9a663ed39
7 changed files with 132 additions and 399 deletions

View File

@@ -18,5 +18,4 @@ pub mod rbx_snapshot;
pub mod session_id;
pub mod snapshot_reconciler;
pub mod visualize;
pub mod web;
pub mod web_util;
pub mod web;

View File

@@ -10,15 +10,14 @@ use std::{
use futures::{future, Future};
use hyper::{
service::Service,
StatusCode,
header,
Method,
Body,
};
use serde_derive::{Serialize, Deserialize};
use rouille::{
self,
router,
Request,
Response,
};
use serde_derive::{Serialize, Deserialize};
use rbx_dom_weak::{RbxId, RbxInstance};
use crate::{
@@ -83,57 +82,66 @@ pub struct SubscribeResponse<'a> {
pub messages: Cow<'a, [InstanceChanges]>,
}
pub struct ApiServer {
fn response_json<T: serde::Serialize>(value: T) -> Response<Body> {
let serialized = match serde_json::to_string(&value) {
Ok(v) => v,
Err(err) => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(err.to_string()))
.unwrap();
},
};
Response::builder()
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(serialized))
.unwrap()
}
pub struct ApiService {
live_session: Arc<LiveSession>,
server_version: &'static str,
}
impl Service for ApiServer {
impl Service for ApiService {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type Future = Box<Future<Item = hyper::Response<Self::ReqBody>, Error = Self::Error> + Send>;
type Future = Box<dyn Future<Item = hyper::Response<Self::ReqBody>, Error = Self::Error> + Send>;
fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
Box::new(future::ok(hyper::Response::new(Body::from("Hello, from API!"))))
let response = match (request.method(), request.uri().path()) {
(&Method::GET, "/api/rojo") => self.handle_api_rojo(),
(&Method::GET, path) if path.starts_with("/api/subscribe/") => self.handle_api_subscribe(request),
(&Method::GET, path) if path.starts_with("/api/read/") => self.handle_api_read(request),
_ => {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap()
},
};
Box::new(future::ok(response))
}
}
impl ApiServer {
pub fn new(live_session: Arc<LiveSession>) -> ApiServer {
ApiServer {
impl ApiService {
pub fn new(live_session: Arc<LiveSession>) -> ApiService {
ApiService {
live_session,
server_version: env!("CARGO_PKG_VERSION"),
}
}
#[allow(unreachable_code)]
pub fn handle_request(&self, request: &Request) -> Response {
router!(request,
(GET) (/api/rojo) => {
self.handle_api_rojo()
},
(GET) (/api/subscribe/{ cursor: u32 }) => {
self.handle_api_subscribe(cursor)
},
(GET) (/api/read/{ id_list: String }) => {
let requested_ids: Option<Vec<RbxId>> = id_list
.split(',')
.map(RbxId::parse_str)
.collect();
self.handle_api_read(requested_ids)
},
_ => Response::empty_404()
)
}
/// Get a summary of information about the server
fn handle_api_rojo(&self) -> Response {
fn handle_api_rojo(&self) -> Response<Body> {
let rbx_session = self.live_session.rbx_session.lock().unwrap();
let tree = rbx_session.get_tree();
Response::json(&ServerInfoResponse {
response_json(&ServerInfoResponse {
server_version: self.server_version,
protocol_version: 2,
session_id: self.live_session.session_id,
@@ -144,7 +152,19 @@ impl ApiServer {
/// Retrieve any messages past the given cursor index, and if
/// there weren't any, subscribe to receive any new messages.
fn handle_api_subscribe(&self, cursor: u32) -> Response {
fn handle_api_subscribe(&self, request: Request<Body>) -> Response<Body> {
let argument = &request.uri().path()["/api/subscribe".len()..];
let cursor: u32 = match argument.parse() {
Ok(v) => v,
Err(err) => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(err.to_string()))
.unwrap();
},
};
let message_queue = Arc::clone(&self.live_session.message_queue);
// Did the client miss any messages since the last subscribe?
@@ -152,7 +172,7 @@ impl ApiServer {
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);
if !new_messages.is_empty() {
return Response::json(&SubscribeResponse {
return response_json(&SubscribeResponse {
session_id: self.live_session.session_id,
messages: Cow::Borrowed(&new_messages),
message_cursor: new_cursor,
@@ -160,13 +180,16 @@ impl ApiServer {
}
}
// TOOD: Switch to futures mpsc instead to not block this task
let (tx, rx) = mpsc::channel();
let sender_id = message_queue.subscribe(tx);
match rx.recv() {
Ok(_) => (),
Err(_) => return Response::text("error!").with_status_code(500),
Err(_) => return Response::builder()
.status(500)
.body(Body::from("error!"))
.unwrap(),
}
message_queue.unsubscribe(sender_id);
@@ -174,7 +197,7 @@ impl ApiServer {
{
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);
return Response::json(&SubscribeResponse {
return response_json(&SubscribeResponse {
session_id: self.live_session.session_id,
messages: Cow::Owned(new_messages),
message_cursor: new_cursor,
@@ -182,12 +205,24 @@ impl ApiServer {
}
}
fn handle_api_read(&self, requested_ids: Option<Vec<RbxId>>) -> Response {
fn handle_api_read(&self, request: Request<Body>) -> Response<Body> {
let argument = &request.uri().path()["/api/subscribe".len()..];
let requested_ids: Option<Vec<RbxId>> = argument
.split(',')
.map(RbxId::parse_str)
.collect();
let message_queue = Arc::clone(&self.live_session.message_queue);
let requested_ids = match requested_ids {
Some(id) => id,
None => return rouille::Response::text("Malformed ID list").with_status_code(400),
None => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from("Malformed ID list"))
.unwrap();
},
};
let rbx_session = self.live_session.rbx_session.lock().unwrap();
@@ -219,7 +254,7 @@ impl ApiServer {
}
}
Response::json(&ReadResponse {
response_json(&ReadResponse {
session_id: self.live_session.session_id,
message_cursor,
instances,

View File

@@ -5,15 +5,14 @@ use std::sync::Arc;
use futures::{future, Future};
use hyper::{
service::Service,
header,
Body,
};
use rouille::{
self,
router,
Method,
StatusCode,
Request,
Response,
};
use ritz::{html};
use ritz::html;
use crate::{
live_session::LiveSession,
@@ -22,47 +21,41 @@ use crate::{
static HOME_CSS: &str = include_str!("../../assets/index.css");
pub struct InterfaceServer {
pub struct InterfaceService {
live_session: Arc<LiveSession>,
server_version: &'static str,
}
impl Service for InterfaceServer {
impl Service for InterfaceService {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type Future = Box<Future<Item = hyper::Response<Self::ReqBody>, Error = Self::Error> + Send>;
type Future = Box<dyn Future<Item = Response<Self::ReqBody>, Error = Self::Error> + Send>;
fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
Box::new(future::ok(hyper::Response::new(Body::from("Hello, from interface!"))))
fn call(&mut self, request: Request<Self::ReqBody>) -> Self::Future {
let response = match (request.method(), request.uri().path()) {
(&Method::GET, "/") => self.handle_home(),
(&Method::GET, "/visualize/rbx") => self.handle_visualize_rbx(),
(&Method::GET, "/visualize/imfs") => self.handle_visualize_imfs(),
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.unwrap(),
};
Box::new(future::ok(response))
}
}
impl InterfaceServer {
pub fn new(live_session: Arc<LiveSession>) -> InterfaceServer {
InterfaceServer {
impl InterfaceService {
pub fn new(live_session: Arc<LiveSession>) -> InterfaceService {
InterfaceService {
live_session,
server_version: env!("CARGO_PKG_VERSION"),
}
}
#[allow(unreachable_code)]
pub fn handle_request(&self, request: &Request) -> Response {
router!(request,
(GET) (/) => {
self.handle_home()
},
(GET) (/visualize/rbx) => {
self.handle_visualize_rbx()
},
(GET) (/visualize/imfs) => {
self.handle_visualize_imfs()
},
_ => Response::empty_404()
)
}
fn handle_home(&self) -> Response {
fn handle_home(&self) -> Response<Body> {
let page = html! {
<html>
<head>
@@ -88,26 +81,41 @@ impl InterfaceServer {
</html>
};
Response::html(format!("<!DOCTYPE html>{}", page))
Response::builder()
.header(header::CONTENT_TYPE, "text/html")
.body(Body::from(format!("<!DOCTYPE html>{}", page)))
.unwrap()
}
fn handle_visualize_rbx(&self) -> Response {
fn handle_visualize_rbx(&self) -> Response<Body> {
let rbx_session = self.live_session.rbx_session.lock().unwrap();
let dot_source = format!("{}", VisualizeRbxSession(&rbx_session));
match graphviz_to_svg(&dot_source) {
Some(svg) => Response::svg(svg),
None => Response::text(dot_source),
Some(svg) => Response::builder()
.header(header::CONTENT_TYPE, "image/svg+xml")
.body(Body::from(svg))
.unwrap(),
None => Response::builder()
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(dot_source))
.unwrap(),
}
}
fn handle_visualize_imfs(&self) -> Response {
fn handle_visualize_imfs(&self) -> Response<Body> {
let imfs = self.live_session.imfs.lock().unwrap();
let dot_source = format!("{}", VisualizeImfs(&imfs));
match graphviz_to_svg(&dot_source) {
Some(svg) => Response::svg(svg),
None => Response::text(dot_source),
Some(svg) => Response::builder()
.header(header::CONTENT_TYPE, "image/svg+xml")
.body(Body::from(svg))
.unwrap(),
None => Response::builder()
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(dot_source))
.unwrap(),
}
}
}

View File

@@ -22,20 +22,20 @@ use crate::{
};
use self::{
api::ApiServer,
interface::InterfaceServer,
api::ApiService,
interface::InterfaceService,
};
pub struct RootService {
api: api::ApiServer,
interface: interface::InterfaceServer,
api: api::ApiService,
interface: interface::InterfaceService,
}
impl Service for RootService {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type Future = Box<Future<Item = Response<Self::ReqBody>, Error = Self::Error> + Send>;
type Future = Box<dyn Future<Item = Response<Self::ReqBody>, Error = Self::Error> + Send>;
fn call(&mut self, request: Request<Self::ReqBody>) -> Self::Future {
if request.uri().path().starts_with("/api") {
@@ -49,8 +49,8 @@ impl Service for RootService {
impl RootService {
pub fn new(live_session: Arc<LiveSession>) -> RootService {
RootService {
api: ApiServer::new(Arc::clone(&live_session)),
interface: InterfaceServer::new(Arc::clone(&live_session)),
api: ApiService::new(Arc::clone(&live_session)),
interface: InterfaceService::new(Arc::clone(&live_session)),
}
}
}
@@ -71,7 +71,7 @@ impl LiveServer {
let server = Server::bind(&address)
.serve(move || {
let service: FutureResult<RootService, hyper::Error> =
let service: FutureResult<_, hyper::Error> =
future::ok(RootService::new(Arc::clone(&self.live_session)));
service
})

View File

@@ -1,43 +0,0 @@
use std::io::Read;
use rouille;
use serde;
use serde_json;
static MAX_BODY_SIZE: usize = 100 * 1024 * 1024; // 100 MiB
/// Pulls text that may be JSON out of a Rouille Request object.
///
/// Doesn't do any actual parsing -- all this method does is verify the content
/// type of the request and read the request's body.
fn read_json_text(request: &rouille::Request) -> Option<String> {
// Bail out if the request body isn't marked as JSON
let content_type = request.header("Content-Type")?;
if !content_type.starts_with("application/json") {
return None;
}
let body = request.data()?;
// Allocate a buffer and read up to MAX_BODY_SIZE+1 bytes into it.
let mut out = Vec::new();
body.take(MAX_BODY_SIZE.saturating_add(1) as u64).read_to_end(&mut out).ok()?;
// If the body was too big (MAX_BODY_SIZE+1), we abort instead of trying to
// process it.
if out.len() > MAX_BODY_SIZE {
return None;
}
String::from_utf8(out).ok()
}
/// Reads the body out of a Rouille Request and attempts to turn it into JSON.
pub fn read_json<T>(request: &rouille::Request) -> Option<T>
where
T: serde::de::DeserializeOwned,
{
let body = read_json_text(&request)?;
serde_json::from_str(&body).ok()?
}