15 Commits

Author SHA1 Message Date
James Coglan 66354e556d Bump version to 0.1.2. 2013-04-28 12:13:33 +01:00
James Coglan 1f37edc011 Treat the result of ZSCORE as an int. 2013-04-28 12:00:05 +01:00
James Coglan 865f0e13c7 Don't duplicate code to construct the queue key. 2013-04-28 11:55:27 +01:00
James Coglan 5330ad8da7 Declare Node versions properly in .travis.yml. 2013-04-28 11:51:34 +01:00
James Coglan 5cc6fc1633 Apply some of @jpignata's patches to reduce leaked memory. 2013-04-28 11:49:39 +01:00
James Coglan f3336fed86 Chech the output of LRANGE before calling map() on it. 2013-04-20 23:08:47 +01:00
James Coglan 4f685b38e0 Remove trailing whitespace. 2013-04-11 23:33:11 +01:00
James Coglan eb6f67a222 Test on Node 0.10. 2013-04-11 23:30:19 +01:00
James Coglan 3325d53d31 Bump Faye module to 0.8.9. 2013-04-11 23:29:59 +01:00
James Coglan 8c694087ce Bump Faye submodule, remove hiredis dependency, test on Node 0.9 on Travis. 2012-12-22 20:50:14 +00:00
James Coglan d9c65ddc61 Don't test on Node 0.9, hiredis does not build there. 2012-09-14 07:53:07 +01:00
James Coglan 3d4fcec51b Fix path to faye library. 2012-09-14 07:48:30 +01:00
James Coglan 4a9b293891 Change the order of operations in destroyClient() so that the ID is not unregistered until its subscriptions and message queue have been deleted. 2012-09-14 07:45:11 +01:00
James Coglan 3cb9589712 Update Node versions in .travis.yml. 2012-09-14 07:36:03 +01:00
James Coglan 6722edb41b Bump Faye submodule to 0.8.3. 2012-09-14 07:35:27 +01:00
7 changed files with 96 additions and 76 deletions
+4 -3
View File
@@ -1,8 +1,9 @@
language: node_js
node_js:
- 0.4
- 0.6
- 0.8
- "0.6"
- "0.8"
- "0.10"
before_script:
- rake prepare
+5
View File
@@ -1,3 +1,8 @@
=== 0.1.2 / 2013-04-28
* Improve garbage collection to avoid leaking Redis memory
=== 0.1.1 / 2012-07-15
* Fix an implicit global variable leak (missing semicolon)
+77 -63
View File
@@ -9,9 +9,9 @@ var Engine = function(server, options) {
auth = this._options.password,
gc = this._options.gc || this.DEFAULT_GC,
socket = this._options.socket;
this._ns = this._options.namespace || '';
if (socket) {
this._redis = redis.createClient(socket, {no_ready_check: true});
this._subscriber = redis.createClient(socket, {no_ready_check: true});
@@ -19,20 +19,20 @@ var Engine = function(server, options) {
this._redis = redis.createClient(port, host, {no_ready_check: true});
this._subscriber = redis.createClient(port, host, {no_ready_check: true});
}
if (auth) {
this._redis.auth(auth);
this._subscriber.auth(auth);
}
this._redis.select(db);
this._subscriber.select(db);
var self = this;
this._subscriber.subscribe(this._ns + '/notifications');
this._subscriber.on('message', function(topic, message) {
self.emptyQueue(message);
});
this._gc = setInterval(function() { self.gc() }, gc * 1000);
};
@@ -46,134 +46,148 @@ Engine.prototype = {
DEFAULT_DATABASE: 0,
DEFAULT_GC: 60,
LOCK_TIMEOUT: 120,
disconnect: function() {
this._redis.end();
this._subscriber.unsubscribe();
this._subscriber.end();
clearInterval(this._gc);
},
createClient: function(callback, scope) {
createClient: function(callback, context) {
var clientId = this._server.generateId(), self = this;
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
if (added === 0) return self.createClient(callback, scope);
if (added === 0) return self.createClient(callback, context);
self._server.debug('Created new client ?', clientId);
self.ping(clientId);
self._server.trigger('handshake', clientId);
callback.call(scope, clientId);
callback.call(context, clientId);
});
},
destroyClient: function(clientId, callback, scope) {
clientExists: function(clientId, callback, context) {
var cutoff = new Date().getTime() - (1000 * 1.6 * this._server.timeout);
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
callback.call(context, parseInt(score, 10) > cutoff);
});
},
destroyClient: function(clientId, callback, context) {
var self = this;
this._redis.zrem(this._ns + '/clients', clientId);
this._redis.del(this._ns + '/clients/' + clientId + '/messages');
this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
var n = channels.length, i = 0;
if (i === n) return self.afterDestroy(clientId, callback, scope);
channels.forEach(function(channel) {
self.unsubscribe(clientId, channel, function() {
i += 1;
if (i === n) self.afterDestroy(clientId, callback, scope);
this._redis.zadd(this._ns + '/clients', 0, clientId, function() {
self._redis.smembers(self._ns + '/clients/' + clientId + '/channels', function(error, channels) {
var n = channels.length, i = 0;
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
channels.forEach(function(channel) {
self.unsubscribe(clientId, channel, function() {
i += 1;
if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
});
});
});
});
},
afterDestroy: function(clientId, callback, scope) {
this._server.debug('Destroyed client ?', clientId);
this._server.trigger('disconnect', clientId);
if (callback) callback.call(scope);
},
clientExists: function(clientId, callback, scope) {
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
callback.call(scope, score !== null);
_afterSubscriptionsRemoved: function(clientId, callback, context) {
var self = this;
this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
self._redis.zrem(self._ns + '/clients', clientId, function() {
self._server.debug('Destroyed client ?', clientId);
self._server.trigger('disconnect', clientId);
if (callback) callback.call(context);
});
});
},
ping: function(clientId) {
var timeout = this._server.timeout;
if (typeof timeout !== 'number') return;
var time = new Date().getTime();
this._server.debug('Ping ?, ?', clientId, time);
this._redis.zadd(this._ns + '/clients', time, clientId);
},
subscribe: function(clientId, channel, callback, scope) {
subscribe: function(clientId, channel, callback, context) {
var self = this;
this._redis.sadd(this._ns + '/clients/' + clientId + '/channels', channel, function(error, added) {
if (added === 1) self._server.trigger('subscribe', clientId, channel);
});
this._redis.sadd(this._ns + '/channels' + channel, clientId, function() {
self._server.debug('Subscribed client ? to channel ?', clientId, channel);
if (callback) callback.call(scope);
if (callback) callback.call(context);
});
},
unsubscribe: function(clientId, channel, callback, scope) {
unsubscribe: function(clientId, channel, callback, context) {
var self = this;
this._redis.srem(this._ns + '/clients/' + clientId + '/channels', channel, function(error, removed) {
if (removed === 1) self._server.trigger('unsubscribe', clientId, channel);
});
this._redis.srem(this._ns + '/channels' + channel, clientId, function() {
self._server.debug('Unsubscribed client ? from channel ?', clientId, channel);
if (callback) callback.call(scope);
if (callback) callback.call(context);
});
},
publish: function(message, channels) {
this._server.debug('Publishing message ?', message);
var self = this,
jsonMessage = JSON.stringify(message),
keys = channels.map(function(c) { return self._ns + '/channels' + c });
var notify = function(error, clients) {
clients.forEach(function(clientId) {
var queue = self._ns + '/clients/' + clientId + '/messages';
self._server.debug('Queueing for client ?: ?', clientId, message);
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
self._redis.rpush(queue, jsonMessage);
self._redis.publish(self._ns + '/notifications', clientId);
self.clientExists(clientId, function(exists) {
if (!exists) this._redis.del(queue);
});
});
};
keys.push(notify);
this._redis.sunion.apply(this._redis, keys);
this._server.trigger('publish', message.clientId, message.channel, message.data);
},
emptyQueue: function(clientId) {
if (!this._server.hasConnection(clientId)) return;
var key = this._ns + '/clients/' + clientId + '/messages',
multi = this._redis.multi(),
self = this;
multi.lrange(key, 0, -1, function(error, jsonMessages) {
if (!jsonMessages) return;
var messages = jsonMessages.map(function(json) { return JSON.parse(json) });
self._server.deliver(clientId, messages);
});
multi.del(key);
multi.exec();
},
gc: function() {
var timeout = this._server.timeout;
if (typeof timeout !== 'number') return;
this._withLock('gc', function(releaseLock) {
var cutoff = new Date().getTime() - 1000 * 2 * timeout,
self = this;
this._redis.zrangebyscore(this._ns + '/clients', 0, cutoff, function(error, clients) {
var i = 0, n = clients.length;
if (i === n) return releaseLock();
clients.forEach(function(clientId) {
this.destroyClient(clientId, function() {
i += 1;
@@ -183,29 +197,29 @@ Engine.prototype = {
});
}, this);
},
_withLock: function(lockName, callback, scope) {
_withLock: function(lockName, callback, context) {
var lockKey = this._ns + '/locks/' + lockName,
currentTime = new Date().getTime(),
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
self = this;
var releaseLock = function() {
if (new Date().getTime() < expiry) self._redis.del(lockKey);
};
this._redis.setnx(lockKey, expiry, function(error, set) {
if (set === 1) return callback.call(scope, releaseLock);
if (set === 1) return callback.call(context, releaseLock);
self._redis.get(lockKey, function(error, timeout) {
if (!timeout) return;
var lockTimeout = parseInt(timeout, 10);
if (currentTime < lockTimeout) return;
self._redis.getset(lockKey, expiry, function(error, oldValue) {
if (oldValue !== timeout) return;
callback.call(scope, releaseLock);
callback.call(context, releaseLock);
});
});
});
+2 -2
View File
@@ -4,10 +4,10 @@
, "author" : "James Coglan <jcoglan@gmail.com> (http://jcoglan.com/)"
, "keywords" : ["pubsub", "bayeux"]
, "version" : "0.1.1"
, "version" : "0.1.2"
, "engines" : {"node": ">=0.4.0"}
, "main" : "./faye-redis"
, "dependencies" : {"hiredis": "", "redis": ""}
, "dependencies" : {"redis": ""}
, "devDependencies" : {"jsclass": ""}
, "scripts" : {"test": "node spec/runner.js"}
+6 -6
View File
@@ -5,7 +5,7 @@ JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this)
var pw = process.env.TRAVIS ? undefined : "foobared"
this.engineOpts = {type: RedisEngine, password: pw, namespace: new Date().getTime().toString()}
})
after(function(resume) { with(this) {
sync(function() {
engine.disconnect()
@@ -17,20 +17,20 @@ JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this)
})
})
}})
itShouldBehaveLike("faye engine")
describe("distribution", function() { with(this) {
itShouldBehaveLike("distributed engine")
}})
if (process.env.TRAVIS) return
describe("using a Unix socket", function() { with(this) {
before(function() { with(this) {
this.engineOpts.socket = "/tmp/redis.sock"
}})
itShouldBehaveLike("faye engine")
}})
}})
+1 -1
View File
@@ -1,7 +1,7 @@
require('jsclass')
JS.require('JS.Range', 'JS.Test')
Faye = require('../vendor/faye/build/faye-node')
Faye = require('../vendor/faye/build/node/faye-node')
require('../vendor/faye/spec/javascript/engine_spec')
require('./faye_redis_spec')
Vendored
+1 -1