handler taiga events

master
Juanfran 2015-02-12 15:00:58 +01:00
parent d38199cc76
commit b23226b30d
5 changed files with 107 additions and 1 deletions

36
clients.coffee Normal file
View File

@ -0,0 +1,36 @@
clients = []
subscriptions = {}
removeSubscription = (id, routing_key) ->
if subscriptions[routing_key]
subscriptions[routing_key] = subscriptions[routing_key].filter (client) -> client.id != id
exports.removeById = (id) ->
clients = clients.filter (client) -> client.id != id
Object.keys(subscriptions).forEach (routing_key) ->
removeSubscription(id, routing_key)
exports.getBySessionId = (session_id) ->
client = clients.filter (client) -> client.auth.sessionId == session_id
return client[0]
exports.add = (client) ->
clients.push(client)
exports.addSubscription = (id, routing_key) ->
if !subscriptions[routing_key]
subscriptions[routing_key] = []
client = clients.filter (client) -> client.id == id
subscriptions[routing_key].push(client[0])
exports.removeSubscription = removeSubscription
exports.getBySubscription = (subscription) ->
if !subscriptions[subscription]
return []
return subscriptions[subscription]

View File

@ -37,7 +37,7 @@
"value": "unix" "value": "unix"
}, },
"max_line_length": { "max_line_length": {
"value": 80, "value": 120,
"level": "error", "level": "error",
"limitComments": true "limitComments": true
}, },

11
config.json Normal file
View File

@ -0,0 +1,11 @@
{
"url": "amqp://guest:guest@localhost:5672",
"exchange": {
"name": "events",
"options": {}
},
"queue": {
"name": "EventsPushBackend",
"options": {}
}
}

View File

@ -1 +1,55 @@
amqp = require('amqp')
uuid = require('node-uuid')
config = require('./config')
clients = require('./clients')
connection = amqp.createConnection({ url: config.url }, {defaultExchangeName: config.exchange.name})
WebSocketServer = require('ws').Server
wss = new WebSocketServer({ port: 3000 })
connection.on 'ready', () ->
connection.queue config.queue.name, config.queue.options, (q) ->
console.log 'Queue ' + q.name + ' is open'
exc = connection.exchange config.exchange.name, config.exchange.options, (exchange) ->
console.log 'Exchange ' + exchange.name + ' is open'
q.bind(config.exchange.name, '#')
q.subscribe (msg, header, deliveryInfo) ->
msg = JSON.parse(msg.data.toString())
clientMsg = msg.data
clientMsg.routing_key = deliveryInfo.routingKey
senderClient = clients.getBySessionId(msg.session_id)
subscriptions = clients.getBySubscription(deliveryInfo.routingKey)
clientMsgStr = JSON.stringify(clientMsg)
subscriptions.forEach (client) ->
#excude sender client
if !senderClient || client.id != senderClient.id
client.ws.send clientMsgStr
wss.on 'connection', (ws) ->
clientId = uuid.v4()
ws.on 'message', (message) ->
msg = JSON.parse(message)
if msg.cmd == 'auth'
clients.add({
id: clientId,
ws: ws,
auth: msg.data
})
else if msg.cmd == 'subscribe'
clients.addSubscription(clientId, msg.routing_key)
else if msg.cmd == 'unsubscribe'
clients.removeSubscription(clientId, msg.routing_key)
ws.on 'close', (message) ->
clients.removeById(clientId)

View File

@ -22,5 +22,10 @@
"gulp-coffeelint": "^0.4.0", "gulp-coffeelint": "^0.4.0",
"gulp-nodemon": "^1.0.5", "gulp-nodemon": "^1.0.5",
"gulp-plumber": "^0.6.6" "gulp-plumber": "^0.6.6"
},
"dependencies": {
"amqp": "^0.2.3",
"node-uuid": "^1.4.2",
"ws": "^0.7.1"
} }
} }