new taiga events

- one queue per subription
- one channel per websocket connection
- one consumer per subscription
master
Juanfran 2015-02-19 15:20:10 +01:00
parent 9bf956644f
commit cc149d4025
8 changed files with 248 additions and 95 deletions

50
client.coffee Normal file
View File

@ -0,0 +1,50 @@
uuid = require('node-uuid')
signing = require('./signing')
SubscriptionManager = require('./subscription').SubscriptionManager
clients = {}
class Client
constructor: (@ws) ->
@id = uuid.v4()
@handleEvents()
handleEvents: () ->
@ws.on 'message', @handleMessage.bind(@)
handleMessage: (message) ->
msg = JSON.parse(message)
if msg.cmd == 'auth'
@auth(msg.data)
else if msg.cmd == 'subscribe'
@addSubscription(msg.routing_key)
else if msg.cmd == 'unsubscribe'
@removeSubscription(msg.routing_key)
auth: (auth) ->
if auth.token and auth.sessionId and signing.verify(auth.token)
@auth = auth
addSubscription: (routing_key) ->
if @auth
if !@subscriptionManager
@subscriptionManager = new SubscriptionManager(@id, @auth, @ws)
@subscriptionManager.add(routing_key)
close: () ->
if @subscriptionManager
@subscriptionManager.destroy()
removeSubscription: (routing_key) ->
if @subscriptionManager
@subscriptionManager.remove(routing_key)
exports.createClient = (ws) ->
client = new Client(ws)
clients[client.id] = client
client.ws.on 'close', (() ->
@.close()
delete clients[@id]
).bind(client)

View File

@ -1,36 +0,0 @@
clients = []
subscriptions = {}
removeSubscription = (id, routing_key) ->
if subscriptions[routing_key]
subscriptions[routing_key] = subscriptions[routing_key].filter (client) -> client.id != id
exports.removeById = (id) ->
clients = clients.filter (client) -> client.id != id
Object.keys(subscriptions).forEach (routing_key) ->
removeSubscription(id, routing_key)
exports.getBySessionId = (session_id) ->
client = clients.filter (client) -> client.auth.sessionId == session_id
return client[0]
exports.add = (client) ->
clients.push(client)
exports.addSubscription = (id, routing_key) ->
if !subscriptions[routing_key]
subscriptions[routing_key] = []
client = clients.filter (client) -> client.id == id
subscriptions[routing_key].push(client[0])
exports.removeSubscription = removeSubscription
exports.getBySubscription = (subscription) ->
if !subscriptions[subscription]
return []
return subscriptions[subscription]

View File

@ -1,14 +1,7 @@
{ {
"url": "amqp://guest:guest@localhost:5672", "url": "amqp://guest:guest@localhost:5672",
"exchange": { "secret": "mysecret",
"name": "events",
"options": {}
},
"queue": {
"name": "EventsPushBackend",
"options": {}
},
"webSocketServer": { "webSocketServer": {
"port": 3000 "port": 8888
} }
} }

View File

@ -1,55 +1,8 @@
amqp = require('amqp')
uuid = require('node-uuid')
config = require('./config') config = require('./config')
clients = require('./clients') client = require('./client')
connection = amqp.createConnection({ url: config.url }, {defaultExchangeName: config.exchange.name})
WebSocketServer = require('ws').Server WebSocketServer = require('ws').Server
wss = new WebSocketServer(config.webSocketServer) wss = new WebSocketServer(config.webSocketServer)
connection.on 'ready', () ->
connection.queue config.queue.name, config.queue.options, (q) ->
console.log 'Queue ' + q.name + ' is open'
exc = connection.exchange config.exchange.name, config.exchange.options, (exchange) ->
console.log 'Exchange ' + exchange.name + ' is open'
q.bind(config.exchange.name, '#')
q.subscribe (msg, header, deliveryInfo) ->
msg = JSON.parse(msg.data.toString())
clientMsg = msg.data
clientMsg.routing_key = deliveryInfo.routingKey
senderClient = clients.getBySessionId(msg.session_id)
subscriptions = clients.getBySubscription(deliveryInfo.routingKey)
clientMsgStr = JSON.stringify(clientMsg)
subscriptions.forEach (client) ->
#exclude sender client
if !senderClient || client.id != senderClient.id
client.ws.send clientMsgStr
wss.on 'connection', (ws) -> wss.on 'connection', (ws) ->
clientId = uuid.v4() client.createClient(ws)
ws.on 'message', (message) ->
msg = JSON.parse(message)
if msg.cmd == 'auth'
clients.add({
id: clientId,
ws: ws,
auth: msg.data
})
else if msg.cmd == 'subscribe'
clients.addSubscription(clientId, msg.routing_key)
else if msg.cmd == 'unsubscribe'
clients.removeSubscription(clientId, msg.routing_key)
ws.on 'close', (message) ->
clients.removeById(clientId)

View File

