Improved connection management on events service.
parent
ae61f8ac07
commit
4324e4f044
|
@ -31,16 +31,17 @@ class EventsService
|
|||
initialize: (sessionId) ->
|
||||
@.sessionId = sessionId
|
||||
@.subscriptions = {}
|
||||
@.connected = false
|
||||
@.error = false
|
||||
@.pendingMessages = []
|
||||
|
||||
if @win.WebSocket is undefined
|
||||
@log.debug "WebSockets not supported on your browser"
|
||||
@log.info "WebSockets not supported on your browser"
|
||||
|
||||
setupConnection: ->
|
||||
@.stopExistingConnection()
|
||||
|
||||
wshost = @config.get("eventsHost", "localhost:8888")
|
||||
wsscheme = @config.get("eventsScheme", "ws")
|
||||
url = "#{wsscheme}://#{wshost}/events"
|
||||
url = @config.get("eventsUrl", "ws://localhost:8888/events")
|
||||
|
||||
@.ws = new @win.WebSocket(url)
|
||||
@.ws.addEventListener("open", @.onOpen)
|
||||
|
@ -52,15 +53,67 @@ class EventsService
|
|||
if @.ws is undefined
|
||||
return
|
||||
|
||||
@.ws.close()
|
||||
@.ws.removeEventListener("open", @.onOpen)
|
||||
@.ws.removeEventListener("close", @.onClose)
|
||||
@.ws.removeEventListener("error", @.onError)
|
||||
@.ws.removeEventListener("message", @.onMessage)
|
||||
@.ws.close()
|
||||
|
||||
delete @.ws
|
||||
|
||||
serialize: (message) ->
|
||||
if _.isObject(message)
|
||||
return JSON.stringify(message)
|
||||
return message
|
||||
|
||||
sendMessage: (message) ->
|
||||
@.pendingMessages.push(message)
|
||||
|
||||
if not @.connected
|
||||
return
|
||||
|
||||
messages = _.map(@.serialize, @.pendingMessages)
|
||||
@.pendingMessages = []
|
||||
|
||||
for msg in messages
|
||||
@.ws.send(msg)
|
||||
|
||||
subscribe: (scope, routingKey, callback) ->
|
||||
if @.error
|
||||
return
|
||||
|
||||
@log.debug("Subscribe to: #{routingKey}")
|
||||
subscription = {
|
||||
scope: scope,
|
||||
routingKey: routingKey,
|
||||
callback: _.debounce(callback, 500, {"leading": true, "trailing": false})
|
||||
}
|
||||
|
||||
message = {
|
||||
"cmd": "subscribe",
|
||||
"routing_key": routingKey
|
||||
}
|
||||
|
||||
@.subscriptions[routingKey] = subscription
|
||||
@.sendMessage(message)
|
||||
scope.$on("$destroy", => @.unsubscribe(routingKey))
|
||||
|
||||
unsubscribe: (routingKey) ->
|
||||
if @.error
|
||||
return
|
||||
|
||||
@log.debug("Unsubscribe from: #{routingKey}")
|
||||
|
||||
message = {
|
||||
"cmd": "unsubscribe",
|
||||
"routing_key": routingKey
|
||||
}
|
||||
|
||||
@.sendMessage(message)
|
||||
|
||||
onOpen: ->
|
||||
@.connected = true
|
||||
|
||||
@log.debug("WebSocket connection opened")
|
||||
token = @auth.getToken()
|
||||
|
||||
|
@ -69,7 +122,7 @@ class EventsService
|
|||
data: {token: token, sessionId: @.sessionId}
|
||||
}
|
||||
|
||||
@.ws.send(JSON.stringify(message))
|
||||
@.sendMessage(message)
|
||||
|
||||
onMessage: (event) ->
|
||||
@.log.debug "WebSocket message received: #{event.data}"
|
||||
|
@ -86,33 +139,11 @@ class EventsService
|
|||
|
||||
onError: (error) ->
|
||||
@log.error("WebSocket error: #{error}")
|
||||
@.error = true
|
||||
|
||||
onClose: ->
|
||||
@log.debug("WebSocket closed.")
|
||||
|
||||
subscribe: (scope, routingKey, callback) ->
|
||||
subscription = {
|
||||
scope: scope,
|
||||
routingKey: routingKey,
|
||||
callback: callback
|
||||
}
|
||||
|
||||
message = {
|
||||
"cmd": "subscribe",
|
||||
"routing_key": routingKey
|
||||
}
|
||||
|
||||
@.subscriptions[routingKey] = subscription
|
||||
@.ws.send(JSON.stringify(message))
|
||||
scope.$on("$destroy", => @.unsubscribe(routingKey))
|
||||
|
||||
unsubscribe: (routingKey) ->
|
||||
message = {
|
||||
"cmd": "unsubscribe",
|
||||
"routing_key": routingKey
|
||||
}
|
||||
|
||||
@.ws.send(JSON.stringify(message))
|
||||
@.connected = false
|
||||
|
||||
|
||||
class EventsProvider
|
||||
|
|
Loading…
Reference in New Issue