Port to futures channel instead of std one.

Fixes #133.
This commit is contained in:
Lucien Greathouse
2019-03-12 11:45:39 -07:00
parent 3b6238ff93
commit ad93631ef8
3 changed files with 37 additions and 33 deletions

View File

@@ -2,6 +2,7 @@
## [Unreleased] ## [Unreleased]
* Fixed `cargo init` giving unexpected results by upgrading to `rbx_dom_weak` 1.1.0 * Fixed `cargo init` giving unexpected results by upgrading to `rbx_dom_weak` 1.1.0
* Fixed API not responding when the Rojo plugin is connected ([#133](https://github.com/LPGhatguy/rojo/issues/133))
* Updated default place file: * Updated default place file:
* Improved default properties to be closer to Studio's built-in 'Baseplate' template * Improved default properties to be closer to Studio's built-in 'Baseplate' template
* Added a baseplate to the project file (Thanks, [@AmaranthineCodices](https://github.com/AmaranthineCodices/)!) * Added a baseplate to the project file (Thanks, [@AmaranthineCodices](https://github.com/AmaranthineCodices/)!)

View File

@@ -1,13 +1,14 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{ sync::{
mpsc,
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
RwLock, RwLock,
Mutex, Mutex,
}, },
}; };
use futures::sync::mpsc;
/// A unique identifier, not guaranteed to be generated in any order. /// A unique identifier, not guaranteed to be generated in any order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ListenerId(usize); pub struct ListenerId(usize);
@@ -21,8 +22,7 @@ pub fn get_listener_id() -> ListenerId {
/// A message queue with persistent history that can be subscribed to. /// A message queue with persistent history that can be subscribed to.
/// ///
/// Definitely non-optimal, but a simple design that works well for the /// Definitely non-optimal. This would ideally be a lockless mpmc queue.
/// synchronous web server Rojo uses, Rouille.
#[derive(Default)] #[derive(Default)]
pub struct MessageQueue<T> { pub struct MessageQueue<T> {
messages: RwLock<Vec<T>>, messages: RwLock<Vec<T>>,
@@ -38,15 +38,15 @@ impl<T: Clone> MessageQueue<T> {
} }
pub fn push_messages(&self, new_messages: &[T]) { pub fn push_messages(&self, new_messages: &[T]) {
let message_listeners = self.message_listeners.lock().unwrap(); let mut message_listeners = self.message_listeners.lock().unwrap();
{ {
let mut messages = self.messages.write().unwrap(); let mut messages = self.messages.write().unwrap();
messages.extend_from_slice(new_messages); messages.extend_from_slice(new_messages);
} }
for listener in message_listeners.values() { for listener in message_listeners.values_mut() {
listener.send(()).unwrap(); listener.try_send(()).unwrap();
} }
} }

View File

@@ -4,10 +4,10 @@
use std::{ use std::{
borrow::Cow, borrow::Cow,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
sync::{mpsc, Arc}, sync::Arc,
}; };
use futures::{future, Future}; use futures::{future, Future, stream::Stream, sync::mpsc};
use hyper::{ use hyper::{
service::Service, service::Service,
header, header,
@@ -114,14 +114,16 @@ impl Service for ApiService {
fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future { fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
let response = match (request.method(), request.uri().path()) { let response = match (request.method(), request.uri().path()) {
(&Method::GET, "/api/rojo") => self.handle_api_rojo(), (&Method::GET, "/api/rojo") => self.handle_api_rojo(),
(&Method::GET, path) if path.starts_with("/api/subscribe/") => self.handle_api_subscribe(request),
(&Method::GET, path) if path.starts_with("/api/read/") => self.handle_api_read(request), (&Method::GET, path) if path.starts_with("/api/read/") => self.handle_api_read(request),
(&Method::GET, path) if path.starts_with("/api/subscribe/") => {
return self.handle_api_subscribe(request);
}
_ => { _ => {
Response::builder() Response::builder()
.status(StatusCode::NOT_FOUND) .status(StatusCode::NOT_FOUND)
.body(Body::empty()) .body(Body::empty())
.unwrap() .unwrap()
}, }
}; };
Box::new(future::ok(response)) Box::new(future::ok(response))
@@ -152,16 +154,16 @@ impl ApiService {
/// Retrieve any messages past the given cursor index, and if /// Retrieve any messages past the given cursor index, and if
/// there weren't any, subscribe to receive any new messages. /// there weren't any, subscribe to receive any new messages.
fn handle_api_subscribe(&self, request: Request<Body>) -> Response<Body> { fn handle_api_subscribe(&self, request: Request<Body>) -> <ApiService as Service>::Future {
let argument = &request.uri().path()["/api/subscribe/".len()..]; let argument = &request.uri().path()["/api/subscribe/".len()..];
let cursor: u32 = match argument.parse() { let cursor: u32 = match argument.parse() {
Ok(v) => v, Ok(v) => v,
Err(err) => { Err(err) => {
return Response::builder() return Box::new(future::ok(Response::builder()
.status(StatusCode::BAD_REQUEST) .status(StatusCode::BAD_REQUEST)
.header(header::CONTENT_TYPE, "text/plain") .header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(err.to_string())) .body(Body::from(err.to_string()))
.unwrap(); .unwrap()));
}, },
}; };
@@ -172,37 +174,38 @@ impl ApiService {
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor); let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);
if !new_messages.is_empty() { if !new_messages.is_empty() {
return response_json(&SubscribeResponse { return Box::new(future::ok(response_json(&SubscribeResponse {
session_id: self.live_session.session_id(), session_id: self.live_session.session_id(),
messages: Cow::Borrowed(&new_messages), messages: Cow::Borrowed(&new_messages),
message_cursor: new_cursor, message_cursor: new_cursor,
}) })));
} }
} }
// TOOD: Switch to futures mpsc instead to not block this task let (tx, rx) = mpsc::channel(1024);
let (tx, rx) = mpsc::channel();
let sender_id = message_queue.subscribe(tx); let sender_id = message_queue.subscribe(tx);
let session_id = self.live_session.session_id();
match rx.recv() { let result = rx.into_future()
Ok(_) => (), .and_then(move |_| {
Err(_) => return Response::builder() message_queue.unsubscribe(sender_id);
.status(500)
.body(Body::from("error!"))
.unwrap(),
}
message_queue.unsubscribe(sender_id); let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);
{ Box::new(future::ok(response_json(SubscribeResponse {
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor); session_id: session_id,
messages: Cow::Owned(new_messages),
return response_json(&SubscribeResponse { message_cursor: new_cursor,
session_id: self.live_session.session_id(), })))
messages: Cow::Owned(new_messages),
message_cursor: new_cursor,
}) })
} .or_else(|e| {
Box::new(future::ok(Response::builder()
.status(500)
.body(Body::from(format!("Internal Error: {:?}", e)))
.unwrap()))
});
Box::new(result)
} }
fn handle_api_read(&self, request: Request<Body>) -> Response<Body> { fn handle_api_read(&self, request: Request<Body>) -> Response<Body> {