Use WebSocket instead of Long Polling (#1142)

This commit is contained in:
boatbomber
2025-11-26 19:57:01 -08:00
committed by GitHub
parent a61a1bef55
commit 87f58e0a55
44 changed files with 1750 additions and 971 deletions

View File

@@ -9,7 +9,9 @@ use std::{
sync::Arc,
};
use futures::{sink::SinkExt, stream::StreamExt};
use hyper::{body, Body, Method, Request, Response, StatusCode};
use hyper_tungstenite::{is_upgrade_request, tungstenite::Message, upgrade, HyperWebsocket};
use opener::OpenError;
use rbx_dom_weak::{
types::{Ref, Variant},
@@ -22,16 +24,16 @@ use crate::{
snapshot::{InstanceWithMeta, PatchSet, PatchUpdate},
web::{
interface::{
ErrorResponse, Instance, OpenResponse, ReadResponse, ServerInfoResponse,
SubscribeMessage, SubscribeResponse, WriteRequest, WriteResponse, PROTOCOL_VERSION,
SERVER_VERSION,
ErrorResponse, Instance, MessagesPacket, OpenResponse, ReadResponse,
ServerInfoResponse, SocketPacket, SocketPacketBody, SocketPacketType, SubscribeMessage,
WriteRequest, WriteResponse, PROTOCOL_VERSION, SERVER_VERSION,
},
util::{json, json_ok},
},
web_api::{BufferEncode, InstanceUpdate, RefPatchResponse, SerializeResponse},
};
pub async fn call(serve_session: Arc<ServeSession>, request: Request<Body>) -> Response<Body> {
pub async fn call(serve_session: Arc<ServeSession>, mut request: Request<Body>) -> Response<Body> {
let service = ApiService::new(serve_session);
match (request.method(), request.uri().path()) {
@@ -39,8 +41,17 @@ pub async fn call(serve_session: Arc<ServeSession>, request: Request<Body>) -> R
(&Method::GET, path) if path.starts_with("/api/read/") => {
service.handle_api_read(request).await
}
(&Method::GET, path) if path.starts_with("/api/subscribe/") => {
service.handle_api_subscribe(request).await
(&Method::GET, path) if path.starts_with("/api/socket/") => {
if is_upgrade_request(&request) {
service.handle_api_socket(&mut request).await
} else {
json(
ErrorResponse::bad_request(
"/api/socket must be called as a websocket upgrade request",
),
StatusCode::BAD_REQUEST,
)
}
}
(&Method::GET, path) if path.starts_with("/api/serialize/") => {
service.handle_api_serialize(request).await
@@ -88,10 +99,9 @@ impl ApiService {
})
}
/// Retrieve any messages past the given cursor index, and if
/// there weren't any, subscribe to receive any new messages.
async fn handle_api_subscribe(&self, request: Request<Body>) -> Response<Body> {
let argument = &request.uri().path()["/api/subscribe/".len()..];
/// Handle WebSocket upgrade for real-time message streaming
async fn handle_api_socket(&self, request: &mut Request<Body>) -> Response<Body> {
let argument = &request.uri().path()["/api/socket/".len()..];
let input_cursor: u32 = match argument.parse() {
Ok(v) => v,
Err(err) => {
@@ -102,36 +112,29 @@ impl ApiService {
}
};
let session_id = self.serve_session.session_id();
let result = self
.serve_session
.message_queue()
.subscribe(input_cursor)
.await;
let tree_handle = self.serve_session.tree_handle();
match result {
Ok((message_cursor, messages)) => {
let tree = tree_handle.lock().unwrap();
let api_messages = messages
.into_iter()
.map(|patch| SubscribeMessage::from_patch_update(&tree, patch))
.collect();
json_ok(SubscribeResponse {
session_id,
message_cursor,
messages: api_messages,
})
// Upgrade the connection to WebSocket
let (response, websocket) = match upgrade(request, None) {
Ok(result) => result,
Err(err) => {
return json(
ErrorResponse::internal_error(format!("WebSocket upgrade failed: {}", err)),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
Err(_) => json(
ErrorResponse::internal_error("Message queue disconnected sender"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
};
let serve_session = Arc::clone(&self.serve_session);
// Spawn a task to handle the WebSocket connection
tokio::spawn(async move {
if let Err(e) =
handle_websocket_subscription(serve_session, websocket, input_cursor).await
{
log::error!("Error in websocket subscription: {}", e);
}
});
response
}
async fn handle_api_write(&self, request: Request<Body>) -> Response<Body> {
@@ -444,6 +447,113 @@ fn pick_script_path(instance: InstanceWithMeta<'_>) -> Option<PathBuf> {
.map(|path| path.to_owned())
}
/// Handle WebSocket connection for streaming subscription messages
async fn handle_websocket_subscription(
serve_session: Arc<ServeSession>,
websocket: HyperWebsocket,
input_cursor: u32,
) -> anyhow::Result<()> {
let mut websocket = websocket.await?;
let session_id = serve_session.session_id();
let tree_handle = serve_session.tree_handle();
let message_queue = serve_session.message_queue();
log::debug!(
"WebSocket subscription established for session {}",
session_id
);
// Now continuously listen for new messages using select to handle both incoming messages
// and WebSocket control messages concurrently
let mut cursor = input_cursor;
loop {
let receiver = message_queue.subscribe(cursor);
tokio::select! {
// Handle new messages from the message queue
result = receiver => {
match result {
Ok((new_cursor, messages)) => {
if !messages.is_empty() {
let json_message = {
let tree = tree_handle.lock().unwrap();
let api_messages = messages
.into_iter()
.map(|patch| SubscribeMessage::from_patch_update(&tree, patch))
.collect();
let response = SocketPacket {
session_id,
packet_type: SocketPacketType::Messages,
body: SocketPacketBody::Messages(MessagesPacket {
message_cursor: new_cursor,
messages: api_messages,
}),
};
serde_json::to_string(&response)?
};
log::debug!("Sending batch of messages over WebSocket subscription");
if websocket.send(Message::Text(json_message)).await.is_err() {
// Client disconnected
log::debug!("WebSocket subscription closed by client");
break;
}
cursor = new_cursor;
}
}
Err(_) => {
// Message queue disconnected
log::debug!("Message queue disconnected; closing WebSocket subscription");
let _ = websocket.send(Message::Close(None)).await;
break;
}
}
}
// Handle incoming WebSocket messages (ping/pong/close)
msg = websocket.next() => {
match msg {
Some(Ok(Message::Close(_))) => {
log::debug!("WebSocket subscription closed by client");
break;
}
Some(Ok(Message::Ping(data))) => {
// tungstenite handles pong automatically
log::debug!("Received ping: {:?}", data);
}
Some(Ok(Message::Pong(data))) => {
log::debug!("Received pong: {:?}", data);
}
Some(Ok(Message::Text(_))) | Some(Ok(Message::Binary(_))) => {
// Ignore text/binary messages from client for subscription endpoint
// TODO: Use this for bidirectional sync or requesting fallbacks?
log::debug!("Ignoring message from client since we don't use it for anything yet: {:?}", msg);
}
Some(Ok(Message::Frame(_))) => {
// This should never happen according to tungstenite docs
unreachable!();
}
Some(Err(e)) => {
log::error!("WebSocket error: {}", e);
break;
}
None => {
// WebSocket stream ended
log::debug!("WebSocket stream ended");
break;
}
}
}
}
}
Ok(())
}
/// Certain Instances MUST be a child of specific classes. This function
/// tracks that information for the Serialize endpoint.
///