//! Defines Rojo's HTTP API, all under /api. These endpoints generally return //! JSON. use std::{collections::HashMap, fs, path::PathBuf, str::FromStr, sync::Arc}; use futures::{sink::SinkExt, stream::StreamExt}; use hyper::{body, Body, Method, Request, Response, StatusCode}; use hyper_tungstenite::{is_upgrade_request, tungstenite::Message, upgrade, HyperWebsocket}; use opener::OpenError; use rbx_dom_weak::{ types::{Ref, Variant}, InstanceBuilder, UstrMap, WeakDom, }; use crate::{ serve_session::ServeSession, snapshot::{InstanceWithMeta, PatchSet, PatchUpdate}, web::{ interface::{ ErrorResponse, Instance, MessagesPacket, OpenResponse, ReadResponse, ServerInfoResponse, SocketPacket, SocketPacketBody, SocketPacketType, SubscribeMessage, WriteRequest, WriteResponse, PROTOCOL_VERSION, SERVER_VERSION, }, util::{deserialize_msgpack, msgpack, msgpack_ok, serialize_msgpack}, }, web_api::{ InstanceUpdate, RefPatchRequest, RefPatchResponse, SerializeRequest, SerializeResponse, }, }; pub async fn call(serve_session: Arc, mut request: Request) -> Response { let service = ApiService::new(serve_session); match (request.method(), request.uri().path()) { (&Method::GET, "/api/rojo") => service.handle_api_rojo().await, (&Method::GET, path) if path.starts_with("/api/read/") => { service.handle_api_read(request).await } (&Method::GET, path) if path.starts_with("/api/socket/") => { if is_upgrade_request(&request) { service.handle_api_socket(&mut request).await } else { msgpack( ErrorResponse::bad_request( "/api/socket must be called as a websocket upgrade request", ), StatusCode::BAD_REQUEST, ) } } (&Method::POST, "/api/serialize") => service.handle_api_serialize(request).await, (&Method::POST, "/api/ref-patch") => service.handle_api_ref_patch(request).await, (&Method::POST, path) if path.starts_with("/api/open/") => { service.handle_api_open(request).await } (&Method::POST, "/api/write") => service.handle_api_write(request).await, (_method, path) => msgpack( ErrorResponse::not_found(format!("Route not found: {}", path)), StatusCode::NOT_FOUND, ), } } pub struct ApiService { serve_session: Arc, } impl ApiService { pub fn new(serve_session: Arc) -> Self { ApiService { serve_session } } /// Get a summary of information about the server async fn handle_api_rojo(&self) -> Response { let tree = self.serve_session.tree(); let root_instance_id = tree.get_root_id(); msgpack_ok(&ServerInfoResponse { server_version: SERVER_VERSION.to_owned(), protocol_version: PROTOCOL_VERSION, session_id: self.serve_session.session_id(), project_name: self.serve_session.project_name().to_owned(), expected_place_ids: self.serve_session.serve_place_ids().cloned(), unexpected_place_ids: self.serve_session.blocked_place_ids().cloned(), place_id: self.serve_session.place_id(), game_id: self.serve_session.game_id(), root_instance_id, }) } /// Handle WebSocket upgrade for real-time message streaming async fn handle_api_socket(&self, request: &mut Request) -> Response { let argument = &request.uri().path()["/api/socket/".len()..]; let input_cursor: u32 = match argument.parse() { Ok(v) => v, Err(err) => { return msgpack( ErrorResponse::bad_request(format!("Malformed message cursor: {}", err)), StatusCode::BAD_REQUEST, ); } }; // Upgrade the connection to WebSocket let (response, websocket) = match upgrade(request, None) { Ok(result) => result, Err(err) => { return msgpack( ErrorResponse::internal_error(format!("WebSocket upgrade failed: {}", err)), StatusCode::INTERNAL_SERVER_ERROR, ); } }; let serve_session = Arc::clone(&self.serve_session); // Spawn a task to handle the WebSocket connection tokio::spawn(async move { if let Err(e) = handle_websocket_subscription(serve_session, websocket, input_cursor).await { log::error!("Error in websocket subscription: {}", e); } }); response } async fn handle_api_write(&self, request: Request) -> Response { let session_id = self.serve_session.session_id(); let tree_mutation_sender = self.serve_session.tree_mutation_sender(); let body = body::to_bytes(request.into_body()).await.unwrap(); let request: WriteRequest = match deserialize_msgpack(&body) { Ok(request) => request, Err(err) => { return msgpack( ErrorResponse::bad_request(format!("Invalid body: {}", err)), StatusCode::BAD_REQUEST, ); } }; if request.session_id != session_id { return msgpack( ErrorResponse::bad_request("Wrong session ID"), StatusCode::BAD_REQUEST, ); } let updated_instances = request .updated .into_iter() .map(|update| PatchUpdate { id: update.id, changed_class_name: update.changed_class_name, changed_name: update.changed_name, changed_properties: update.changed_properties, changed_metadata: None, }) .collect(); tree_mutation_sender .send(PatchSet { removed_instances: Vec::new(), added_instances: Vec::new(), updated_instances, }) .unwrap(); msgpack_ok(WriteResponse { session_id }) } async fn handle_api_read(&self, request: Request) -> Response { let argument = &request.uri().path()["/api/read/".len()..]; let requested_ids: Result, _> = argument.split(',').map(Ref::from_str).collect(); let requested_ids = match requested_ids { Ok(ids) => ids, Err(_) => { return msgpack( ErrorResponse::bad_request("Malformed ID list"), StatusCode::BAD_REQUEST, ); } }; let message_queue = self.serve_session.message_queue(); let message_cursor = message_queue.cursor(); let tree = self.serve_session.tree(); let mut instances = HashMap::new(); for id in requested_ids { if let Some(instance) = tree.get_instance(id) { instances.insert(id, Instance::from_rojo_instance(instance)); for descendant in tree.descendants(id) { instances.insert(descendant.id(), Instance::from_rojo_instance(descendant)); } } } msgpack_ok(ReadResponse { session_id: self.serve_session.session_id(), message_cursor, instances, }) } /// Accepts a list of IDs and returns them serialized as a binary model. /// The model is sent in a schema that causes Roblox to deserialize it as /// a Luau `buffer`. /// /// The returned model is a folder that contains ObjectValues with names /// that correspond to the requested Instances. These values have their /// `Value` property set to point to the requested Instance. async fn handle_api_serialize(&self, request: Request) -> Response { let session_id = self.serve_session.session_id(); let body = body::to_bytes(request.into_body()).await.unwrap(); let request: SerializeRequest = match deserialize_msgpack(&body) { Ok(request) => request, Err(err) => { return msgpack( ErrorResponse::bad_request(format!("Invalid body: {}", err)), StatusCode::BAD_REQUEST, ); } }; if request.session_id != session_id { return msgpack( ErrorResponse::bad_request("Wrong session ID"), StatusCode::BAD_REQUEST, ); } let mut response_dom = WeakDom::new(InstanceBuilder::new("Folder")); let tree = self.serve_session.tree(); for id in &request.ids { if let Some(instance) = tree.get_instance(*id) { let clone = response_dom.insert( Ref::none(), InstanceBuilder::new(instance.class_name()) .with_name(instance.name()) .with_properties(instance.properties().clone()), ); let object_value = response_dom.insert( response_dom.root_ref(), InstanceBuilder::new("ObjectValue") .with_name(id.to_string()) .with_property("Value", clone), ); let mut child_ref = clone; if let Some(parent_class) = parent_requirements(&instance.class_name()) { child_ref = response_dom.insert(object_value, InstanceBuilder::new(parent_class)); response_dom.transfer_within(clone, child_ref); } response_dom.transfer_within(child_ref, object_value); } else { msgpack( ErrorResponse::bad_request(format!("provided id {id} is not in the tree")), StatusCode::BAD_REQUEST, ); } } drop(tree); let mut source = Vec::new(); rbx_binary::to_writer(&mut source, &response_dom, &[response_dom.root_ref()]).unwrap(); msgpack_ok(SerializeResponse { session_id: self.serve_session.session_id(), model_contents: source, }) } /// Returns a list of all referent properties that point towards the /// provided IDs. Used because the plugin does not store a RojoTree, /// and referent properties need to be updated after the serialize /// endpoint is used. async fn handle_api_ref_patch(self, request: Request) -> Response { let session_id = self.serve_session.session_id(); let body = body::to_bytes(request.into_body()).await.unwrap(); let request: RefPatchRequest = match deserialize_msgpack(&body) { Ok(request) => request, Err(err) => { return msgpack( ErrorResponse::bad_request(format!("Invalid body: {}", err)), StatusCode::BAD_REQUEST, ); } }; if request.session_id != session_id { return msgpack( ErrorResponse::bad_request("Wrong session ID"), StatusCode::BAD_REQUEST, ); } let mut instance_updates: HashMap = HashMap::new(); let tree = self.serve_session.tree(); for instance in tree.descendants(tree.get_root_id()) { for (prop_name, prop_value) in instance.properties() { let Variant::Ref(prop_value) = prop_value else { continue; }; if let Some(target_id) = request.ids.get(prop_value) { let instance_id = instance.id(); let update = instance_updates .entry(instance_id) .or_insert_with(|| InstanceUpdate { id: instance_id, changed_class_name: None, changed_name: None, changed_metadata: None, changed_properties: UstrMap::default(), }); update .changed_properties .insert(*prop_name, Some(Variant::Ref(*target_id))); } } } msgpack_ok(RefPatchResponse { session_id: self.serve_session.session_id(), patch: SubscribeMessage { added: HashMap::new(), removed: Vec::new(), updated: instance_updates.into_values().collect(), }, }) } /// Open a script with the given ID in the user's default text editor. async fn handle_api_open(&self, request: Request) -> Response { let argument = &request.uri().path()["/api/open/".len()..]; let requested_id = match Ref::from_str(argument) { Ok(id) => id, Err(_) => { return msgpack( ErrorResponse::bad_request("Invalid instance ID"), StatusCode::BAD_REQUEST, ); } }; let tree = self.serve_session.tree(); let instance = match tree.get_instance(requested_id) { Some(instance) => instance, None => { return msgpack( ErrorResponse::bad_request("Instance not found"), StatusCode::NOT_FOUND, ); } }; let script_path = match pick_script_path(instance) { Some(path) => path, None => { return msgpack( ErrorResponse::bad_request( "No appropriate file could be found to open this script", ), StatusCode::NOT_FOUND, ); } }; match opener::open(&script_path) { Ok(()) => {} Err(error) => match error { OpenError::Io(io_error) => { return msgpack( ErrorResponse::internal_error(format!( "Attempting to open {} failed because of the following io error: {}", script_path.display(), io_error )), StatusCode::INTERNAL_SERVER_ERROR, ) } OpenError::ExitStatus { cmd, status, stderr, } => { return msgpack( ErrorResponse::internal_error(format!( r#"The command '{}' to open '{}' failed with the error code '{}'. Error logs: {}"#, cmd, script_path.display(), status, stderr )), StatusCode::INTERNAL_SERVER_ERROR, ) } }, }; msgpack_ok(OpenResponse { session_id: self.serve_session.session_id(), }) } } /// If this instance is represented by a script, try to find the correct .lua or .luau /// file to open to edit it. fn pick_script_path(instance: InstanceWithMeta<'_>) -> Option { match instance.class_name().as_str() { "Script" | "LocalScript" | "ModuleScript" => {} _ => return None, } // Pick the first listed relevant path that has an extension of .lua or .luau that // exists. instance .metadata() .relevant_paths .iter() .find(|path| { // We should only ever open Lua or Luau files to be safe. match path.extension().and_then(|ext| ext.to_str()) { Some("lua") => {} Some("luau") => {} _ => return false, } fs::metadata(path) .map(|meta| meta.is_file()) .unwrap_or(false) }) .map(|path| path.to_owned()) } /// Handle WebSocket connection for streaming subscription messages async fn handle_websocket_subscription( serve_session: Arc, websocket: HyperWebsocket, input_cursor: u32, ) -> anyhow::Result<()> { let mut websocket = websocket.await?; let session_id = serve_session.session_id(); let tree_handle = serve_session.tree_handle(); let message_queue = serve_session.message_queue(); log::debug!( "WebSocket subscription established for session {}", session_id ); // Now continuously listen for new messages using select to handle both incoming messages // and WebSocket control messages concurrently let mut cursor = input_cursor; loop { let receiver = message_queue.subscribe(cursor); tokio::select! { // Handle new messages from the message queue result = receiver => { match result { Ok((new_cursor, messages)) => { if !messages.is_empty() { let msgpack_message = { let tree = tree_handle.lock().unwrap(); let api_messages = messages .into_iter() .map(|patch| SubscribeMessage::from_patch_update(&tree, patch)) .collect(); let response = SocketPacket { session_id, packet_type: SocketPacketType::Messages, body: SocketPacketBody::Messages(MessagesPacket { message_cursor: new_cursor, messages: api_messages, }), }; serialize_msgpack(response)? }; log::debug!("Sending batch of messages over WebSocket subscription"); if websocket.send(Message::Binary(msgpack_message)).await.is_err() { // Client disconnected log::debug!("WebSocket subscription closed by client"); break; } cursor = new_cursor; } } Err(_) => { // Message queue disconnected log::debug!("Message queue disconnected; closing WebSocket subscription"); let _ = websocket.send(Message::Close(None)).await; break; } } } // Handle incoming WebSocket messages (ping/pong/close) msg = websocket.next() => { match msg { Some(Ok(Message::Close(_))) => { log::debug!("WebSocket subscription closed by client"); break; } Some(Ok(Message::Ping(data))) => { // tungstenite handles pong automatically log::debug!("Received ping: {:?}", data); } Some(Ok(Message::Pong(data))) => { log::debug!("Received pong: {:?}", data); } Some(Ok(Message::Text(_))) | Some(Ok(Message::Binary(_))) => { // Ignore text/binary messages from client for subscription endpoint // TODO: Use this for bidirectional sync or requesting fallbacks? log::debug!("Ignoring message from client since we don't use it for anything yet: {:?}", msg); } Some(Ok(Message::Frame(_))) => { // This should never happen according to tungstenite docs unreachable!(); } Some(Err(e)) => { log::error!("WebSocket error: {}", e); break; } None => { // WebSocket stream ended log::debug!("WebSocket stream ended"); break; } } } } } Ok(()) } /// Certain Instances MUST be a child of specific classes. This function /// tracks that information for the Serialize endpoint. /// /// If a parent requirement exists, it will be returned. /// Otherwise returns `None`. fn parent_requirements(class: &str) -> Option<&str> { Some(match class { "Attachment" | "Bone" => "Part", "Animator" => "Humanoid", "BaseWrap" | "WrapLayer" | "WrapTarget" | "WrapDeformer" => "MeshPart", _ => return None, }) }