Make forwarding work back and forth between two servers.

This commit is contained in:
James Coglan
2011-12-22 19:03:50 +00:00
parent ac37e4f8c7
commit b5fd1be6fb
5 changed files with 109 additions and 53 deletions
+1 -42
View File
@@ -1,43 +1,2 @@
var faye = require('faye');
var Cluster = function(endpoint) {
this._endpoint = endpoint;
};
var META = /^\/meta\//,
SERVICE = /^\/service\//;
Cluster.prototype.incoming = function(message, callback) {
callback(message);
var channel = message.channel;
if (META.test(channel) || SERVICE.test(channel)) return;
if (message.ext && message.ext.cluster) return;
this._forward(message);
};
Cluster.prototype.outgoing = function(message, callback) {
if (message.ext) delete message.ext.cluster;
callback(message);
};
Cluster.prototype.connect = function(endpoint) {
this._upstream = new faye.Client(endpoint);
this._upstream.addExtension({
outgoing: function(message, callback) {
if (META.test(message.channel)) return callback(message);
message.channel = message.data.channel;
message.ext = message.data.ext;
message.data = message.data.data;
callback(message);
}
});
};
Cluster.prototype._forward = function(message) {
message.ext = message.ext || {};
message.ext.cluster = {publish: true};
this._upstream.publish('/cluster', message);
};
module.exports = Cluster;
exports.Node = require('./node');
+35
View File
@@ -0,0 +1,35 @@
var faye = require('faye'),
META = /^\/meta\//,
FORWARD = '/faye/cluster/forward';
var Connection = function(node, endpoint) {
this._node = node;
this._endpoint = endpoint;
this._remote = new faye.Client(endpoint);
this._remote.addExtension(this);
var sub = this._remote.subscribe('/faye/cluster/endpoints', function(endpoints) {
this._node.connect(endpoints);
}, this);
sub.callback(function() {
this._remote.publish('/faye/cluster/endpoint', this._node._endpoint);
}, this);
};
Connection.prototype.outgoing = function(message, callback) {
if (message.channel === FORWARD) {
message.channel = message.data.channel;
message.ext = message.data.ext;
message.data = message.data.data;
}
callback(message);
};
Connection.prototype.forward = function(message) {
this._remote.publish(FORWARD, message);
};
module.exports = Connection;
+58
View File
@@ -0,0 +1,58 @@
var faye = require('faye'),
Connection = require('./connection'),
META = /^\/meta\//,
SERVICE = /^\/service\//,
CLUSTER = /^\/faye\/cluster\//;
var Node = function(endpoint) {
this._endpoint = endpoint;
this._connections = {};
};
Node.prototype.added = function(server) {
this._server = server;
this._client = new faye.Client(server);
this._client.subscribe('/faye/cluster/endpoint', function(endpoint) {
this.connect(endpoint);
var remotes = Object.keys(this._connections);
this._client.publish('/faye/cluster/endpoints', remotes);
}, this);
};
Node.prototype.incoming = function(message, callback) {
callback(message);
var channel = message.channel;
if (META.test(channel) || SERVICE.test(channel) || CLUSTER.test(channel)) return;
if (message.ext && message.ext.cluster) return;
this._forward(message);
};
Node.prototype.outgoing = function(message, callback) {
if (message.ext) delete message.ext.cluster;
callback(message);
};
Node.prototype.connect = function(endpoints) {
endpoints = [].concat(endpoints);
var i = endpoints.length,
conns = this._connections;
while (i--) {
if (endpoints[i] !== this._endpoint) {
conns[endpoints[i]] = conns[endpoints[i]] ||
new Connection(this, endpoints[i]);
}
}
};
Node.prototype._forward = function(message) {
message.ext = message.ext || {};
message.ext.cluster = {publish: true};
for (var endpoint in this._connections)
this._connections[endpoint].forward(message);
};
module.exports = Node;
+1 -1
View File
@@ -7,7 +7,7 @@
, "version" : "0.1.0"
, "engines" : {"node": ">=0.4.0"}
, "main" : "./lib/faye/websocket"
, "devDependencies" : {"faye": ">=0.5.0"}
, "devDependencies" : {"faye": ">=0.7.1"}
, "bugs" : "http://github.com/faye/faye-cluster-node/issues"
+14 -10
View File
@@ -1,5 +1,5 @@
var faye = require('faye'),
Cluster = require('./lib/faye/cluster');
cluster = require('./lib/faye/cluster');
var makeServer = function(port) {
var server = new faye.NodeAdapter({mount: '/faye', timeout: 60});
@@ -16,16 +16,15 @@ var sA = makeServer(8000),
sB = makeServer(8001);
// Add the cluster extension to each server
var eA = new Cluster('http://localhost:8000/faye'),
eB = new Cluster('http://localhost:8001/faye');
var eA = new cluster.Node('http://localhost:8000/faye'),
eB = new cluster.Node('http://localhost:8001/faye');
sA.addExtension(eA);
sB.addExtension(eB);
// Connect extension B to server A
// Connect extension A to server B
process.nextTick(function() {
eA.connect('http://localhost:8001/faye');
eB.connect('http://localhost:8000/faye');
});
setTimeout(function() {
@@ -33,11 +32,16 @@ setTimeout(function() {
var cA = makeClient(8000),
cB = makeClient(8001);
// Subscribe to a channel on the first server
var sub = cA.subscribe('/msg', function(m) { console.log('A', m) });
// Make clients subscribe to channels on their respective servers
var subA = cA.subscribe('/msg/a', function(m) { console.log('A', m) }),
subB = cB.subscribe('/msg/b', function(m) { console.log('B', m) });
sub.callback(function() {
// Publish to a channel on the second server
cB.publish('/msg', {hello: 'world'});
subA.callback(function() {
subB.callback(function() {
// Send messages between both clients
cA.publish('/msg/b', {hello: 'world'});
cB.publish('/msg/a', {hello: 'world'});
});
});
}, 100);