diff --git a/lib/faye/cluster.js b/lib/faye/cluster.js index d82f5cc..ac99fa4 100644 --- a/lib/faye/cluster.js +++ b/lib/faye/cluster.js @@ -1,43 +1,2 @@ -var faye = require('faye'); - -var Cluster = function(endpoint) { - this._endpoint = endpoint; -}; - -var META = /^\/meta\//, - SERVICE = /^\/service\//; - -Cluster.prototype.incoming = function(message, callback) { - callback(message); - var channel = message.channel; - if (META.test(channel) || SERVICE.test(channel)) return; - if (message.ext && message.ext.cluster) return; - this._forward(message); -}; - -Cluster.prototype.outgoing = function(message, callback) { - if (message.ext) delete message.ext.cluster; - callback(message); -}; - -Cluster.prototype.connect = function(endpoint) { - this._upstream = new faye.Client(endpoint); - this._upstream.addExtension({ - outgoing: function(message, callback) { - if (META.test(message.channel)) return callback(message); - message.channel = message.data.channel; - message.ext = message.data.ext; - message.data = message.data.data; - callback(message); - } - }); -}; - -Cluster.prototype._forward = function(message) { - message.ext = message.ext || {}; - message.ext.cluster = {publish: true}; - this._upstream.publish('/cluster', message); -}; - -module.exports = Cluster; +exports.Node = require('./node'); diff --git a/lib/faye/connection.js b/lib/faye/connection.js new file mode 100644 index 0000000..3da3dc8 --- /dev/null +++ b/lib/faye/connection.js @@ -0,0 +1,35 @@ +var faye = require('faye'), + META = /^\/meta\//, + FORWARD = '/faye/cluster/forward'; + +var Connection = function(node, endpoint) { + this._node = node; + this._endpoint = endpoint; + this._remote = new faye.Client(endpoint); + + this._remote.addExtension(this); + + var sub = this._remote.subscribe('/faye/cluster/endpoints', function(endpoints) { + this._node.connect(endpoints); + }, this); + + sub.callback(function() { + this._remote.publish('/faye/cluster/endpoint', this._node._endpoint); + }, this); +}; + +Connection.prototype.outgoing = function(message, callback) { + if (message.channel === FORWARD) { + message.channel = message.data.channel; + message.ext = message.data.ext; + message.data = message.data.data; + } + callback(message); +}; + +Connection.prototype.forward = function(message) { + this._remote.publish(FORWARD, message); +}; + +module.exports = Connection; + diff --git a/lib/faye/node.js b/lib/faye/node.js new file mode 100644 index 0000000..abbe2cc --- /dev/null +++ b/lib/faye/node.js @@ -0,0 +1,58 @@ +var faye = require('faye'), + Connection = require('./connection'), + META = /^\/meta\//, + SERVICE = /^\/service\//, + CLUSTER = /^\/faye\/cluster\//; + +var Node = function(endpoint) { + this._endpoint = endpoint; + this._connections = {}; +}; + +Node.prototype.added = function(server) { + this._server = server; + this._client = new faye.Client(server); + + this._client.subscribe('/faye/cluster/endpoint', function(endpoint) { + this.connect(endpoint); + var remotes = Object.keys(this._connections); + this._client.publish('/faye/cluster/endpoints', remotes); + }, this); +}; + +Node.prototype.incoming = function(message, callback) { + callback(message); + var channel = message.channel; + if (META.test(channel) || SERVICE.test(channel) || CLUSTER.test(channel)) return; + if (message.ext && message.ext.cluster) return; + this._forward(message); +}; + +Node.prototype.outgoing = function(message, callback) { + if (message.ext) delete message.ext.cluster; + callback(message); +}; + +Node.prototype.connect = function(endpoints) { + endpoints = [].concat(endpoints); + + var i = endpoints.length, + conns = this._connections; + + while (i--) { + if (endpoints[i] !== this._endpoint) { + conns[endpoints[i]] = conns[endpoints[i]] || + new Connection(this, endpoints[i]); + } + } +}; + +Node.prototype._forward = function(message) { + message.ext = message.ext || {}; + message.ext.cluster = {publish: true}; + for (var endpoint in this._connections) + this._connections[endpoint].forward(message); +}; + +module.exports = Node; + diff --git a/package.json b/package.json index 360f42f..543b116 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ , "version" : "0.1.0" , "engines" : {"node": ">=0.4.0"} , "main" : "./lib/faye/websocket" -, "devDependencies" : {"faye": ">=0.5.0"} +, "devDependencies" : {"faye": ">=0.7.1"} , "bugs" : "http://github.com/faye/faye-cluster-node/issues" diff --git a/test.js b/test.js index 310d604..b58ec8b 100644 --- a/test.js +++ b/test.js @@ -1,5 +1,5 @@ var faye = require('faye'), - Cluster = require('./lib/faye/cluster'); + cluster = require('./lib/faye/cluster'); var makeServer = function(port) { var server = new faye.NodeAdapter({mount: '/faye', timeout: 60}); @@ -16,16 +16,15 @@ var sA = makeServer(8000), sB = makeServer(8001); // Add the cluster extension to each server -var eA = new Cluster('http://localhost:8000/faye'), - eB = new Cluster('http://localhost:8001/faye'); +var eA = new cluster.Node('http://localhost:8000/faye'), + eB = new cluster.Node('http://localhost:8001/faye'); sA.addExtension(eA); sB.addExtension(eB); -// Connect extension B to server A +// Connect extension A to server B process.nextTick(function() { eA.connect('http://localhost:8001/faye'); - eB.connect('http://localhost:8000/faye'); }); setTimeout(function() { @@ -33,11 +32,16 @@ setTimeout(function() { var cA = makeClient(8000), cB = makeClient(8001); - // Subscribe to a channel on the first server - var sub = cA.subscribe('/msg', function(m) { console.log('A', m) }); + // Make clients subscribe to channels on their respective servers + var subA = cA.subscribe('/msg/a', function(m) { console.log('A', m) }), + subB = cB.subscribe('/msg/b', function(m) { console.log('B', m) }); - sub.callback(function() { - // Publish to a channel on the second server - cB.publish('/msg', {hello: 'world'}); + subA.callback(function() { + subB.callback(function() { + // Send messages between both clients + cA.publish('/msg/b', {hello: 'world'}); + cB.publish('/msg/a', {hello: 'world'}); + }); }); }, 100); +