diff --git a/src/message_queue.rs b/src/message_queue.rs index 6591e0c7..2f20dcab 100644 --- a/src/message_queue.rs +++ b/src/message_queue.rs @@ -62,20 +62,24 @@ impl MessageQueue { } /// Subscribe to any messages occurring after the given message cursor. - pub fn subscribe(&self, cursor: u32, sender: oneshot::Sender<(u32, Vec)>) { + pub fn subscribe(&self, cursor: u32) -> oneshot::Receiver<(u32, Vec)> { + let (sender, receiver) = oneshot::channel(); + let listener = { let listener = Listener { sender, cursor }; let messages = self.messages.read().unwrap(); match fire_listener_if_ready(&messages, listener) { - Ok(_) => return, + Ok(_) => return receiver, Err(listener) => listener, } }; let mut message_listeners = self.message_listeners.lock().unwrap(); message_listeners.push(listener); + + receiver } /// Subscribe to any messages being pushed into the queue. @@ -88,10 +92,8 @@ impl MessageQueue { let messages = self.messages.read().unwrap(); messages.len() as u32 }; - let (sender, receiver) = oneshot::channel(); - self.subscribe(cursor, sender); - receiver + self.subscribe(cursor) } pub fn cursor(&self) -> u32 { diff --git a/src/web/api.rs b/src/web/api.rs index 8be66359..1ba95725 100644 --- a/src/web/api.rs +++ b/src/web/api.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; -use futures::{sync::oneshot, Future}; +use futures::Future; use hyper::{service::Service, Body, Method, Request, StatusCode}; use rbx_dom_weak::RbxId; @@ -83,12 +83,8 @@ impl ApiService { }; let session_id = self.serve_session.session_id(); - let (sender, receiver) = oneshot::channel(); - { - let message_queue = self.serve_session.message_queue(); - message_queue.subscribe(input_cursor, sender); - } + let receiver = self.serve_session.message_queue().subscribe(input_cursor); let tree_handle = self.serve_session.tree_handle();