Start ChangeProcessor pumping work

This commit is contained in:
Lucien Greathouse
2019-10-01 13:48:16 -07:00
parent b94f21402c
commit 5b7037550d

View File

@@ -9,9 +9,10 @@ use crossbeam_channel::{select, Receiver, Sender};
use jod_thread::JoinHandle;
use crate::{
imfs::{Imfs, ImfsFetcher},
imfs::{Imfs, ImfsEvent, ImfsFetcher},
message_queue::MessageQueue,
snapshot::{AppliedPatchSet, RojoTree},
snapshot::{apply_patch_set, compute_patch_set, AppliedPatchSet, RojoTree},
snapshot_middleware::snapshot_from_imfs,
};
pub struct ChangeProcessor {
@@ -29,7 +30,11 @@ impl ChangeProcessor {
let thread_handle = jod_thread::Builder::new()
.name("ChangeProcessor thread".to_owned())
.spawn(move || Self::main_task(shutdown_receiver, tree, message_queue, imfs))
.spawn(move || {
log::trace!("ChangeProcessor thread started");
Self::main_task(shutdown_receiver, tree, message_queue, imfs);
log::trace!("ChangeProcessor thread stopped");
})
.expect("Could not start ChangeProcessor thread");
Self {
@@ -44,8 +49,6 @@ impl ChangeProcessor {
message_queue: Arc<MessageQueue<AppliedPatchSet>>,
imfs: Arc<Mutex<Imfs<F>>>,
) {
log::trace!("ChangeProcessor thread started");
let imfs_receiver = {
let imfs = imfs.lock().unwrap();
imfs.change_receiver()
@@ -61,13 +64,49 @@ impl ChangeProcessor {
log::trace!("Imfs event: {:?}", event);
{
let patch_set = {
let mut imfs = imfs.lock().unwrap();
imfs.commit_change(&event).expect("Error applying IMFS change");
}
let patch_set = {
let _tree = tree.lock().unwrap();
let tree = tree.lock().unwrap();
match event {
ImfsEvent::Created(path) | ImfsEvent::Modified(path) | ImfsEvent::Removed(path) => {
let affected_ids = tree.get_ids_at_path(&path);
if affected_ids.len() == 0 {
log::info!("No instances were affected by this change.");
continue;
}
for &id in affected_ids {
let metadata = tree.get_metadata(id)
.expect("metadata missing for instance present in tree");
let instigating_path = match metadata.contributing_paths.get(0) {
Some(path) => path,
None => {
log::warn!("Instance {} did not have an instigating path, but was considered for an update.", id);
log::warn!("This is a Rojo bug. Please file an issue!");
continue;
}
};
let entry = imfs
.get(instigating_path)
.expect("could not get instigating path from filesystem");
let snapshot = snapshot_from_imfs(&mut imfs, &entry)
.expect("snapshot failed")
.expect("snapshot did not return an instance");
println!("Snapshot: {:?}", snapshot);
}
}
ImfsEvent::Removed(path) => {
log::warn!("TODO: Handle file remove events");
}
}
// TODO: Apply changes to tree based on IMFS and
// calculate applied patch set from it.
@@ -79,12 +118,11 @@ impl ChangeProcessor {
}
},
recv(shutdown_receiver) -> _ => {
log::trace!("ChangeProcessor shutdown signal received...");
break;
},
}
}
log::trace!("ChangeProcessor thread stopping");
}
}