Connect message queue to frontend API

This commit is contained in:
Lucien Greathouse
2019-09-19 13:12:21 -07:00
parent 196d27b959
commit e773a92e53
2 changed files with 27 additions and 17 deletions

View File

@@ -3,7 +3,7 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use futures::{future, Future}; use futures::{future, sync::oneshot, Future};
use hyper::{header, service::Service, Body, Method, Request, Response, StatusCode}; use hyper::{header, service::Service, Body, Method, Request, Response, StatusCode};
use rbx_dom_weak::RbxId; use rbx_dom_weak::RbxId;
@@ -13,8 +13,8 @@ use crate::{
serve_session::ServeSession, serve_session::ServeSession,
web::{ web::{
interface::{ interface::{
Instance, NotFoundError, ReadResponse, ServerInfoResponse, PROTOCOL_VERSION, Instance, NotFoundError, ReadResponse, ServerInfoResponse, SubscribeResponse,
SERVER_VERSION, PROTOCOL_VERSION, SERVER_VERSION,
}, },
util::{json, json_ok}, util::{json, json_ok},
}, },
@@ -66,7 +66,7 @@ impl<F: ImfsFetcher> ApiService<F> {
/// there weren't any, subscribe to receive any new messages. /// there weren't any, subscribe to receive any new messages.
fn handle_api_subscribe(&self, request: Request<Body>) -> <Self as Service>::Future { fn handle_api_subscribe(&self, request: Request<Body>) -> <Self as Service>::Future {
let argument = &request.uri().path()["/api/subscribe/".len()..]; let argument = &request.uri().path()["/api/subscribe/".len()..];
let _input_cursor: u32 = match argument.parse() { let input_cursor: u32 = match argument.parse() {
Ok(v) => v, Ok(v) => v,
Err(err) => { Err(err) => {
return Box::new(future::ok( return Box::new(future::ok(
@@ -79,19 +79,30 @@ impl<F: ImfsFetcher> ApiService<F> {
} }
}; };
// Temporary response to prevent Rojo plugin from sending too many let session_id = self.serve_session.session_id();
// requests, this will hang the request until it times out. let (sender, receiver) = oneshot::channel();
Box::new(future::empty())
// let message_queue = self.serve_session.message_queue(); {
// let message_cursor = message_queue.cursor(); let message_queue = self.serve_session.message_queue();
// let messages = Vec::new(); message_queue.subscribe(input_cursor, sender);
}
// json_ok(SubscribeResponse { Box::new(receiver.then(move |result| {
// session_id: self.serve_session.session_id(), match result {
// message_cursor, Ok((message_cursor, messages)) => json_ok(SubscribeResponse {
// messages, session_id,
// }) message_cursor,
messages,
}),
Err(_) => Box::new(future::ok(
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from("Message queue disconnected sender!"))
.unwrap(),
)),
}
}))
} }
fn handle_api_read(&self, request: Request<Body>) -> <Self as Service>::Future { fn handle_api_read(&self, request: Request<Body>) -> <Self as Service>::Future {

View File

@@ -17,8 +17,7 @@ pub(crate) const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
pub const PROTOCOL_VERSION: u64 = 3; pub const PROTOCOL_VERSION: u64 = 3;
// TODO // TODO
#[derive(Debug, Serialize, Deserialize)] pub type SubscribeMessage = ();
pub struct SubscribeMessage;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]