diff --git a/app/coffee/modules/events.coffee b/app/coffee/modules/events.coffee index 7b6ac3d9..84468cc9 100644 --- a/app/coffee/modules/events.coffee +++ b/app/coffee/modules/events.coffee @@ -31,16 +31,17 @@ class EventsService initialize: (sessionId) -> @.sessionId = sessionId @.subscriptions = {} + @.connected = false + @.error = false + @.pendingMessages = [] if @win.WebSocket is undefined - @log.debug "WebSockets not supported on your browser" + @log.info "WebSockets not supported on your browser" setupConnection: -> @.stopExistingConnection() - wshost = @config.get("eventsHost", "localhost:8888") - wsscheme = @config.get("eventsScheme", "ws") - url = "#{wsscheme}://#{wshost}/events" + url = @config.get("eventsUrl", "ws://localhost:8888/events") @.ws = new @win.WebSocket(url) @.ws.addEventListener("open", @.onOpen) @@ -52,15 +53,67 @@ class EventsService if @.ws is undefined return - @.ws.close() @.ws.removeEventListener("open", @.onOpen) @.ws.removeEventListener("close", @.onClose) @.ws.removeEventListener("error", @.onError) @.ws.removeEventListener("message", @.onMessage) + @.ws.close() delete @.ws + serialize: (message) -> + if _.isObject(message) + return JSON.stringify(message) + return message + + sendMessage: (message) -> + @.pendingMessages.push(message) + + if not @.connected + return + + messages = _.map(@.serialize, @.pendingMessages) + @.pendingMessages = [] + + for msg in messages + @.ws.send(msg) + + subscribe: (scope, routingKey, callback) -> + if @.error + return + + @log.debug("Subscribe to: #{routingKey}") + subscription = { + scope: scope, + routingKey: routingKey, + callback: _.debounce(callback, 500, {"leading": true, "trailing": false}) + } + + message = { + "cmd": "subscribe", + "routing_key": routingKey + } + + @.subscriptions[routingKey] = subscription + @.sendMessage(message) + scope.$on("$destroy", => @.unsubscribe(routingKey)) + + unsubscribe: (routingKey) -> + if @.error + return + + @log.debug("Unsubscribe from: #{routingKey}") + + message = { + "cmd": "unsubscribe", + "routing_key": routingKey + } + + @.sendMessage(message) + onOpen: -> + @.connected = true + @log.debug("WebSocket connection opened") token = @auth.getToken() @@ -69,7 +122,7 @@ class EventsService data: {token: token, sessionId: @.sessionId} } - @.ws.send(JSON.stringify(message)) + @.sendMessage(message) onMessage: (event) -> @.log.debug "WebSocket message received: #{event.data}" @@ -86,33 +139,11 @@ class EventsService onError: (error) -> @log.error("WebSocket error: #{error}") + @.error = true onClose: -> @log.debug("WebSocket closed.") - - subscribe: (scope, routingKey, callback) -> - subscription = { - scope: scope, - routingKey: routingKey, - callback: callback - } - - message = { - "cmd": "subscribe", - "routing_key": routingKey - } - - @.subscriptions[routingKey] = subscription - @.ws.send(JSON.stringify(message)) - scope.$on("$destroy", => @.unsubscribe(routingKey)) - - unsubscribe: (routingKey) -> - message = { - "cmd": "unsubscribe", - "routing_key": routingKey - } - - @.ws.send(JSON.stringify(message)) + @.connected = false class EventsProvider