From 41025225b224b768cea788b10cf60cc374b4514a Mon Sep 17 00:00:00 2001 From: Lucien Greathouse Date: Wed, 27 Mar 2019 13:27:50 -0700 Subject: [PATCH] Rewrite message queue with oneshot futures (#139) --- server/src/message_queue.rs | 72 ++++++++++++++++++++++--------------- server/src/web/api.rs | 31 +++++----------- 2 files changed, 53 insertions(+), 50 deletions(-) diff --git a/server/src/message_queue.rs b/server/src/message_queue.rs index e8d0e5c7..08d93e8e 100644 --- a/server/src/message_queue.rs +++ b/server/src/message_queue.rs @@ -1,23 +1,28 @@ use std::{ - collections::HashMap, + mem, sync::{ - atomic::{AtomicUsize, Ordering}, RwLock, Mutex, }, }; -use futures::sync::mpsc; +use futures::sync::oneshot; -/// A unique identifier, not guaranteed to be generated in any order. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct ListenerId(usize); +struct Listener { + sender: oneshot::Sender<(u32, Vec)>, + cursor: u32, +} -/// Generate a new ID, which has no defined ordering. -pub fn get_listener_id() -> ListenerId { - static LAST_ID: AtomicUsize = AtomicUsize::new(0); +fn fire_listener_if_ready(messages: &[T], listener: Listener) -> Result<(), Listener> { + let current_cursor = messages.len() as u32; - ListenerId(LAST_ID.fetch_add(1, Ordering::SeqCst)) + if listener.cursor < current_cursor { + let new_messages = messages[(listener.cursor as usize)..].to_vec(); + let _ = listener.sender.send((current_cursor, new_messages)); + Ok(()) + } else { + Err(listener) + } } /// A message queue with persistent history that can be subscribed to. @@ -26,42 +31,53 @@ pub fn get_listener_id() -> ListenerId { #[derive(Default)] pub struct MessageQueue { messages: RwLock>, - message_listeners: Mutex>>, + message_listeners: Mutex>>, } impl MessageQueue { pub fn new() -> MessageQueue { MessageQueue { messages: RwLock::new(Vec::new()), - message_listeners: Mutex::new(HashMap::new()), + message_listeners: Mutex::new(Vec::new()), } } pub fn push_messages(&self, new_messages: &[T]) { let mut message_listeners = self.message_listeners.lock().unwrap(); + let mut messages = self.messages.write().unwrap(); + messages.extend_from_slice(new_messages); - { - let mut messages = self.messages.write().unwrap(); - messages.extend_from_slice(new_messages); + let mut remaining_listeners = Vec::new(); + + for listener in message_listeners.drain(..) { + match fire_listener_if_ready(&messages, listener) { + Ok(_) => {} + Err(listener) => remaining_listeners.push(listener) + } } - for listener in message_listeners.values_mut() { - listener.try_send(()).unwrap(); - } + // Without this annotation, Rust gets confused since the first argument + // is a MutexGuard, but the second is a Vec. + mem::replace::>(&mut message_listeners, remaining_listeners); } - pub fn subscribe(&self, sender: mpsc::Sender<()>) -> ListenerId { - let id = get_listener_id(); + pub fn subscribe(&self, cursor: u32, sender: oneshot::Sender<(u32, Vec)>) { + let listener = { + let listener = Listener { + sender, + cursor, + }; + + let messages = self.messages.read().unwrap(); + + match fire_listener_if_ready(&messages, listener) { + Ok(_) => return, + Err(listener) => listener + } + }; let mut message_listeners = self.message_listeners.lock().unwrap(); - message_listeners.insert(id, sender); - - id - } - - pub fn unsubscribe(&self, id: ListenerId) { - let mut message_listeners = self.message_listeners.lock().unwrap(); - message_listeners.remove(&id); + message_listeners.push(listener); } pub fn get_message_cursor(&self) -> u32 { diff --git a/server/src/web/api.rs b/server/src/web/api.rs index a36d02c8..0b6cbe03 100644 --- a/server/src/web/api.rs +++ b/server/src/web/api.rs @@ -7,7 +7,11 @@ use std::{ sync::Arc, }; -use futures::{future, Future, stream::Stream, sync::mpsc}; +use futures::{ + future::{self, IntoFuture}, + Future, + sync::oneshot, +}; use hyper::{ service::Service, header, @@ -168,30 +172,13 @@ impl ApiService { }; let message_queue = Arc::clone(&self.live_session.message_queue); - - // Did the client miss any messages since the last subscribe? - { - let (new_cursor, new_messages) = message_queue.get_messages_since(cursor); - - if !new_messages.is_empty() { - return Box::new(future::ok(response_json(&SubscribeResponse { - session_id: self.live_session.session_id(), - messages: Cow::Borrowed(&new_messages), - message_cursor: new_cursor, - }))); - } - } - - let (tx, rx) = mpsc::channel(1024); - let sender_id = message_queue.subscribe(tx); let session_id = self.live_session.session_id(); + let (tx, rx) = oneshot::channel(); + message_queue.subscribe(cursor, tx); + let result = rx.into_future() - .and_then(move |_| { - message_queue.unsubscribe(sender_id); - - let (new_cursor, new_messages) = message_queue.get_messages_since(cursor); - + .and_then(move |(new_cursor, new_messages)| { Box::new(future::ok(response_json(SubscribeResponse { session_id: session_id, messages: Cow::Owned(new_messages),