diff --git a/src/change_processor.rs b/src/change_processor.rs index 0220025f..8e2f7f02 100644 --- a/src/change_processor.rs +++ b/src/change_processor.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; -use crossbeam_channel::{select, Sender}; +use crossbeam_channel::{select, Receiver, Sender}; use jod_thread::JoinHandle; use crate::{ @@ -26,48 +26,10 @@ impl ChangeProcessor { imfs: Arc>>, ) -> Self { let (shutdown_sender, shutdown_receiver) = crossbeam_channel::bounded(1); - let imfs_receiver = { - let imfs = imfs.lock().unwrap(); - imfs.change_receiver() - }; let thread_handle = jod_thread::Builder::new() .name("ChangeProcessor thread".to_owned()) - .spawn(move || { - log::trace!("ChangeProcessor thread started"); - - loop { - select! { - recv(imfs_receiver) -> event => { - let event = event.unwrap(); - - log::trace!("Imfs event: {:?}", event); - - { - let mut imfs = imfs.lock().unwrap(); - imfs.commit_change(&event).expect("Error applying IMFS change"); - } - - let patch_set = { - let _tree = tree.lock().unwrap(); - - // TODO: Apply changes to tree based on IMFS and - // calculate applied patch set from it. - AppliedPatchSet::new() - }; - - { - message_queue.push_messages(&[patch_set]); - } - }, - recv(shutdown_receiver) -> _ => { - break; - }, - } - } - - log::trace!("ChangeProcessor thread stopping"); - }) + .spawn(move || Self::main_task(shutdown_receiver, tree, message_queue, imfs)) .expect("Could not start ChangeProcessor thread"); Self { @@ -75,6 +37,52 @@ impl ChangeProcessor { thread_handle, } } + + fn main_task( + shutdown_receiver: Receiver<()>, + tree: Arc>, + message_queue: Arc>, + imfs: Arc>>, + ) { + log::trace!("ChangeProcessor thread started"); + + let imfs_receiver = { + let imfs = imfs.lock().unwrap(); + imfs.change_receiver() + }; + + loop { + select! { + recv(imfs_receiver) -> event => { + let event = event.unwrap(); + + log::trace!("Imfs event: {:?}", event); + + { + let mut imfs = imfs.lock().unwrap(); + imfs.commit_change(&event).expect("Error applying IMFS change"); + } + + let patch_set = { + let _tree = tree.lock().unwrap(); + + // TODO: Apply changes to tree based on IMFS and + // calculate applied patch set from it. + AppliedPatchSet::new() + }; + + { + message_queue.push_messages(&[patch_set]); + } + }, + recv(shutdown_receiver) -> _ => { + break; + }, + } + } + + log::trace!("ChangeProcessor thread stopping"); + } } impl Drop for ChangeProcessor {