Rewrite message queue with oneshot futures (#139)

This commit is contained in:
Lucien Greathouse
2019-03-27 13:27:50 -07:00
committed by GitHub
parent 07c7b28c03
commit 41025225b2
2 changed files with 53 additions and 50 deletions

View File

@@ -1,23 +1,28 @@
use std::{ use std::{
collections::HashMap, mem,
sync::{ sync::{
atomic::{AtomicUsize, Ordering},
RwLock, RwLock,
Mutex, Mutex,
}, },
}; };
use futures::sync::mpsc; use futures::sync::oneshot;
/// A unique identifier, not guaranteed to be generated in any order. struct Listener<T> {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] sender: oneshot::Sender<(u32, Vec<T>)>,
pub struct ListenerId(usize); cursor: u32,
}
/// Generate a new ID, which has no defined ordering. fn fire_listener_if_ready<T: Clone>(messages: &[T], listener: Listener<T>) -> Result<(), Listener<T>> {
pub fn get_listener_id() -> ListenerId { let current_cursor = messages.len() as u32;
static LAST_ID: AtomicUsize = AtomicUsize::new(0);
ListenerId(LAST_ID.fetch_add(1, Ordering::SeqCst)) if listener.cursor < current_cursor {
let new_messages = messages[(listener.cursor as usize)..].to_vec();
let _ = listener.sender.send((current_cursor, new_messages));
Ok(())
} else {
Err(listener)
}
} }
/// A message queue with persistent history that can be subscribed to. /// A message queue with persistent history that can be subscribed to.
@@ -26,42 +31,53 @@ pub fn get_listener_id() -> ListenerId {
#[derive(Default)] #[derive(Default)]
pub struct MessageQueue<T> { pub struct MessageQueue<T> {
messages: RwLock<Vec<T>>, messages: RwLock<Vec<T>>,
message_listeners: Mutex<HashMap<ListenerId, mpsc::Sender<()>>>, message_listeners: Mutex<Vec<Listener<T>>>,
} }
impl<T: Clone> MessageQueue<T> { impl<T: Clone> MessageQueue<T> {
pub fn new() -> MessageQueue<T> { pub fn new() -> MessageQueue<T> {
MessageQueue { MessageQueue {
messages: RwLock::new(Vec::new()), messages: RwLock::new(Vec::new()),
message_listeners: Mutex::new(HashMap::new()), message_listeners: Mutex::new(Vec::new()),
} }
} }
pub fn push_messages(&self, new_messages: &[T]) { pub fn push_messages(&self, new_messages: &[T]) {
let mut message_listeners = self.message_listeners.lock().unwrap(); let mut message_listeners = self.message_listeners.lock().unwrap();
let mut messages = self.messages.write().unwrap();
messages.extend_from_slice(new_messages);
{ let mut remaining_listeners = Vec::new();
let mut messages = self.messages.write().unwrap();
messages.extend_from_slice(new_messages); for listener in message_listeners.drain(..) {
match fire_listener_if_ready(&messages, listener) {
Ok(_) => {}
Err(listener) => remaining_listeners.push(listener)
}
} }
for listener in message_listeners.values_mut() { // Without this annotation, Rust gets confused since the first argument
listener.try_send(()).unwrap(); // is a MutexGuard, but the second is a Vec.
} mem::replace::<Vec<_>>(&mut message_listeners, remaining_listeners);
} }
pub fn subscribe(&self, sender: mpsc::Sender<()>) -> ListenerId { pub fn subscribe(&self, cursor: u32, sender: oneshot::Sender<(u32, Vec<T>)>) {
let id = get_listener_id(); let listener = {
let listener = Listener {
sender,
cursor,
};
let messages = self.messages.read().unwrap();
match fire_listener_if_ready(&messages, listener) {
Ok(_) => return,
Err(listener) => listener
}
};
let mut message_listeners = self.message_listeners.lock().unwrap(); let mut message_listeners = self.message_listeners.lock().unwrap();
message_listeners.insert(id, sender); message_listeners.push(listener);
id
}
pub fn unsubscribe(&self, id: ListenerId) {
let mut message_listeners = self.message_listeners.lock().unwrap();
message_listeners.remove(&id);
} }
pub fn get_message_cursor(&self) -> u32 { pub fn get_message_cursor(&self) -> u32 {

View File

@@ -7,7 +7,11 @@ use std::{
sync::Arc, sync::Arc,
}; };
use futures::{future, Future, stream::Stream, sync::mpsc}; use futures::{
future::{self, IntoFuture},
Future,
sync::oneshot,
};
use hyper::{ use hyper::{
service::Service, service::Service,
header, header,
@@ -168,30 +172,13 @@ impl ApiService {
}; };
let message_queue = Arc::clone(&self.live_session.message_queue); let message_queue = Arc::clone(&self.live_session.message_queue);
// Did the client miss any messages since the last subscribe?
{
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);
if !new_messages.is_empty() {
return Box::new(future::ok(response_json(&SubscribeResponse {
session_id: self.live_session.session_id(),
messages: Cow::Borrowed(&new_messages),
message_cursor: new_cursor,
})));
}
}
let (tx, rx) = mpsc::channel(1024);
let sender_id = message_queue.subscribe(tx);
let session_id = self.live_session.session_id(); let session_id = self.live_session.session_id();
let (tx, rx) = oneshot::channel();
message_queue.subscribe(cursor, tx);
let result = rx.into_future() let result = rx.into_future()
.and_then(move |_| { .and_then(move |(new_cursor, new_messages)| {
message_queue.unsubscribe(sender_id);
let (new_cursor, new_messages) = message_queue.get_messages_since(cursor);
Box::new(future::ok(response_json(SubscribeResponse { Box::new(future::ok(response_json(SubscribeResponse {
session_id: session_id, session_id: session_id,
messages: Cow::Owned(new_messages), messages: Cow::Owned(new_messages),