diff --git a/clients.coffee b/clients.coffee new file mode 100644 index 0000000..7e0890c --- /dev/null +++ b/clients.coffee @@ -0,0 +1,36 @@ +clients = [] +subscriptions = {} + +removeSubscription = (id, routing_key) -> + if subscriptions[routing_key] + subscriptions[routing_key] = subscriptions[routing_key].filter (client) -> client.id != id + +exports.removeById = (id) -> + clients = clients.filter (client) -> client.id != id + + Object.keys(subscriptions).forEach (routing_key) -> + removeSubscription(id, routing_key) + +exports.getBySessionId = (session_id) -> + client = clients.filter (client) -> client.auth.sessionId == session_id + + return client[0] + +exports.add = (client) -> + clients.push(client) + +exports.addSubscription = (id, routing_key) -> + if !subscriptions[routing_key] + subscriptions[routing_key] = [] + + client = clients.filter (client) -> client.id == id + + subscriptions[routing_key].push(client[0]) + +exports.removeSubscription = removeSubscription + +exports.getBySubscription = (subscription) -> + if !subscriptions[subscription] + return [] + + return subscriptions[subscription] diff --git a/coffeelint.json b/coffeelint.json index ebb124a..a9e2233 100644 --- a/coffeelint.json +++ b/coffeelint.json @@ -37,7 +37,7 @@ "value": "unix" }, "max_line_length": { - "value": 80, + "value": 120, "level": "error", "limitComments": true }, diff --git a/config.json b/config.json new file mode 100644 index 0000000..7909b00 --- /dev/null +++ b/config.json @@ -0,0 +1,11 @@ +{ + "url": "amqp://guest:guest@localhost:5672", + "exchange": { + "name": "events", + "options": {} + }, + "queue": { + "name": "EventsPushBackend", + "options": {} + } +} diff --git a/index.coffee b/index.coffee index 8b13789..0f59b74 100644 --- a/index.coffee +++ b/index.coffee @@ -1 +1,55 @@ +amqp = require('amqp') +uuid = require('node-uuid') +config = require('./config') +clients = require('./clients') + +connection = amqp.createConnection({ url: config.url }, {defaultExchangeName: config.exchange.name}) + +WebSocketServer = require('ws').Server +wss = new WebSocketServer({ port: 3000 }) + +connection.on 'ready', () -> + connection.queue config.queue.name, config.queue.options, (q) -> + console.log 'Queue ' + q.name + ' is open' + + exc = connection.exchange config.exchange.name, config.exchange.options, (exchange) -> + console.log 'Exchange ' + exchange.name + ' is open' + + q.bind(config.exchange.name, '#') + + q.subscribe (msg, header, deliveryInfo) -> + msg = JSON.parse(msg.data.toString()) + + clientMsg = msg.data + clientMsg.routing_key = deliveryInfo.routingKey + + senderClient = clients.getBySessionId(msg.session_id) + subscriptions = clients.getBySubscription(deliveryInfo.routingKey) + + clientMsgStr = JSON.stringify(clientMsg) + + subscriptions.forEach (client) -> + #excude sender client + if !senderClient || client.id != senderClient.id + client.ws.send clientMsgStr + +wss.on 'connection', (ws) -> + clientId = uuid.v4() + + ws.on 'message', (message) -> + msg = JSON.parse(message) + + if msg.cmd == 'auth' + clients.add({ + id: clientId, + ws: ws, + auth: msg.data + }) + else if msg.cmd == 'subscribe' + clients.addSubscription(clientId, msg.routing_key) + else if msg.cmd == 'unsubscribe' + clients.removeSubscription(clientId, msg.routing_key) + + ws.on 'close', (message) -> + clients.removeById(clientId) diff --git a/package.json b/package.json index 03d7657..370f512 100644 --- a/package.json +++ b/package.json @@ -22,5 +22,10 @@ "gulp-coffeelint": "^0.4.0", "gulp-nodemon": "^1.0.5", "gulp-plumber": "^0.6.6" + }, + "dependencies": { + "amqp": "^0.2.3", + "node-uuid": "^1.4.2", + "ws": "^0.7.1" } }