@ -24,7 +24,9 @@
"gulp-plumber": "^0.6.6" "gulp-plumber": "^0.6.6"
}, },
"dependencies": { "dependencies": {
"amqp": "^0.2.3", "amqplib": "^0.3.1",
"base64-url": "^1.2.1",
"bluebird": "^2.9.10",
"node-uuid": "^1.4.2", "node-uuid": "^1.4.2",
"ws": "^0.7.1" "ws": "^0.7.1"
} }

115
rabbit.coffee Normal file
View File

@ -0,0 +1,115 @@
amqp = require('amqplib')
Promise = require('bluebird')
amqpUrl = require('./config').url
config = {
"exchange": {
"name": "events",
"type": "topic",
"options": {
"durable": false,
"autoDelete": true
}
},
"queue": {
"name": ""
"options": {
"autoDelete": true,
"exclusive": true
}
},
"channel": {
noAck: true
}
}
# Return the connection, creates the connection if it does not exist.
getConnection = do ->
connection = null
return () ->
return new Promise (resolve, reject) ->
if (!connection)
amqp.connect(amqpUrl).then (conn) ->
connection = conn
resolve(connection)
else
resolve(connection)
# Return the user channel
channels = do ->
chs = {}
removeClient = (client_id) ->
get(client_id).then (channel) ->
channel.close()
delete chs[client_id]
get = (client_id) ->
return new Promise (resolve, reject) ->
if !chs[client_id]
getConnection()
.then (connection) -> connection.createChannel()
.then (channel) ->
chs[client_id] = channel
return resolve(chs[client_id])
else
resolve(chs[client_id])
return {
removeClient: removeClient
get: get
}
# Return a new queue
queues = do ->
getExchange = (channel) ->
return channel.assertExchange(config.exchange.name, config.exchange.type, config.exchange.options)
getQueue = (channel, exchange) ->
return channel.assertQueue(config.queue.name, config.queue.options).then (qok) -> qok.queue
return {
create: (channel, client_id, routing_key) ->
return getExchange(channel)
.then (exchange) -> getQueue(channel)
}
subscriptions = do ->
subs = {}
bindAndSubscribe = (channel, queue, routing_key, cb) ->
channel.bindQueue(queue, config.exchange.name, routing_key)
return channel.consume(queue, cb, {noAck: true})
registerSubscription = (client_id, routing_key, consumerTag) ->
subs[client_id] = subs[client_id] || {}
subs[client_id][routing_key] = consumerTag
subscribe = (client_id, routing_key, cb) ->
channels.get(client_id)
.then (channel) ->
queues.create(channel)
.then (queue) -> bindAndSubscribe(channel, queue, routing_key, cb)
.then (ok) -> registerSubscription(client_id, routing_key, ok.consumerTag)
unsubscribe = (client_id, routing_key) ->
channels.get(client_id).then (channel) ->
channel.cancel(subs[client_id][routing_key])
removeClient = (client_id) ->
delete subs[client_id]
return {
subscribe: subscribe
unsubscribe: unsubscribe
removeClient: removeClient
}
exports.destroy = (client_id) ->
subscriptions.removeClient(client_id)
channels.removeClient(client_id)
exports.subscribe = subscriptions.subscribe
exports.unsubscribe = subscriptions.unsubscribe

30
signing.coffee Normal file
View File

@ -0,0 +1,30 @@
crypto = require('crypto')
base64url = require('base64-url')
config = require('./config')
salt = 'django.core.signing'
rsplit = (token, sep, maxsplit) ->
split = token.split(sep)
if maxsplit
return [ split.slice(0, -maxsplit).join(sep) ].concat(split.slice(-maxsplit))
return split
exports.verify = (token) ->
[value, sig] = rsplit(token, ':', 1)
shasum = crypto.createHash('sha1')
shasum.update(salt + 'signer' + config.secret)
hmacKey = shasum.digest()
hmac = crypto.createHmac('sha1', hmacKey)
hmac.setEncoding('base64')
hmac.update(value)
key = base64url.escape(hmac.digest('base64'))
return key == sig

46
subscription.coffee Normal file
View File

@ -0,0 +1,46 @@
queue = require('./rabbit')
class Subscription
constructor: (@client_id, @auth, @ws, @routing_key) ->
handleMessage: (msg) ->
content = JSON.parse(msg.content.toString())
if content.session_id == @auth.sessionId
return
clientMsg = content
clientMsg.routing_key = msg.fields.routingKey
clientMsgStr = JSON.stringify(clientMsg)
@ws.send clientMsgStr
start: () ->
queue.subscribe(@client_id, @routing_key, @handleMessage.bind(@))
stop: () ->
queue.unsubscribe(@client_id, @routing_key)
class SubscriptionManager
constructor: (@client_id, @auth, @ws) ->
@subscriptions = {}
add: (routing_key) ->
if !@subscriptions[routing_key]
@subscriptions[routing_key] = {}
else
@subscriptions[routing_key].stop()
@subscriptions[routing_key] = new Subscription(@client_id, @auth, @ws, routing_key)
@subscriptions[routing_key].start()
remove: (routing_key) ->
@subscriptions[routing_key].stop()
delete @subscriptions[routing_key]
destroy: () ->
@subscriptions = {}
queue.destroy(@client_id)
exports.SubscriptionManager = SubscriptionManager