Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 66354e556d | |||
| 1f37edc011 | |||
| 865f0e13c7 | |||
| 5330ad8da7 | |||
| 5cc6fc1633 | |||
| f3336fed86 | |||
| 4f685b38e0 | |||
| eb6f67a222 | |||
| 3325d53d31 | |||
| 8c694087ce | |||
| d9c65ddc61 | |||
| 3d4fcec51b | |||
| 4a9b293891 | |||
| 3cb9589712 | |||
| 6722edb41b |
+4
-3
@@ -1,8 +1,9 @@
|
||||
language: node_js
|
||||
|
||||
node_js:
|
||||
- 0.4
|
||||
- 0.6
|
||||
- 0.8
|
||||
- "0.6"
|
||||
- "0.8"
|
||||
- "0.10"
|
||||
|
||||
before_script:
|
||||
- rake prepare
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
=== 0.1.2 / 2013-04-28
|
||||
|
||||
* Improve garbage collection to avoid leaking Redis memory
|
||||
|
||||
|
||||
=== 0.1.1 / 2012-07-15
|
||||
|
||||
* Fix an implicit global variable leak (missing semicolon)
|
||||
|
||||
+77
-63
@@ -9,9 +9,9 @@ var Engine = function(server, options) {
|
||||
auth = this._options.password,
|
||||
gc = this._options.gc || this.DEFAULT_GC,
|
||||
socket = this._options.socket;
|
||||
|
||||
|
||||
this._ns = this._options.namespace || '';
|
||||
|
||||
|
||||
if (socket) {
|
||||
this._redis = redis.createClient(socket, {no_ready_check: true});
|
||||
this._subscriber = redis.createClient(socket, {no_ready_check: true});
|
||||
@@ -19,20 +19,20 @@ var Engine = function(server, options) {
|
||||
this._redis = redis.createClient(port, host, {no_ready_check: true});
|
||||
this._subscriber = redis.createClient(port, host, {no_ready_check: true});
|
||||
}
|
||||
|
||||
|
||||
if (auth) {
|
||||
this._redis.auth(auth);
|
||||
this._subscriber.auth(auth);
|
||||
}
|
||||
this._redis.select(db);
|
||||
this._subscriber.select(db);
|
||||
|
||||
|
||||
var self = this;
|
||||
this._subscriber.subscribe(this._ns + '/notifications');
|
||||
this._subscriber.on('message', function(topic, message) {
|
||||
self.emptyQueue(message);
|
||||
});
|
||||
|
||||
|
||||
this._gc = setInterval(function() { self.gc() }, gc * 1000);
|
||||
};
|
||||
|
||||
@@ -46,134 +46,148 @@ Engine.prototype = {
|
||||
DEFAULT_DATABASE: 0,
|
||||
DEFAULT_GC: 60,
|
||||
LOCK_TIMEOUT: 120,
|
||||
|
||||
|
||||
disconnect: function() {
|
||||
this._redis.end();
|
||||
this._subscriber.unsubscribe();
|
||||
this._subscriber.end();
|
||||
clearInterval(this._gc);
|
||||
},
|
||||
|
||||
createClient: function(callback, scope) {
|
||||
|
||||
createClient: function(callback, context) {
|
||||
var clientId = this._server.generateId(), self = this;
|
||||
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
|
||||
if (added === 0) return self.createClient(callback, scope);
|
||||
if (added === 0) return self.createClient(callback, context);
|
||||
self._server.debug('Created new client ?', clientId);
|
||||
self.ping(clientId);
|
||||
self._server.trigger('handshake', clientId);
|
||||
callback.call(scope, clientId);
|
||||
callback.call(context, clientId);
|
||||
});
|
||||
},
|
||||
|
||||
destroyClient: function(clientId, callback, scope) {
|
||||
|
||||
clientExists: function(clientId, callback, context) {
|
||||
var cutoff = new Date().getTime() - (1000 * 1.6 * this._server.timeout);
|
||||
|
||||
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
|
||||
callback.call(context, parseInt(score, 10) > cutoff);
|
||||
});
|
||||
},
|
||||
|
||||
destroyClient: function(clientId, callback, context) {
|
||||
var self = this;
|
||||
this._redis.zrem(this._ns + '/clients', clientId);
|
||||
this._redis.del(this._ns + '/clients/' + clientId + '/messages');
|
||||
|
||||
this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
|
||||
var n = channels.length, i = 0;
|
||||
if (i === n) return self.afterDestroy(clientId, callback, scope);
|
||||
|
||||
channels.forEach(function(channel) {
|
||||
self.unsubscribe(clientId, channel, function() {
|
||||
i += 1;
|
||||
if (i === n) self.afterDestroy(clientId, callback, scope);
|
||||
|
||||
this._redis.zadd(this._ns + '/clients', 0, clientId, function() {
|
||||
self._redis.smembers(self._ns + '/clients/' + clientId + '/channels', function(error, channels) {
|
||||
var n = channels.length, i = 0;
|
||||
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
|
||||
|
||||
channels.forEach(function(channel) {
|
||||
self.unsubscribe(clientId, channel, function() {
|
||||
i += 1;
|
||||
if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
afterDestroy: function(clientId, callback, scope) {
|
||||
this._server.debug('Destroyed client ?', clientId);
|
||||
this._server.trigger('disconnect', clientId);
|
||||
if (callback) callback.call(scope);
|
||||
},
|
||||
|
||||
clientExists: function(clientId, callback, scope) {
|
||||
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
|
||||
callback.call(scope, score !== null);
|
||||
|
||||
_afterSubscriptionsRemoved: function(clientId, callback, context) {
|
||||
var self = this;
|
||||
this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
|
||||
self._redis.zrem(self._ns + '/clients', clientId, function() {
|
||||
self._server.debug('Destroyed client ?', clientId);
|
||||
self._server.trigger('disconnect', clientId);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
|
||||
ping: function(clientId) {
|
||||
var timeout = this._server.timeout;
|
||||
if (typeof timeout !== 'number') return;
|
||||
|
||||
|
||||
var time = new Date().getTime();
|
||||
|
||||
|
||||
this._server.debug('Ping ?, ?', clientId, time);
|
||||
this._redis.zadd(this._ns + '/clients', time, clientId);
|
||||
},
|
||||
|
||||
subscribe: function(clientId, channel, callback, scope) {
|
||||
|
||||
subscribe: function(clientId, channel, callback, context) {
|
||||
var self = this;
|
||||
this._redis.sadd(this._ns + '/clients/' + clientId + '/channels', channel, function(error, added) {
|
||||
if (added === 1) self._server.trigger('subscribe', clientId, channel);
|
||||
});
|
||||
this._redis.sadd(this._ns + '/channels' + channel, clientId, function() {
|
||||
self._server.debug('Subscribed client ? to channel ?', clientId, channel);
|
||||
if (callback) callback.call(scope);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
},
|
||||
|
||||
unsubscribe: function(clientId, channel, callback, scope) {
|
||||
|
||||
unsubscribe: function(clientId, channel, callback, context) {
|
||||
var self = this;
|
||||
this._redis.srem(this._ns + '/clients/' + clientId + '/channels', channel, function(error, removed) {
|
||||
if (removed === 1) self._server.trigger('unsubscribe', clientId, channel);
|
||||
});
|
||||
this._redis.srem(this._ns + '/channels' + channel, clientId, function() {
|
||||
self._server.debug('Unsubscribed client ? from channel ?', clientId, channel);
|
||||
if (callback) callback.call(scope);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
},
|
||||
|
||||
|
||||
publish: function(message, channels) {
|
||||
this._server.debug('Publishing message ?', message);
|
||||
|
||||
|
||||
var self = this,
|
||||
jsonMessage = JSON.stringify(message),
|
||||
keys = channels.map(function(c) { return self._ns + '/channels' + c });
|
||||
|
||||
|
||||
var notify = function(error, clients) {
|
||||
clients.forEach(function(clientId) {
|
||||
var queue = self._ns + '/clients/' + clientId + '/messages';
|
||||
|
||||
self._server.debug('Queueing for client ?: ?', clientId, message);
|
||||
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
|
||||
self._redis.rpush(queue, jsonMessage);
|
||||
self._redis.publish(self._ns + '/notifications', clientId);
|
||||
|
||||
self.clientExists(clientId, function(exists) {
|
||||
if (!exists) this._redis.del(queue);
|
||||
});
|
||||
});
|
||||
};
|
||||
keys.push(notify);
|
||||
this._redis.sunion.apply(this._redis, keys);
|
||||
|
||||
|
||||
this._server.trigger('publish', message.clientId, message.channel, message.data);
|
||||
},
|
||||
|
||||
|
||||
emptyQueue: function(clientId) {
|
||||
if (!this._server.hasConnection(clientId)) return;
|
||||
|
||||
|
||||
var key = this._ns + '/clients/' + clientId + '/messages',
|
||||
multi = this._redis.multi(),
|
||||
self = this;
|
||||
|
||||
|
||||
multi.lrange(key, 0, -1, function(error, jsonMessages) {
|
||||
if (!jsonMessages) return;
|
||||
var messages = jsonMessages.map(function(json) { return JSON.parse(json) });
|
||||
self._server.deliver(clientId, messages);
|
||||
});
|
||||
multi.del(key);
|
||||
multi.exec();
|
||||
},
|
||||
|
||||
|
||||
gc: function() {
|
||||
var timeout = this._server.timeout;
|
||||
if (typeof timeout !== 'number') return;
|
||||
|
||||
|
||||
this._withLock('gc', function(releaseLock) {
|
||||
var cutoff = new Date().getTime() - 1000 * 2 * timeout,
|
||||
self = this;
|
||||
|
||||
|
||||
this._redis.zrangebyscore(this._ns + '/clients', 0, cutoff, function(error, clients) {
|
||||
var i = 0, n = clients.length;
|
||||
if (i === n) return releaseLock();
|
||||
|
||||
|
||||
clients.forEach(function(clientId) {
|
||||
this.destroyClient(clientId, function() {
|
||||
i += 1;
|
||||
@@ -183,29 +197,29 @@ Engine.prototype = {
|
||||
});
|
||||
}, this);
|
||||
},
|
||||
|
||||
_withLock: function(lockName, callback, scope) {
|
||||
|
||||
_withLock: function(lockName, callback, context) {
|
||||
var lockKey = this._ns + '/locks/' + lockName,
|
||||
currentTime = new Date().getTime(),
|
||||
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
|
||||
self = this;
|
||||
|
||||
|
||||
var releaseLock = function() {
|
||||
if (new Date().getTime() < expiry) self._redis.del(lockKey);
|
||||
};
|
||||
|
||||
|
||||
this._redis.setnx(lockKey, expiry, function(error, set) {
|
||||
if (set === 1) return callback.call(scope, releaseLock);
|
||||
|
||||
if (set === 1) return callback.call(context, releaseLock);
|
||||
|
||||
self._redis.get(lockKey, function(error, timeout) {
|
||||
if (!timeout) return;
|
||||
|
||||
|
||||
var lockTimeout = parseInt(timeout, 10);
|
||||
if (currentTime < lockTimeout) return;
|
||||
|
||||
|
||||
self._redis.getset(lockKey, expiry, function(error, oldValue) {
|
||||
if (oldValue !== timeout) return;
|
||||
callback.call(scope, releaseLock);
|
||||
callback.call(context, releaseLock);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
+2
-2
@@ -4,10 +4,10 @@
|
||||
, "author" : "James Coglan <jcoglan@gmail.com> (http://jcoglan.com/)"
|
||||
, "keywords" : ["pubsub", "bayeux"]
|
||||
|
||||
, "version" : "0.1.1"
|
||||
, "version" : "0.1.2"
|
||||
, "engines" : {"node": ">=0.4.0"}
|
||||
, "main" : "./faye-redis"
|
||||
, "dependencies" : {"hiredis": "", "redis": ""}
|
||||
, "dependencies" : {"redis": ""}
|
||||
, "devDependencies" : {"jsclass": ""}
|
||||
|
||||
, "scripts" : {"test": "node spec/runner.js"}
|
||||
|
||||
@@ -5,7 +5,7 @@ JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this)
|
||||
var pw = process.env.TRAVIS ? undefined : "foobared"
|
||||
this.engineOpts = {type: RedisEngine, password: pw, namespace: new Date().getTime().toString()}
|
||||
})
|
||||
|
||||
|
||||
after(function(resume) { with(this) {
|
||||
sync(function() {
|
||||
engine.disconnect()
|
||||
@@ -17,20 +17,20 @@ JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this)
|
||||
})
|
||||
})
|
||||
}})
|
||||
|
||||
|
||||
itShouldBehaveLike("faye engine")
|
||||
|
||||
|
||||
describe("distribution", function() { with(this) {
|
||||
itShouldBehaveLike("distributed engine")
|
||||
}})
|
||||
|
||||
|
||||
if (process.env.TRAVIS) return
|
||||
|
||||
|
||||
describe("using a Unix socket", function() { with(this) {
|
||||
before(function() { with(this) {
|
||||
this.engineOpts.socket = "/tmp/redis.sock"
|
||||
}})
|
||||
|
||||
|
||||
itShouldBehaveLike("faye engine")
|
||||
}})
|
||||
}})
|
||||
|
||||
+1
-1
@@ -1,7 +1,7 @@
|
||||
require('jsclass')
|
||||
JS.require('JS.Range', 'JS.Test')
|
||||
|
||||
Faye = require('../vendor/faye/build/faye-node')
|
||||
Faye = require('../vendor/faye/build/node/faye-node')
|
||||
require('../vendor/faye/spec/javascript/engine_spec')
|
||||
require('./faye_redis_spec')
|
||||
|
||||
|
||||
Vendored
+1
-1
Submodule vendor/faye updated: 6ebaa51670...c7fac489b1
Reference in New Issue
Block a user