Move ChangeReceiver main block into its own function

This commit is contained in:
Lucien Greathouse
2019-09-23 16:13:19 -07:00
parent 2440d9fc48
commit 5a4189a770

View File

@@ -5,7 +5,7 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use crossbeam_channel::{select, Sender}; use crossbeam_channel::{select, Receiver, Sender};
use jod_thread::JoinHandle; use jod_thread::JoinHandle;
use crate::{ use crate::{
@@ -26,48 +26,10 @@ impl ChangeProcessor {
imfs: Arc<Mutex<Imfs<F>>>, imfs: Arc<Mutex<Imfs<F>>>,
) -> Self { ) -> Self {
let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1); let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1);
let imfs_receiver = {
let imfs = imfs.lock().unwrap();
imfs.change_receiver()
};
let thread_handle = jod_thread::Builder::new() let thread_handle = jod_thread::Builder::new()
.name("ChangeProcessor thread".to_owned()) .name("ChangeProcessor thread".to_owned())
.spawn(move || { .spawn(move || Self::main_task(shutdown_receiver, tree, message_queue, imfs))
log::trace!("ChangeProcessor thread started");
loop {
select! {
recv(imfs_receiver) -> event => {
let event = event.unwrap();
log::trace!("Imfs event: {:?}", event);
{
let mut imfs = imfs.lock().unwrap();
imfs.commit_change(&event).expect("Error applying IMFS change");
}
let patch_set = {
let _tree = tree.lock().unwrap();
// TODO: Apply changes to tree based on IMFS and
// calculate applied patch set from it.
AppliedPatchSet::new()
};
{
message_queue.push_messages(&[patch_set]);
}
},
recv(shutdown_receiver) -> _ => {
break;
},
}
}
log::trace!("ChangeProcessor thread stopping");
})
.expect("Could not start ChangeProcessor thread"); .expect("Could not start ChangeProcessor thread");
Self { Self {
@@ -75,6 +37,52 @@ impl ChangeProcessor {
thread_handle, thread_handle,
} }
} }
fn main_task<F: ImfsFetcher>(
shutdown_receiver: Receiver<()>,
tree: Arc<Mutex<RojoTree>>,
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
imfs: Arc<Mutex<Imfs<F>>>,
) {
log::trace!("ChangeProcessor thread started");
let imfs_receiver = {
let imfs = imfs.lock().unwrap();
imfs.change_receiver()
};
loop {
select! {
recv(imfs_receiver) -> event => {
let event = event.unwrap();
log::trace!("Imfs event: {:?}", event);
{
let mut imfs = imfs.lock().unwrap();
imfs.commit_change(&event).expect("Error applying IMFS change");
}
let patch_set = {
let _tree = tree.lock().unwrap();
// TODO: Apply changes to tree based on IMFS and
// calculate applied patch set from it.
AppliedPatchSet::new()
};
{
message_queue.push_messages(&[patch_set]);
}
},
recv(shutdown_receiver) -> _ => {
break;
},
}
}
log::trace!("ChangeProcessor thread stopping");
}
} }
impl Drop for ChangeProcessor { impl Drop for ChangeProcessor {