Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 86ae4baa3d | |||
| e84e20bba4 | |||
| 4fdafd8877 | |||
| 01b84b338c | |||
| fa4cd75dba | |||
| 66354e556d | |||
| 1f37edc011 | |||
| 865f0e13c7 | |||
| 5330ad8da7 | |||
| 5cc6fc1633 | |||
| f3336fed86 | |||
| 4f685b38e0 | |||
| eb6f67a222 | |||
| 3325d53d31 | |||
| 8c694087ce | |||
| d9c65ddc61 | |||
| 3d4fcec51b | |||
| 4a9b293891 | |||
| 3cb9589712 | |||
| 6722edb41b | |||
| 10a7d59898 | |||
| 002e55ae6c | |||
| 1489e32d1f | |||
| 0b70daed71 | |||
| efaed17004 |
@@ -2,7 +2,6 @@
|
||||
.gitignore
|
||||
.gitmodules
|
||||
.npmignore
|
||||
.redcar
|
||||
.travis.yml
|
||||
dump.rdb
|
||||
Rakefile
|
||||
|
||||
+5
-3
@@ -1,8 +1,10 @@
|
||||
language: node_js
|
||||
|
||||
node_js:
|
||||
- 0.4
|
||||
- 0.6
|
||||
- 0.7
|
||||
- "0.6"
|
||||
- "0.8"
|
||||
- "0.10"
|
||||
- "0.11"
|
||||
|
||||
before_script:
|
||||
- rake prepare
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
=== 0.1.2 / 2013-04-28
|
||||
|
||||
* Improve garbage collection to avoid leaking Redis memory
|
||||
|
||||
|
||||
=== 0.1.1 / 2012-07-15
|
||||
|
||||
* Fix an implicit global variable leak (missing semicolon)
|
||||
|
||||
|
||||
=== 0.1.0 / 2012-02-26
|
||||
|
||||
* Initial release: Redis backend for Faye 0.8
|
||||
|
||||
@@ -1,66 +0,0 @@
|
||||
# faye-redis [](http://travis-ci.org/faye/faye-redis-node)
|
||||
|
||||
This plugin provides a Redis-based backend for the [Faye](http://faye.jcoglan.com)
|
||||
messaging server. It allows a single Faye service to be distributed across many
|
||||
front-end web servers by storing state and routing messages through a
|
||||
[Redis](http://redis.io) database server.
|
||||
|
||||
|
||||
## Usage
|
||||
|
||||
Pass in the engine and any settings you need when setting up your Faye server.
|
||||
|
||||
```js
|
||||
var faye = require('faye'),
|
||||
redis = require('faye-redis'),
|
||||
http = require('http');
|
||||
|
||||
var server = http.createServer();
|
||||
|
||||
var bayeux = new faye.NodeAdapter({
|
||||
mount: '/',
|
||||
timeout: 25,
|
||||
engine: {
|
||||
type: redis,
|
||||
host: 'redis.example.com',
|
||||
// more options
|
||||
}
|
||||
});
|
||||
|
||||
bayeux.attach(server);
|
||||
server.listen(8000);
|
||||
```
|
||||
|
||||
The full list of settings is as follows.
|
||||
|
||||
* <b><tt>host</tt></b> - hostname of your Redis instance
|
||||
* <b><tt>port</tt></b> - port number, default is `6379`
|
||||
* <b><tt>password</tt></b> - password, if `requirepass` is set
|
||||
* <b><tt>database</tt></b> - number of database to use, default is `0`
|
||||
* <b><tt>namespace</tt></b> - prefix applied to all keys, default is `''`
|
||||
* <b><tt>socket</tt></b> - path to Unix socket if `unixsocket` is set
|
||||
|
||||
|
||||
## License
|
||||
|
||||
(The MIT License)
|
||||
|
||||
Copyright (c) 2011-2012 James Coglan
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the 'Software'), to deal in
|
||||
the Software without restriction, including without limitation the rights to use,
|
||||
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
# faye-redis [](http://travis-ci.org/faye/faye-redis-node)
|
||||
|
||||
This plugin provides a Redis-based backend for the
|
||||
[Faye](http://faye.jcoglan.com) messaging server. It allows a single Faye
|
||||
service to be distributed across many front-end web servers by storing state
|
||||
and routing messages through a [Redis](http://redis.io) database server.
|
||||
|
||||
|
||||
## Usage
|
||||
|
||||
Pass in the engine and any settings you need when setting up your Faye server.
|
||||
|
||||
```js
|
||||
var faye = require('faye'),
|
||||
redis = require('faye-redis'),
|
||||
http = require('http');
|
||||
|
||||
var server = http.createServer();
|
||||
|
||||
var bayeux = new faye.NodeAdapter({
|
||||
mount: '/',
|
||||
timeout: 25,
|
||||
engine: {
|
||||
type: redis,
|
||||
host: 'redis.example.com',
|
||||
// more options
|
||||
}
|
||||
});
|
||||
|
||||
bayeux.attach(server);
|
||||
server.listen(8000);
|
||||
```
|
||||
|
||||
The full list of settings is as follows.
|
||||
|
||||
* <b>`host`</b> - hostname of your Redis instance
|
||||
* <b>`port`</b> - port number, default is `6379`
|
||||
* <b>`password`</b> - password, if `requirepass` is set
|
||||
* <b>`database`</b> - number of database to use, default is `0`
|
||||
* <b>`namespace`</b> - prefix applied to all keys, default is `''`
|
||||
* <b>`socket`</b> - path to Unix socket if `unixsocket` is set
|
||||
|
||||
|
||||
## License
|
||||
|
||||
(The MIT License)
|
||||
|
||||
Copyright (c) 2011-2013 James Coglan
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the 'Software'), to deal in
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
|
||||
of the Software, and to permit persons to whom the Software is furnished to do
|
||||
so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
||||
+78
-64
@@ -9,9 +9,9 @@ var Engine = function(server, options) {
|
||||
auth = this._options.password,
|
||||
gc = this._options.gc || this.DEFAULT_GC,
|
||||
socket = this._options.socket;
|
||||
|
||||
|
||||
this._ns = this._options.namespace || '';
|
||||
|
||||
|
||||
if (socket) {
|
||||
this._redis = redis.createClient(socket, {no_ready_check: true});
|
||||
this._subscriber = redis.createClient(socket, {no_ready_check: true});
|
||||
@@ -19,20 +19,20 @@ var Engine = function(server, options) {
|
||||
this._redis = redis.createClient(port, host, {no_ready_check: true});
|
||||
this._subscriber = redis.createClient(port, host, {no_ready_check: true});
|
||||
}
|
||||
|
||||
|
||||
if (auth) {
|
||||
this._redis.auth(auth);
|
||||
this._subscriber.auth(auth);
|
||||
}
|
||||
this._redis.select(db);
|
||||
this._subscriber.select(db);
|
||||
|
||||
|
||||
var self = this;
|
||||
this._subscriber.subscribe(this._ns + '/notifications');
|
||||
this._subscriber.on('message', function(topic, message) {
|
||||
self.emptyQueue(message);
|
||||
});
|
||||
|
||||
|
||||
this._gc = setInterval(function() { self.gc() }, gc * 1000);
|
||||
};
|
||||
|
||||
@@ -46,134 +46,148 @@ Engine.prototype = {
|
||||
DEFAULT_DATABASE: 0,
|
||||
DEFAULT_GC: 60,
|
||||
LOCK_TIMEOUT: 120,
|
||||
|
||||
|
||||
disconnect: function() {
|
||||
this._redis.end();
|
||||
this._subscriber.unsubscribe();
|
||||
this._subscriber.end();
|
||||
clearInterval(this._gc);
|
||||
},
|
||||
|
||||
createClient: function(callback, scope) {
|
||||
|
||||
createClient: function(callback, context) {
|
||||
var clientId = this._server.generateId(), self = this;
|
||||
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
|
||||
if (added === 0) return self.createClient(callback, scope);
|
||||
if (added === 0) return self.createClient(callback, context);
|
||||
self._server.debug('Created new client ?', clientId);
|
||||
self.ping(clientId);
|
||||
self._server.trigger('handshake', clientId);
|
||||
callback.call(scope, clientId);
|
||||
callback.call(context, clientId);
|
||||
});
|
||||
},
|
||||
|
||||
destroyClient: function(clientId, callback, scope) {
|
||||
|
||||
clientExists: function(clientId, callback, context) {
|
||||
var cutoff = new Date().getTime() - (1000 * 1.6 * this._server.timeout);
|
||||
|
||||
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
|
||||
callback.call(context, parseInt(score, 10) > cutoff);
|
||||
});
|
||||
},
|
||||
|
||||
destroyClient: function(clientId, callback, context) {
|
||||
var self = this;
|
||||
this._redis.zrem(this._ns + '/clients', clientId);
|
||||
this._redis.del(this._ns + '/clients/' + clientId + '/messages');
|
||||
|
||||
this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
|
||||
var n = channels.length, i = 0;
|
||||
if (i === n) return self.afterDestroy(clientId, callback, scope);
|
||||
|
||||
channels.forEach(function(channel) {
|
||||
self.unsubscribe(clientId, channel, function() {
|
||||
i += 1;
|
||||
if (i === n) self.afterDestroy(clientId, callback, scope);
|
||||
|
||||
this._redis.zadd(this._ns + '/clients', 0, clientId, function() {
|
||||
self._redis.smembers(self._ns + '/clients/' + clientId + '/channels', function(error, channels) {
|
||||
var n = channels.length, i = 0;
|
||||
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
|
||||
|
||||
channels.forEach(function(channel) {
|
||||
self.unsubscribe(clientId, channel, function() {
|
||||
i += 1;
|
||||
if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
afterDestroy: function(clientId, callback, scope) {
|
||||
this._server.debug('Destroyed client ?', clientId);
|
||||
this._server.trigger('disconnect', clientId);
|
||||
if (callback) callback.call(scope);
|
||||
},
|
||||
|
||||
clientExists: function(clientId, callback, scope) {
|
||||
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
|
||||
callback.call(scope, score !== null);
|
||||
|
||||
_afterSubscriptionsRemoved: function(clientId, callback, context) {
|
||||
var self = this;
|
||||
this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
|
||||
self._redis.zrem(self._ns + '/clients', clientId, function() {
|
||||
self._server.debug('Destroyed client ?', clientId);
|
||||
self._server.trigger('disconnect', clientId);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
|
||||
ping: function(clientId) {
|
||||
var timeout = this._server.timeout;
|
||||
if (typeof timeout !== 'number') return;
|
||||
|
||||
|
||||
var time = new Date().getTime();
|
||||
|
||||
|
||||
this._server.debug('Ping ?, ?', clientId, time);
|
||||
this._redis.zadd(this._ns + '/clients', time, clientId);
|
||||
},
|
||||
|
||||
subscribe: function(clientId, channel, callback, scope) {
|
||||
|
||||
subscribe: function(clientId, channel, callback, context) {
|
||||
var self = this;
|
||||
this._redis.sadd(this._ns + '/clients/' + clientId + '/channels', channel, function(error, added) {
|
||||
if (added === 1) self._server.trigger('subscribe', clientId, channel);
|
||||
});
|
||||
this._redis.sadd(this._ns + '/channels' + channel, clientId, function() {
|
||||
self._server.debug('Subscribed client ? to channel ?', clientId, channel);
|
||||
if (callback) callback.call(scope);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
},
|
||||
|
||||
unsubscribe: function(clientId, channel, callback, scope) {
|
||||
|
||||
unsubscribe: function(clientId, channel, callback, context) {
|
||||
var self = this;
|
||||
this._redis.srem(this._ns + '/clients/' + clientId + '/channels', channel, function(error, removed) {
|
||||
if (removed === 1) self._server.trigger('unsubscribe', clientId, channel);
|
||||
});
|
||||
this._redis.srem(this._ns + '/channels' + channel, clientId, function() {
|
||||
self._server.debug('Unsubscribed client ? from channel ?', clientId, channel);
|
||||
if (callback) callback.call(scope);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
},
|
||||
|
||||
|
||||
publish: function(message, channels) {
|
||||
this._server.debug('Publishing message ?', message);
|
||||
|
||||
|
||||
var self = this,
|
||||
jsonMessage = JSON.stringify(message),
|
||||
keys = channels.map(function(c) { return self._ns + '/channels' + c });
|
||||
|
||||
|
||||
var notify = function(error, clients) {
|
||||
clients.forEach(function(clientId) {
|
||||
var queue = self._ns + '/clients/' + clientId + '/messages';
|
||||
|
||||
self._server.debug('Queueing for client ?: ?', clientId, message);
|
||||
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
|
||||
self._redis.rpush(queue, jsonMessage);
|
||||
self._redis.publish(self._ns + '/notifications', clientId);
|
||||
|
||||
self.clientExists(clientId, function(exists) {
|
||||
if (!exists) self._redis.del(queue);
|
||||
});
|
||||
});
|
||||
};
|
||||
keys.push(notify);
|
||||
this._redis.sunion.apply(this._redis, keys);
|
||||
|
||||
|
||||
this._server.trigger('publish', message.clientId, message.channel, message.data);
|
||||
},
|
||||
|
||||
|
||||
emptyQueue: function(clientId) {
|
||||
if (!this._server.hasConnection(clientId)) return;
|
||||
|
||||
|
||||
var key = this._ns + '/clients/' + clientId + '/messages',
|
||||
multi = this._redis.multi(),
|
||||
self = this;
|
||||
|
||||
|
||||
multi.lrange(key, 0, -1, function(error, jsonMessages) {
|
||||
if (!jsonMessages) return;
|
||||
var messages = jsonMessages.map(function(json) { return JSON.parse(json) });
|
||||
self._server.deliver(clientId, messages);
|
||||
});
|
||||
multi.del(key);
|
||||
multi.exec();
|
||||
},
|
||||
|
||||
|
||||
gc: function() {
|
||||
var timeout = this._server.timeout;
|
||||
if (typeof timeout !== 'number') return;
|
||||
|
||||
|
||||
this._withLock('gc', function(releaseLock) {
|
||||
var cutoff = new Date().getTime() - 1000 * 2 * timeout,
|
||||
self = this;
|
||||
|
||||
|
||||
this._redis.zrangebyscore(this._ns + '/clients', 0, cutoff, function(error, clients) {
|
||||
var i = 0, n = clients.length;
|
||||
if (i === n) return releaseLock();
|
||||
|
||||
|
||||
clients.forEach(function(clientId) {
|
||||
this.destroyClient(clientId, function() {
|
||||
i += 1;
|
||||
@@ -183,29 +197,29 @@ Engine.prototype = {
|
||||
});
|
||||
}, this);
|
||||
},
|
||||
|
||||
_withLock: function(lockName, callback, scope) {
|
||||
|
||||
_withLock: function(lockName, callback, context) {
|
||||
var lockKey = this._ns + '/locks/' + lockName,
|
||||
currentTime = new Date().getTime(),
|
||||
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1;
|
||||
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
|
||||
self = this;
|
||||
|
||||
|
||||
var releaseLock = function() {
|
||||
if (new Date().getTime() < expiry) self._redis.del(lockKey);
|
||||
};
|
||||
|
||||
|
||||
this._redis.setnx(lockKey, expiry, function(error, set) {
|
||||
if (set === 1) return callback.call(scope, releaseLock);
|
||||
|
||||
if (set === 1) return callback.call(context, releaseLock);
|
||||
|
||||
self._redis.get(lockKey, function(error, timeout) {
|
||||
if (!timeout) return;
|
||||
|
||||
|
||||
var lockTimeout = parseInt(timeout, 10);
|
||||
if (currentTime < lockTimeout) return;
|
||||
|
||||
|
||||
self._redis.getset(lockKey, expiry, function(error, oldValue) {
|
||||
if (oldValue !== timeout) return;
|
||||
callback.call(scope, releaseLock);
|
||||
callback.call(context, releaseLock);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
+6
-6
@@ -4,23 +4,23 @@
|
||||
, "author" : "James Coglan <jcoglan@gmail.com> (http://jcoglan.com/)"
|
||||
, "keywords" : ["pubsub", "bayeux"]
|
||||
|
||||
, "version" : "0.1.0"
|
||||
, "version" : "0.1.3"
|
||||
, "engines" : {"node": ">=0.4.0"}
|
||||
, "main" : "./faye-redis"
|
||||
, "dependencies" : {"hiredis": "", "redis": ""}
|
||||
, "dependencies" : {"redis": ""}
|
||||
, "devDependencies" : {"jsclass": ""}
|
||||
|
||||
, "scripts" : {"test": "node spec/runner.js"}
|
||||
|
||||
, "bugs" : "http://github.com/faye/faye-redis-node/issues"
|
||||
|
||||
, "licenses" : [ { "type" : "MIT"
|
||||
, "url" : "http://www.opensource.org/licenses/mit-license.php"
|
||||
, "licenses" : [ { "type" : "MIT"
|
||||
, "url" : "http://www.opensource.org/licenses/mit-license.php"
|
||||
}
|
||||
]
|
||||
|
||||
, "repositories" : [ { "type" : "git"
|
||||
, "url" : "git://github.com/faye/faye-redis-node.git"
|
||||
, "repositories" : [ { "type" : "git"
|
||||
, "url" : "git://github.com/faye/faye-redis-node.git"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this)
|
||||
var pw = process.env.TRAVIS ? undefined : "foobared"
|
||||
this.engineOpts = {type: RedisEngine, password: pw, namespace: new Date().getTime().toString()}
|
||||
})
|
||||
|
||||
|
||||
after(function(resume) { with(this) {
|
||||
sync(function() {
|
||||
engine.disconnect()
|
||||
@@ -17,20 +17,20 @@ JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this)
|
||||
})
|
||||
})
|
||||
}})
|
||||
|
||||
|
||||
itShouldBehaveLike("faye engine")
|
||||
|
||||
|
||||
describe("distribution", function() { with(this) {
|
||||
itShouldBehaveLike("distributed engine")
|
||||
}})
|
||||
|
||||
|
||||
if (process.env.TRAVIS) return
|
||||
|
||||
|
||||
describe("using a Unix socket", function() { with(this) {
|
||||
before(function() { with(this) {
|
||||
this.engineOpts.socket = "/tmp/redis.sock"
|
||||
}})
|
||||
|
||||
|
||||
itShouldBehaveLike("faye engine")
|
||||
}})
|
||||
}})
|
||||
|
||||
+1
-1
@@ -1,7 +1,7 @@
|
||||
require('jsclass')
|
||||
JS.require('JS.Range', 'JS.Test')
|
||||
|
||||
Faye = require('../vendor/faye/build/faye-node')
|
||||
Faye = require('../vendor/faye/build/node/faye-node')
|
||||
require('../vendor/faye/spec/javascript/engine_spec')
|
||||
require('./faye_redis_spec')
|
||||
|
||||
|
||||
Vendored
+1
-1
Submodule vendor/faye updated: 6ebaa51670...c7fac489b1
Reference in New Issue
Block a user