Refactor MessageQueue API to return a oneshot receiver

This commit is contained in:
Lucien Greathouse
2019-10-08 13:49:41 -07:00
parent 18533d5944
commit 4e47655b17
2 changed files with 9 additions and 11 deletions

View File

@@ -62,20 +62,24 @@ impl<T: Clone> MessageQueue<T> {
} }
/// Subscribe to any messages occurring after the given message cursor. /// Subscribe to any messages occurring after the given message cursor.
pub fn subscribe(&self, cursor: u32, sender: oneshot::Sender<(u32, Vec<T>)>) { pub fn subscribe(&self, cursor: u32) -> oneshot::Receiver<(u32, Vec<T>)> {
let (sender, receiver) = oneshot::channel();
let listener = { let listener = {
let listener = Listener { sender, cursor }; let listener = Listener { sender, cursor };
let messages = self.messages.read().unwrap(); let messages = self.messages.read().unwrap();
match fire_listener_if_ready(&messages, listener) { match fire_listener_if_ready(&messages, listener) {
Ok(_) => return, Ok(_) => return receiver,
Err(listener) => listener, Err(listener) => listener,
} }
}; };
let mut message_listeners = self.message_listeners.lock().unwrap(); let mut message_listeners = self.message_listeners.lock().unwrap();
message_listeners.push(listener); message_listeners.push(listener);
receiver
} }
/// Subscribe to any messages being pushed into the queue. /// Subscribe to any messages being pushed into the queue.
@@ -88,10 +92,8 @@ impl<T: Clone> MessageQueue<T> {
let messages = self.messages.read().unwrap(); let messages = self.messages.read().unwrap();
messages.len() as u32 messages.len() as u32
}; };
let (sender, receiver) = oneshot::channel();
self.subscribe(cursor, sender); self.subscribe(cursor)
receiver
} }
pub fn cursor(&self) -> u32 { pub fn cursor(&self) -> u32 {

View File

@@ -3,7 +3,7 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use futures::{sync::oneshot, Future}; use futures::Future;
use hyper::{service::Service, Body, Method, Request, StatusCode}; use hyper::{service::Service, Body, Method, Request, StatusCode};
use rbx_dom_weak::RbxId; use rbx_dom_weak::RbxId;
@@ -83,12 +83,8 @@ impl<F: ImfsFetcher> ApiService<F> {
}; };
let session_id = self.serve_session.session_id(); let session_id = self.serve_session.session_id();
let (sender, receiver) = oneshot::channel();
{ let receiver = self.serve_session.message_queue().subscribe(input_cursor);
let message_queue = self.serve_session.message_queue();
message_queue.subscribe(input_cursor, sender);
}
let tree_handle = self.serve_session.tree_handle(); let tree_handle = self.serve_session.tree_handle();