Improve Rojo sync reliability (#739)

Uses non-recursive main loop and avoids hanging promises
This commit is contained in:
boatbomber
2023-07-16 14:59:30 -07:00
committed by GitHub
parent dd01a9bef3
commit 8662d2227c
2 changed files with 35 additions and 29 deletions

View File

@@ -11,13 +11,6 @@ local validateApiInfo = Types.ifEnabled(Types.ApiInfoResponse)
local validateApiRead = Types.ifEnabled(Types.ApiReadResponse) local validateApiRead = Types.ifEnabled(Types.ApiReadResponse)
local validateApiSubscribe = Types.ifEnabled(Types.ApiSubscribeResponse) local validateApiSubscribe = Types.ifEnabled(Types.ApiSubscribeResponse)
--[[
Returns a promise that will never resolve nor reject.
]]
local function hangingPromise()
return Promise.new(function() end)
end
local function rejectFailedRequests(response) local function rejectFailedRequests(response)
if response.code >= 400 then if response.code >= 400 then
local message = string.format("HTTP %s:\n%s", tostring(response.code), response.body) local message = string.format("HTTP %s:\n%s", tostring(response.code), response.body)
@@ -212,12 +205,8 @@ function ApiContext:retrieveMessages()
local function sendRequest() local function sendRequest()
local request = Http.get(url) local request = Http.get(url)
:catch(function(err) :catch(function(err)
if err.type == Http.Error.Kind.Timeout then if err.type == Http.Error.Kind.Timeout and self.__connected then
if self.__connected then
return sendRequest() return sendRequest()
else
return hangingPromise()
end
end end
return Promise.reject(err) return Promise.reject(err)

View File

@@ -299,11 +299,19 @@ function ServeSession:__initialSync(serverInfo)
end end
function ServeSession:__mainSyncLoop() function ServeSession:__mainSyncLoop()
return self.__apiContext:retrieveMessages() return Promise.new(function(resolve, reject)
while self.__status == Status.Connected do
local success, result = self.__apiContext:retrieveMessages()
:andThen(function(messages) :andThen(function(messages)
if self.__status == Status.Disconnected then
-- In the time it took to retrieve messages, we disconnected
-- so we just resolve immediately without patching anything
return
end
Log.trace("Serve session {} retrieved {} messages", tostring(self), #messages) Log.trace("Serve session {} retrieved {} messages", tostring(self), #messages)
for _, message in ipairs(messages) do for _, message in messages do
local unappliedPatch = self.__reconciler:applyPatch(message) local unappliedPatch = self.__reconciler:applyPatch(message)
if not PatchSet.isEmpty(unappliedPatch) then if not PatchSet.isEmpty(unappliedPatch) then
@@ -315,10 +323,19 @@ function ServeSession:__mainSyncLoop()
pcall(self.__patchAppliedCallback, message, unappliedPatch) pcall(self.__patchAppliedCallback, message, unappliedPatch)
end end
end end
end):await()
if self.__status ~= Status.Disconnected then if self.__status == Status.Disconnected then
return self:__mainSyncLoop() -- If we are no longer connected after applying, we stop silently
-- without checking for errors as they are no longer relevant
break
elseif success == false then
reject(result)
end end
end
-- We are no longer connected, so we resolve the promise
resolve()
end) end)
end end