20 Commits

Author SHA1 Message Date
James Coglan 86ae4baa3d Prepare for release 0.1.3. 2013-05-11 15:13:55 +01:00
James Coglan e84e20bba4 Test on Node 0.11. 2013-05-11 14:59:52 +01:00
James Coglan 4fdafd8877 Merge pull request #8 from bitium/self_vs_this
self vs this fix for inside a callback
2013-05-11 06:55:54 -07:00
Andrew Arrow 01b84b338c self vs this fix for inside a callback 2013-05-06 12:51:50 -07:00
James Coglan fa4cd75dba Bump copyright date. 2013-04-28 12:27:30 +01:00
James Coglan 66354e556d Bump version to 0.1.2. 2013-04-28 12:13:33 +01:00
James Coglan 1f37edc011 Treat the result of ZSCORE as an int. 2013-04-28 12:00:05 +01:00
James Coglan 865f0e13c7 Don't duplicate code to construct the queue key. 2013-04-28 11:55:27 +01:00
James Coglan 5330ad8da7 Declare Node versions properly in .travis.yml. 2013-04-28 11:51:34 +01:00
James Coglan 5cc6fc1633 Apply some of @jpignata's patches to reduce leaked memory. 2013-04-28 11:49:39 +01:00
James Coglan f3336fed86 Chech the output of LRANGE before calling map() on it. 2013-04-20 23:08:47 +01:00
James Coglan 4f685b38e0 Remove trailing whitespace. 2013-04-11 23:33:11 +01:00
James Coglan eb6f67a222 Test on Node 0.10. 2013-04-11 23:30:19 +01:00
James Coglan 3325d53d31 Bump Faye module to 0.8.9. 2013-04-11 23:29:59 +01:00
James Coglan 8c694087ce Bump Faye submodule, remove hiredis dependency, test on Node 0.9 on Travis. 2012-12-22 20:50:14 +00:00
James Coglan d9c65ddc61 Don't test on Node 0.9, hiredis does not build there. 2012-09-14 07:53:07 +01:00
James Coglan 3d4fcec51b Fix path to faye library. 2012-09-14 07:48:30 +01:00
James Coglan 4a9b293891 Change the order of operations in destroyClient() so that the ID is not unregistered until its subscriptions and message queue have been deleted. 2012-09-14 07:45:11 +01:00
James Coglan 3cb9589712 Update Node versions in .travis.yml. 2012-09-14 07:36:03 +01:00
James Coglan 6722edb41b Bump Faye submodule to 0.8.3. 2012-09-14 07:35:27 +01:00
10 changed files with 164 additions and 143 deletions
-1
View File
@@ -2,7 +2,6 @@
.gitignore
.gitmodules
.npmignore
.redcar
.travis.yml
dump.rdb
Rakefile
+5 -3
View File
@@ -1,8 +1,10 @@
language: node_js
node_js:
- 0.4
- 0.6
- 0.8
- "0.6"
- "0.8"
- "0.10"
- "0.11"
before_script:
- rake prepare
+5
View File
@@ -1,3 +1,8 @@
=== 0.1.2 / 2013-04-28
* Improve garbage collection to avoid leaking Redis memory
=== 0.1.1 / 2012-07-15
* Fix an implicit global variable leak (missing semicolon)
-66
View File
@@ -1,66 +0,0 @@
# faye-redis [![Build Status](https://secure.travis-ci.org/faye/faye-redis-node.png)](http://travis-ci.org/faye/faye-redis-node)
This plugin provides a Redis-based backend for the [Faye](http://faye.jcoglan.com)
messaging server. It allows a single Faye service to be distributed across many
front-end web servers by storing state and routing messages through a
[Redis](http://redis.io) database server.
## Usage
Pass in the engine and any settings you need when setting up your Faye server.
```js
var faye = require('faye'),
redis = require('faye-redis'),
http = require('http');
var server = http.createServer();
var bayeux = new faye.NodeAdapter({
mount: '/',
timeout: 25,
engine: {
type: redis,
host: 'redis.example.com',
// more options
}
});
bayeux.attach(server);
server.listen(8000);
```
The full list of settings is as follows.
* <b><tt>host</tt></b> - hostname of your Redis instance
* <b><tt>port</tt></b> - port number, default is `6379`
* <b><tt>password</tt></b> - password, if `requirepass` is set
* <b><tt>database</tt></b> - number of database to use, default is `0`
* <b><tt>namespace</tt></b> - prefix applied to all keys, default is `''`
* <b><tt>socket</tt></b> - path to Unix socket if `unixsocket` is set
## License
(The MIT License)
Copyright (c) 2011-2012 James Coglan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the 'Software'), to deal in
the Software without restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+67
View File
@@ -0,0 +1,67 @@
# faye-redis [![Build Status](https://secure.travis-ci.org/faye/faye-redis-node.png)](http://travis-ci.org/faye/faye-redis-node)
This plugin provides a Redis-based backend for the
[Faye](http://faye.jcoglan.com) messaging server. It allows a single Faye
service to be distributed across many front-end web servers by storing state
and routing messages through a [Redis](http://redis.io) database server.
## Usage
Pass in the engine and any settings you need when setting up your Faye server.
```js
var faye = require('faye'),
redis = require('faye-redis'),
http = require('http');
var server = http.createServer();
var bayeux = new faye.NodeAdapter({
mount: '/',
timeout: 25,
engine: {
type: redis,
host: 'redis.example.com',
// more options
}
});
bayeux.attach(server);
server.listen(8000);
```
The full list of settings is as follows.
* <b>`host`</b> - hostname of your Redis instance
* <b>`port`</b> - port number, default is `6379`
* <b>`password`</b> - password, if `requirepass` is set
* <b>`database`</b> - number of database to use, default is `0`
* <b>`namespace`</b> - prefix applied to all keys, default is `''`
* <b>`socket`</b> - path to Unix socket if `unixsocket` is set
## License
(The MIT License)
Copyright (c) 2011-2013 James Coglan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the 'Software'), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+77 -63
View File
@@ -9,9 +9,9 @@ var Engine = function(server, options) {
auth = this._options.password,
gc = this._options.gc || this.DEFAULT_GC,
socket = this._options.socket;
this._ns = this._options.namespace || '';
if (socket) {
this._redis = redis.createClient(socket, {no_ready_check: true});
this._subscriber = redis.createClient(socket, {no_ready_check: true});
@@ -19,20 +19,20 @@ var Engine = function(server, options) {
this._redis = redis.createClient(port, host, {no_ready_check: true});
this._subscriber = redis.createClient(port, host, {no_ready_check: true});
}
if (auth) {
this._redis.auth(auth);
this._subscriber.auth(auth);
}
this._redis.select(db);
this._subscriber.select(db);
var self = this;
this._subscriber.subscribe(this._ns + '/notifications');
this._subscriber.on('message', function(topic, message) {
self.emptyQueue(message);
});
this._gc = setInterval(function() { self.gc() }, gc * 1000);
};
@@ -46,134 +46,148 @@ Engine.prototype = {
DEFAULT_DATABASE: 0,
DEFAULT_GC: 60,
LOCK_TIMEOUT: 120,
disconnect: function() {
this._redis.end();
this._subscriber.unsubscribe();
this._subscriber.end();
clearInterval(this._gc);
},
createClient: function(callback, scope) {
createClient: function(callback, context) {
var clientId = this._server.generateId(), self = this;
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
if (added === 0) return self.createClient(callback, scope);
if (added === 0) return self.createClient(callback, context);
self._server.debug('Created new client ?', clientId);
self.ping(clientId);
self._server.trigger('handshake', clientId);
callback.call(scope, clientId);
callback.call(context, clientId);
});
},
destroyClient: function(clientId, callback, scope) {
clientExists: function(clientId, callback, context) {
var cutoff = new Date().getTime() - (1000 * 1.6 * this._server.timeout);
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
callback.call(context, parseInt(score, 10) > cutoff);
});
},
destroyClient: function(clientId, callback, context) {
var self = this;
this._redis.zrem(this._ns + '/clients', clientId);
this._redis.del(this._ns + '/clients/' + clientId + '/messages');
this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
var n = channels.length, i = 0;
if (i === n) return self.afterDestroy(clientId, callback, scope);
channels.forEach(function(channel) {
self.unsubscribe(clientId, channel, function() {
i += 1;
if (i === n) self.afterDestroy(clientId, callback, scope);
this._redis.zadd(this._ns + '/clients', 0, clientId, function() {
self._redis.smembers(self._ns + '/clients/' + clientId + '/channels', function(error, channels) {
var n = channels.length, i = 0;
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
channels.forEach(function(channel) {
self.unsubscribe(clientId, channel, function() {
i += 1;
if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
});
});
});
});
},
afterDestroy: function(clientId, callback, scope) {
this._server.debug('Destroyed client ?', clientId);
this._server.trigger('disconnect', clientId);
if (callback) callback.call(scope);
},
clientExists: function(clientId, callback, scope) {
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
callback.call(scope, score !== null);
_afterSubscriptionsRemoved: function(clientId, callback, context) {
var self = this;
this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
self._redis.zrem(self._ns + '/clients', clientId, function() {
self._server.debug('Destroyed client ?', clientId);
self._server.trigger('disconnect', clientId);
if (callback) callback.call(context);
});
});
},
ping: function(clientId) {
var timeout = this._server.timeout;
if (typeof timeout !== 'number') return;
var time = new Date().getTime();
this._server.debug('Ping ?, ?', clientId, time);
this._redis.zadd(this._ns + '/clients', time, clientId);
},
subscribe: function(clientId, channel, callback, scope) {
subscribe: function(clientId, channel, callback, context) {
var self = this;
this._redis.sadd(this._ns + '/clients/' + clientId + '/channels', channel, function(error, added) {
if (added === 1) self._server.trigger('subscribe', clientId, channel);
});
this._redis.sadd(this._ns + '/channels' + channel, clientId, function() {
self._server.debug('Subscribed client ? to channel ?', clientId, channel);
if (callback) callback.call(scope);
if (callback) callback.call(context);
});
},
unsubscribe: function(clientId, channel, callback, scope) {
unsubscribe: function(clientId, channel, callback, context) {
var self = this;
this._redis.srem(this._ns + '/clients/' + clientId + '/channels', channel, function(error, removed) {
if (removed === 1) self._server.trigger('unsubscribe', clientId, channel);
});
this._redis.srem(this._ns + '/channels' + channel, clientId, function() {
self._server.debug('Unsubscribed client ? from channel ?', clientId, channel);
if (callback) callback.call(scope);
if (callback) callback.call(context);
});
},
publish: function(message, channels) {
this._server.debug('Publishing message ?', message);
var self = this,
jsonMessage = JSON.stringify(message),
keys = channels.map(function(c) { return self._ns + '/channels' + c });
var notify = function(error, clients) {
clients.forEach(function(clientId) {
var queue = self._ns + '/clients/' + clientId + '/messages';
self._server.debug('Queueing for client ?: ?', clientId, message);
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
self._redis.rpush(queue, jsonMessage);
self._redis.publish(self._ns + '/notifications', clientId);
self.clientExists(clientId, function(exists) {
if (!exists) self._redis.del(queue);
});
});
};
keys.push(notify);
this._redis.sunion.apply(this._redis, keys);
this._server.trigger('publish', message.clientId, message.channel, message.data);
},
emptyQueue: function(clientId) {
if (!this._server.hasConnection(clientId)) return;
var key = this._ns + '/clients/' + clientId + '/messages',
multi = this._redis.multi(),
self = this;
multi.lrange(key, 0, -1, function(error, jsonMessages) {
if (!jsonMessages) return;
var messages = jsonMessages.map(function(json) { return JSON.parse(json) });
self._server.deliver(clientId, messages);
});
multi.del(key);
multi.exec();
},
gc: function() {
var timeout = this._server.timeout;
if (typeof timeout !== 'number') return;
this._withLock('gc', function(releaseLock) {
var cutoff = new Date().getTime() - 1000 * 2 * timeout,
self = this;
this._redis.zrangebyscore(this._ns + '/clients', 0, cutoff, function(error, clients) {
var i = 0, n = clients.length;
if (i === n) return releaseLock();
clients.forEach(function(clientId) {
this.destroyClient(clientId, function() {
i += 1;
@@ -183,29 +197,29 @@ Engine.prototype = {
});
}, this);
},
_withLock: function(lockName, callback, scope) {
_withLock: function(lockName, callback, context) {
var lockKey = this._ns + '/locks/' + lockName,
currentTime = new Date().getTime(),
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
self = this;
var releaseLock = function() {
if (new Date().getTime() < expiry) self._redis.del(lockKey);
};
this._redis.setnx(lockKey, expiry, function(error, set) {
if (set === 1) return callback.call(scope, releaseLock);
if (set === 1) return callback.call(context, releaseLock);
self._redis.get(lockKey, function(error, timeout) {
if (!timeout) return;
var lockTimeout = parseInt(timeout, 10);
if (currentTime < lockTimeout) return;
self._redis.getset(lockKey, expiry, function(error, oldValue) {
if (oldValue !== timeout) return;
callback.call(scope, releaseLock);
callback.call(context, releaseLock);
});
});
});
+2 -2
View File
@@ -4,10 +4,10 @@
, "author" : "James Coglan <jcoglan@gmail.com> (http://jcoglan.com/)"
, "keywords" : ["pubsub", "bayeux"]
, "version" : "0.1.1"
, "version" : "0.1.3"
, "engines" : {"node": ">=0.4.0"}
, "main" : "./faye-redis"
, "dependencies" : {"hiredis": "", "redis": ""}
, "dependencies" : {"redis": ""}
, "devDependencies" : {"jsclass": ""}
, "scripts" : {"test": "node spec/runner.js"}
+6 -6
View File
@@ -5,7 +5,7 @@ JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this)
var pw = process.env.TRAVIS ? undefined : "foobared"
this.engineOpts = {type: RedisEngine, password: pw, namespace: new Date().getTime().toString()}
})
after(function(resume) { with(this) {
sync(function() {
engine.disconnect()
@@ -17,20 +17,20 @@ JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this)
})
})
}})
itShouldBehaveLike("faye engine")
describe("distribution", function() { with(this) {
itShouldBehaveLike("distributed engine")
}})
if (process.env.TRAVIS) return
describe("using a Unix socket", function() { with(this) {
before(function() { with(this) {
this.engineOpts.socket = "/tmp/redis.sock"
}})
itShouldBehaveLike("faye engine")
}})
}})
+1 -1
View File
@@ -1,7 +1,7 @@
require('jsclass')
JS.require('JS.Range', 'JS.Test')
Faye = require('../vendor/faye/build/faye-node')
Faye = require('../vendor/faye/build/node/faye-node')
require('../vendor/faye/spec/javascript/engine_spec')
require('./faye_redis_spec')
Vendored
+1 -1