diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f80ac55..2cc5b4e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [Unreleased] * Fixed `cargo init` giving unexpected results by upgrading to `rbx_dom_weak` 1.1.0 +* Fixed API not responding when the Rojo plugin is connected ([#133](https://github.com/LPGhatguy/rojo/issues/133)) * Updated default place file: * Improved default properties to be closer to Studio's built-in 'Baseplate' template * Added a baseplate to the project file (Thanks, [@AmaranthineCodices](https://github.com/AmaranthineCodices/)!) diff --git a/server/src/message_queue.rs b/server/src/message_queue.rs index 9eefb749..e8d0e5c7 100644 --- a/server/src/message_queue.rs +++ b/server/src/message_queue.rs @@ -1,13 +1,14 @@ use std::{ collections::HashMap, sync::{ - mpsc, atomic::{AtomicUsize, Ordering}, RwLock, Mutex, }, }; +use futures::sync::mpsc; + /// A unique identifier, not guaranteed to be generated in any order. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct ListenerId(usize); @@ -21,8 +22,7 @@ pub fn get_listener_id() -> ListenerId { /// A message queue with persistent history that can be subscribed to. /// -/// Definitely non-optimal, but a simple design that works well for the -/// synchronous web server Rojo uses, Rouille. +/// Definitely non-optimal. This would ideally be a lockless mpmc queue. #[derive(Default)] pub struct MessageQueue { messages: RwLock>, @@ -38,15 +38,15 @@ impl MessageQueue { } pub fn push_messages(&self, new_messages: &[T]) { - let message_listeners = self.message_listeners.lock().unwrap(); + let mut message_listeners = self.message_listeners.lock().unwrap(); { let mut messages = self.messages.write().unwrap(); messages.extend_from_slice(new_messages); } - for listener in message_listeners.values() { - listener.send(()).unwrap(); + for listener in message_listeners.values_mut() { + listener.try_send(()).unwrap(); } } diff --git a/server/src/web/api.rs b/server/src/web/api.rs index c66408c6..a36d02c8 100644 --- a/server/src/web/api.rs +++ b/server/src/web/api.rs @@ -4,10 +4,10 @@ use std::{ borrow::Cow, collections::{HashMap, HashSet}, - sync::{mpsc, Arc}, + sync::Arc, }; -use futures::{future, Future}; +use futures::{future, Future, stream::Stream, sync::mpsc}; use hyper::{ service::Service, header, @@ -114,14 +114,16 @@ impl Service for ApiService { fn call(&mut self, request: hyper::Request) -> Self::Future { let response = match (request.method(), request.uri().path()) { (&Method::GET, "/api/rojo") => self.handle_api_rojo(), - (&Method::GET, path) if path.starts_with("/api/subscribe/") => self.handle_api_subscribe(request), (&Method::GET, path) if path.starts_with("/api/read/") => self.handle_api_read(request), + (&Method::GET, path) if path.starts_with("/api/subscribe/") => { + return self.handle_api_subscribe(request); + } _ => { Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::empty()) .unwrap() - }, + } }; Box::new(future::ok(response)) @@ -152,16 +154,16 @@ impl ApiService { /// Retrieve any messages past the given cursor index, and if /// there weren't any, subscribe to receive any new messages. - fn handle_api_subscribe(&self, request: Request) -> Response { + fn handle_api_subscribe(&self, request: Request) -> ::Future { let argument = &request.uri().path()["/api/subscribe/".len()..]; let cursor: u32 = match argument.parse() { Ok(v) => v, Err(err) => { - return Response::builder() + return Box::new(future::ok(Response::builder() .status(StatusCode::BAD_REQUEST) .header(header::CONTENT_TYPE, "text/plain") .body(Body::from(err.to_string())) - .unwrap(); + .unwrap())); }, }; @@ -172,37 +174,38 @@ impl ApiService { let (new_cursor, new_messages) = message_queue.get_messages_since(cursor); if !new_messages.is_empty() { - return response_json(&SubscribeResponse { + return Box::new(future::ok(response_json(&SubscribeResponse { session_id: self.live_session.session_id(), messages: Cow::Borrowed(&new_messages), message_cursor: new_cursor, - }) + }))); } } - // TOOD: Switch to futures mpsc instead to not block this task - let (tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::channel(1024); let sender_id = message_queue.subscribe(tx); + let session_id = self.live_session.session_id(); - match rx.recv() { - Ok(_) => (), - Err(_) => return Response::builder() - .status(500) - .body(Body::from("error!")) - .unwrap(), - } + let result = rx.into_future() + .and_then(move |_| { + message_queue.unsubscribe(sender_id); - message_queue.unsubscribe(sender_id); + let (new_cursor, new_messages) = message_queue.get_messages_since(cursor); - { - let (new_cursor, new_messages) = message_queue.get_messages_since(cursor); - - return response_json(&SubscribeResponse { - session_id: self.live_session.session_id(), - messages: Cow::Owned(new_messages), - message_cursor: new_cursor, + Box::new(future::ok(response_json(SubscribeResponse { + session_id: session_id, + messages: Cow::Owned(new_messages), + message_cursor: new_cursor, + }))) }) - } + .or_else(|e| { + Box::new(future::ok(Response::builder() + .status(500) + .body(Body::from(format!("Internal Error: {:?}", e))) + .unwrap())) + }); + + Box::new(result) } fn handle_api_read(&self, request: Request) -> Response {