Refactor Session and ApiContext to allow cancelation

This commit is contained in:
Lucien Greathouse
2019-01-11 15:45:32 -08:00
parent f21f01be1a
commit 30ce927621
3 changed files with 83 additions and 75 deletions

View File

@@ -19,17 +19,14 @@ setmetatable(ApiContext.Error, {
end end
}) })
-- TODO: Switch to onMessages and batch processing
function ApiContext.new(baseUrl) function ApiContext.new(baseUrl)
assert(type(baseUrl) == "string") assert(type(baseUrl) == "string")
local self = { local self = {
baseUrl = baseUrl, baseUrl = baseUrl,
onMessageCallback = nil,
serverId = nil, serverId = nil,
rootInstanceId = nil, rootInstanceId = nil,
instanceMetadataMap = nil, instanceMetadataMap = nil,
connected = false,
messageCursor = -1, messageCursor = -1,
partitionRoutes = nil, partitionRoutes = nil,
} }
@@ -100,21 +97,14 @@ function ApiContext:connect()
self.partitionRoutes = body.partitions self.partitionRoutes = body.partitions
self.rootInstanceId = body.rootInstanceId self.rootInstanceId = body.rootInstanceId
self.instanceMetadataMap = body.instanceMetadataMap self.instanceMetadataMap = body.instanceMetadataMap
self.connected = true
end) end)
end end
function ApiContext:read(ids) function ApiContext:read(ids)
if not self.connected then
return Promise.reject()
end
local url = ("%s/api/read/%s"):format(self.baseUrl, table.concat(ids, ",")) local url = ("%s/api/read/%s"):format(self.baseUrl, table.concat(ids, ","))
return Http.get(url) return Http.get(url)
:catch(function(err) :catch(function(err)
self.connected = false
return Promise.reject(err) return Promise.reject(err)
end) end)
:andThen(function(response) :andThen(function(response)
@@ -131,10 +121,6 @@ function ApiContext:read(ids)
end end
function ApiContext:retrieveMessages() function ApiContext:retrieveMessages()
if not self.connected then
return Promise.reject()
end
local url = ("%s/api/subscribe/%s"):format(self.baseUrl, self.messageCursor) local url = ("%s/api/subscribe/%s"):format(self.baseUrl, self.messageCursor)
return Http.get(url) return Http.get(url)
@@ -143,8 +129,6 @@ function ApiContext:retrieveMessages()
return self:retrieveMessages() return self:retrieveMessages()
end end
self.connected = false
return Promise.reject(err) return Promise.reject(err)
end) end)
:andThen(function(response) :andThen(function(response)
@@ -154,20 +138,9 @@ function ApiContext:retrieveMessages()
return Promise.reject("Server changed ID") return Promise.reject("Server changed ID")
end end
local promise = Promise.resolve(nil)
for _, message in ipairs(body.messages) do
promise = promise:andThen(function()
return self.onMessageCallback(message)
end)
end
self.messageCursor = body.messageCursor self.messageCursor = body.messageCursor
return promise return body.messages
end)
:andThen(function()
return self:retrieveMessages()
end) end)
end end

View File

@@ -95,8 +95,9 @@ function App:render()
local success, session = Session.new({ local success, session = Session.new({
address = address, address = address,
port = port, port = port,
onError = function() onError = function(message)
Logging.trace("Session terminated") Logging.warn("%s", tostring(message))
Logging.trace("Session terminated due to error")
self.currentSession = nil self.currentSession = nil
self:setState({ self:setState({
@@ -145,12 +146,13 @@ function App:didMount()
if self.state.sessionStatus == SessionStatus.Connected then if self.state.sessionStatus == SessionStatus.Connected then
Logging.trace("Disconnecting session") Logging.trace("Disconnecting session")
self.currentSession:disconnect()
self.currentSession = nil self.currentSession = nil
self:setState({ self:setState({
sessionStatus = SessionStatus.Disconnected, sessionStatus = SessionStatus.Disconnected,
}) })
error("TODO: Actually disconnect old session") Logging.trace("Session terminated by user")
elseif self.state.sessionStatus == SessionStatus.Disconnected then elseif self.state.sessionStatus == SessionStatus.Disconnected then
Logging.trace("Starting session configuration") Logging.trace("Starting session configuration")

View File

@@ -1,66 +1,99 @@
local Rojo = script:FindFirstAncestor("Rojo")
local Promise = require(Rojo.Promise)
local ApiContext = require(script.Parent.ApiContext) local ApiContext = require(script.Parent.ApiContext)
local Logging = require(script.Parent.Logging)
local Reconciler = require(script.Parent.Reconciler) local Reconciler = require(script.Parent.Reconciler)
local Session = {} local Session = {}
Session.__index = Session Session.__index = Session
function Session.new(config) function Session.new(config)
local self = {}
self.onError = config.onError
local hasErrors = false
local reconciler
local remoteUrl = ("http://%s:%s"):format(config.address, config.port) local remoteUrl = ("http://%s:%s"):format(config.address, config.port)
local api = ApiContext.new(remoteUrl) local api = ApiContext.new(remoteUrl)
ApiContext:onMessage(function(message) local self = {
local requestedIds = {} onError = config.onError,
disconnected = false,
for _, id in ipairs(message.added) do reconciler = nil,
table.insert(requestedIds, id) api = api,
end }
for _, id in ipairs(message.updated) do
table.insert(requestedIds, id)
end
for _, id in ipairs(message.removed) do
table.insert(requestedIds, id)
end
return api:read(requestedIds)
:andThen(function(response)
return reconciler:applyUpdate(requestedIds, response.instances)
end)
:catch(function(message)
hasErrors = true
Logging.warn("%s", tostring(message))
self.onError()
end)
end)
api:connect() api:connect()
:andThen(function() :andThen(function()
reconciler = Reconciler.new(api.instanceMetadataMap) if self.disconnected then
return Promise.resolve()
end
self.reconciler = Reconciler.new(api.instanceMetadataMap)
return api:read({api.rootInstanceId}) return api:read({api.rootInstanceId})
end) :andThen(function(response)
:andThen(function(response) if self.disconnected then
reconciler:reconcile(response.instances, api.rootInstanceId, game) return Promise.resolve()
return api:retrieveMessages() end
self.reconciler:reconcile(response.instances, api.rootInstanceId, game)
return self:__processMessages()
end)
end) end)
:catch(function(message) :catch(function(message)
hasErrors = true self.disconnected = true
Logging.warn("%s", tostring(message)) self.onError(message)
self.onError()
end) end)
return not hasErrors, setmetatable(self, Session) return not self.disconnected, setmetatable(self, Session)
end
function Session:__processMessages()
if self.disconnected then
return Promise.resolve()
end
return self.api:retrieveMessages()
:andThen(function(messages)
local promise = Promise.resolve(nil)
for _, message in ipairs(messages) do
promise = promise:andThen(function()
return self:__onMessage(message)
end)
end
return promise
end)
:andThen(function()
return self:__processMessages()
end)
end
function Session:__onMessage(message)
if self.disconnected then
return Promise.resolve()
end
local requestedIds = {}
for _, id in ipairs(message.added) do
table.insert(requestedIds, id)
end
for _, id in ipairs(message.updated) do
table.insert(requestedIds, id)
end
for _, id in ipairs(message.removed) do
table.insert(requestedIds, id)
end
return self.api:read(requestedIds)
:andThen(function(response)
return self.reconciler:applyUpdate(requestedIds, response.instances)
end)
end
function Session:disconnect()
self.disconnected = true
end end
return Session return Session