From e773a92e53c58dda5f5261e2ee7da7d02e325e31 Mon Sep 17 00:00:00 2001 From: Lucien Greathouse Date: Thu, 19 Sep 2019 13:12:21 -0700 Subject: [PATCH] Connect message queue to frontend API --- src/web/api.rs | 41 ++++++++++++++++++++++++++--------------- src/web/interface.rs | 3 +-- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/web/api.rs b/src/web/api.rs index 4bd676b9..83f4be56 100644 --- a/src/web/api.rs +++ b/src/web/api.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; -use futures::{future, Future}; +use futures::{future, sync::oneshot, Future}; use hyper::{header, service::Service, Body, Method, Request, Response, StatusCode}; use rbx_dom_weak::RbxId; @@ -13,8 +13,8 @@ use crate::{ serve_session::ServeSession, web::{ interface::{ - Instance, NotFoundError, ReadResponse, ServerInfoResponse, PROTOCOL_VERSION, - SERVER_VERSION, + Instance, NotFoundError, ReadResponse, ServerInfoResponse, SubscribeResponse, + PROTOCOL_VERSION, SERVER_VERSION, }, util::{json, json_ok}, }, @@ -66,7 +66,7 @@ impl ApiService { /// there weren't any, subscribe to receive any new messages. fn handle_api_subscribe(&self, request: Request) -> ::Future { let argument = &request.uri().path()["/api/subscribe/".len()..]; - let _input_cursor: u32 = match argument.parse() { + let input_cursor: u32 = match argument.parse() { Ok(v) => v, Err(err) => { return Box::new(future::ok( @@ -79,19 +79,30 @@ impl ApiService { } }; - // Temporary response to prevent Rojo plugin from sending too many - // requests, this will hang the request until it times out. - Box::new(future::empty()) + let session_id = self.serve_session.session_id(); + let (sender, receiver) = oneshot::channel(); - // let message_queue = self.serve_session.message_queue(); - // let message_cursor = message_queue.cursor(); - // let messages = Vec::new(); + { + let message_queue = self.serve_session.message_queue(); + message_queue.subscribe(input_cursor, sender); + } - // json_ok(SubscribeResponse { - // session_id: self.serve_session.session_id(), - // message_cursor, - // messages, - // }) + Box::new(receiver.then(move |result| { + match result { + Ok((message_cursor, messages)) => json_ok(SubscribeResponse { + session_id, + message_cursor, + messages, + }), + Err(_) => Box::new(future::ok( + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header(header::CONTENT_TYPE, "text/plain") + .body(Body::from("Message queue disconnected sender!")) + .unwrap(), + )), + } + })) } fn handle_api_read(&self, request: Request) -> ::Future { diff --git a/src/web/interface.rs b/src/web/interface.rs index b62c77f5..7fb0ac26 100644 --- a/src/web/interface.rs +++ b/src/web/interface.rs @@ -17,8 +17,7 @@ pub(crate) const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const PROTOCOL_VERSION: u64 = 3; // TODO -#[derive(Debug, Serialize, Deserialize)] -pub struct SubscribeMessage; +pub type SubscribeMessage = (); #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")]