Compare commits
35 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ea2348682e | |||
| 684e940266 | |||
| 5bd67aedb5 | |||
| 456b76ec71 | |||
| 04a68797f0 | |||
| 49fb05e1f2 | |||
| e76c9e736c | |||
| 9777d1acfe | |||
| 253549d2e7 | |||
| fee61a8d65 | |||
| 24f800c134 | |||
| 8e181f9801 | |||
| 47d793757e | |||
| 8f2aca6d5c | |||
| 75578ffdd2 | |||
| 86ae4baa3d | |||
| e84e20bba4 | |||
| 4fdafd8877 | |||
| 01b84b338c | |||
| fa4cd75dba | |||
| 66354e556d | |||
| 1f37edc011 | |||
| 865f0e13c7 | |||
| 5330ad8da7 | |||
| 5cc6fc1633 | |||
| f3336fed86 | |||
| 4f685b38e0 | |||
| eb6f67a222 | |||
| 3325d53d31 | |||
| 8c694087ce | |||
| d9c65ddc61 | |||
| 3d4fcec51b | |||
| 4a9b293891 | |||
| 3cb9589712 | |||
| 6722edb41b |
+1
-2
@@ -2,10 +2,9 @@
|
||||
.gitignore
|
||||
.gitmodules
|
||||
.npmignore
|
||||
.redcar
|
||||
.travis.yml
|
||||
dump.rdb
|
||||
Rakefile
|
||||
Makefile
|
||||
node_modules
|
||||
spec
|
||||
vendor
|
||||
|
||||
+11
-5
@@ -1,11 +1,17 @@
|
||||
language: node_js
|
||||
|
||||
node_js:
|
||||
- 0.4
|
||||
- 0.6
|
||||
- 0.8
|
||||
- "0.8"
|
||||
- "0.10"
|
||||
- "0.11"
|
||||
|
||||
services:
|
||||
- redis-server
|
||||
|
||||
before_install:
|
||||
- test $TRAVIS_NODE_VERSION = '0.8' && npm install -g npm@1.2.8000 || true
|
||||
|
||||
before_script:
|
||||
- rake prepare
|
||||
- make
|
||||
|
||||
env: TRAVIS=1
|
||||
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
### 0.2.0 / 2013-10-01
|
||||
|
||||
* Trigger the `close` event as required by Faye 1.0
|
||||
|
||||
|
||||
### 0.1.3 / 2013-05-11
|
||||
|
||||
* Fix a bug due to a misuse of `this`
|
||||
|
||||
|
||||
### 0.1.2 / 2013-04-28
|
||||
|
||||
* Improve garbage collection to avoid leaking Redis memory
|
||||
|
||||
|
||||
### 0.1.1 / 2012-07-15
|
||||
|
||||
* Fix an implicit global variable leak (missing semicolon)
|
||||
|
||||
|
||||
### 0.1.0 / 2012-02-26
|
||||
|
||||
* Initial release: Redis backend for Faye 0.8
|
||||
@@ -1,9 +0,0 @@
|
||||
=== 0.1.1 / 2012-07-15
|
||||
|
||||
* Fix an implicit global variable leak (missing semicolon)
|
||||
|
||||
|
||||
=== 0.1.0 / 2012-02-26
|
||||
|
||||
* Initial release: Redis backend for Faye 0.8
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
# Code of Conduct
|
||||
|
||||
All projects under the [Faye](https://github.com/faye) umbrella are covered by
|
||||
the [Code of Conduct](https://github.com/faye/code-of-conduct).
|
||||
@@ -0,0 +1,5 @@
|
||||
prepare:
|
||||
git submodule update --init --recursive
|
||||
cd vendor/faye && npm install
|
||||
cd vendor/faye && ./node_modules/.bin/wake
|
||||
npm install
|
||||
+15
-16
@@ -1,9 +1,9 @@
|
||||
# faye-redis [](http://travis-ci.org/faye/faye-redis-node)
|
||||
# faye-redis [](http://travis-ci.org/faye/faye-redis-node)
|
||||
|
||||
This plugin provides a Redis-based backend for the [Faye](http://faye.jcoglan.com)
|
||||
messaging server. It allows a single Faye service to be distributed across many
|
||||
front-end web servers by storing state and routing messages through a
|
||||
[Redis](http://redis.io) database server.
|
||||
This plugin provides a Redis-based backend for the
|
||||
[Faye](http://faye.jcoglan.com) messaging server. It allows a single Faye
|
||||
service to be distributed across many front-end web servers by storing state and
|
||||
routing messages through a [Redis](http://redis.io) database server.
|
||||
|
||||
|
||||
## Usage
|
||||
@@ -33,25 +33,25 @@ server.listen(8000);
|
||||
|
||||
The full list of settings is as follows.
|
||||
|
||||
* <b><tt>host</tt></b> - hostname of your Redis instance
|
||||
* <b><tt>port</tt></b> - port number, default is `6379`
|
||||
* <b><tt>password</tt></b> - password, if `requirepass` is set
|
||||
* <b><tt>database</tt></b> - number of database to use, default is `0`
|
||||
* <b><tt>namespace</tt></b> - prefix applied to all keys, default is `''`
|
||||
* <b><tt>socket</tt></b> - path to Unix socket if `unixsocket` is set
|
||||
* <b>`host`</b> - hostname of your Redis instance
|
||||
* <b>`port`</b> - port number, default is `6379`
|
||||
* <b>`password`</b> - password, if `requirepass` is set
|
||||
* <b>`database`</b> - number of database to use, default is `0`
|
||||
* <b>`namespace`</b> - prefix applied to all keys, default is `''`
|
||||
* <b>`socket`</b> - path to Unix socket if `unixsocket` is set
|
||||
|
||||
|
||||
## License
|
||||
|
||||
(The MIT License)
|
||||
|
||||
Copyright (c) 2011-2012 James Coglan
|
||||
Copyright (c) 2011-2013 James Coglan
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the 'Software'), to deal in
|
||||
the Software without restriction, including without limitation the rights to use,
|
||||
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
Software, and to permit persons to whom the Software is furnished to do so,
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
@@ -63,4 +63,3 @@ FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
require 'fileutils'
|
||||
|
||||
task :prepare do
|
||||
`git submodule update --init --recursive`
|
||||
`gem install jake`
|
||||
FileUtils.cd 'vendor/faye' do
|
||||
`bundle install`
|
||||
`npm install`
|
||||
`jake`
|
||||
end
|
||||
`npm install`
|
||||
end
|
||||
|
||||
task :default => :prepare
|
||||
|
||||
+87
-66
@@ -9,9 +9,9 @@ var Engine = function(server, options) {
|
||||
auth = this._options.password,
|
||||
gc = this._options.gc || this.DEFAULT_GC,
|
||||
socket = this._options.socket;
|
||||
|
||||
|
||||
this._ns = this._options.namespace || '';
|
||||
|
||||
|
||||
if (socket) {
|
||||
this._redis = redis.createClient(socket, {no_ready_check: true});
|
||||
this._subscriber = redis.createClient(socket, {no_ready_check: true});
|
||||
@@ -19,20 +19,25 @@ var Engine = function(server, options) {
|
||||
this._redis = redis.createClient(port, host, {no_ready_check: true});
|
||||
this._subscriber = redis.createClient(port, host, {no_ready_check: true});
|
||||
}
|
||||
|
||||
|
||||
if (auth) {
|
||||
this._redis.auth(auth);
|
||||
this._subscriber.auth(auth);
|
||||
}
|
||||
this._redis.select(db);
|
||||
this._subscriber.select(db);
|
||||
|
||||
|
||||
this._messageChannel = this._ns + '/notifications/messages';
|
||||
this._closeChannel = this._ns + '/notifications/close';
|
||||
|
||||
var self = this;
|
||||
this._subscriber.subscribe(this._ns + '/notifications');
|
||||
this._subscriber.subscribe(this._messageChannel);
|
||||
this._subscriber.subscribe(this._closeChannel);
|
||||
this._subscriber.on('message', function(topic, message) {
|
||||
self.emptyQueue(message);
|
||||
if (topic === self._messageChannel) self.emptyQueue(message);
|
||||
if (topic === self._closeChannel) self._server.trigger('close', message);
|
||||
});
|
||||
|
||||
|
||||
this._gc = setInterval(function() { self.gc() }, gc * 1000);
|
||||
};
|
||||
|
||||
@@ -46,134 +51,150 @@ Engine.prototype = {
|
||||
DEFAULT_DATABASE: 0,
|
||||
DEFAULT_GC: 60,
|
||||
LOCK_TIMEOUT: 120,
|
||||
|
||||
|
||||
disconnect: function() {
|
||||
this._redis.end();
|
||||
this._subscriber.unsubscribe();
|
||||
this._subscriber.end();
|
||||
clearInterval(this._gc);
|
||||
},
|
||||
|
||||
createClient: function(callback, scope) {
|
||||
|
||||
createClient: function(callback, context) {
|
||||
var clientId = this._server.generateId(), self = this;
|
||||
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
|
||||
if (added === 0) return self.createClient(callback, scope);
|
||||
if (added === 0) return self.createClient(callback, context);
|
||||
self._server.debug('Created new client ?', clientId);
|
||||
self.ping(clientId);
|
||||
self._server.trigger('handshake', clientId);
|
||||
callback.call(scope, clientId);
|
||||
callback.call(context, clientId);
|
||||
});
|
||||
},
|
||||
|
||||
destroyClient: function(clientId, callback, scope) {
|
||||
|
||||
clientExists: function(clientId, callback, context) {
|
||||
var cutoff = new Date().getTime() - (1000 * 1.6 * this._server.timeout);
|
||||
|
||||
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
|
||||
callback.call(context, parseInt(score, 10) > cutoff);
|
||||
});
|
||||
},
|
||||
|
||||
destroyClient: function(clientId, callback, context) {
|
||||
var self = this;
|
||||
this._redis.zrem(this._ns + '/clients', clientId);
|
||||
this._redis.del(this._ns + '/clients/' + clientId + '/messages');
|
||||
|
||||
|
||||
this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
|
||||
var n = channels.length, i = 0;
|
||||
if (i === n) return self.afterDestroy(clientId, callback, scope);
|
||||
|
||||
var multi = self._redis.multi();
|
||||
|
||||
multi.zadd(self._ns + '/clients', 0, clientId);
|
||||
|
||||
channels.forEach(function(channel) {
|
||||
self.unsubscribe(clientId, channel, function() {
|
||||
i += 1;
|
||||
if (i === n) self.afterDestroy(clientId, callback, scope);
|
||||
multi.srem(self._ns + '/clients/' + clientId + '/channels', channel);
|
||||
multi.srem(self._ns + '/channels' + channel, clientId);
|
||||
});
|
||||
multi.del(self._ns + '/clients/' + clientId + '/messages');
|
||||
multi.zrem(self._ns + '/clients', clientId);
|
||||
multi.publish(self._closeChannel, clientId);
|
||||
|
||||
multi.exec(function(error, results) {
|
||||
channels.forEach(function(channel, i) {
|
||||
if (results[2 * i + 1] !== 1) return;
|
||||
self._server.trigger('unsubscribe', clientId, channel);
|
||||
self._server.debug('Unsubscribed client ? from channel ?', clientId, channel);
|
||||
});
|
||||
|
||||
self._server.debug('Destroyed client ?', clientId);
|
||||
self._server.trigger('disconnect', clientId);
|
||||
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
afterDestroy: function(clientId, callback, scope) {
|
||||
this._server.debug('Destroyed client ?', clientId);
|
||||
this._server.trigger('disconnect', clientId);
|
||||
if (callback) callback.call(scope);
|
||||
},
|
||||
|
||||
clientExists: function(clientId, callback, scope) {
|
||||
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
|
||||
callback.call(scope, score !== null);
|
||||
});
|
||||
},
|
||||
|
||||
|
||||
ping: function(clientId) {
|
||||
var timeout = this._server.timeout;
|
||||
if (typeof timeout !== 'number') return;
|
||||
|
||||
|
||||
var time = new Date().getTime();
|
||||
|
||||
|
||||
this._server.debug('Ping ?, ?', clientId, time);
|
||||
this._redis.zadd(this._ns + '/clients', time, clientId);
|
||||
},
|
||||
|
||||
subscribe: function(clientId, channel, callback, scope) {
|
||||
|
||||
subscribe: function(clientId, channel, callback, context) {
|
||||
var self = this;
|
||||
this._redis.sadd(this._ns + '/clients/' + clientId + '/channels', channel, function(error, added) {
|
||||
if (added === 1) self._server.trigger('subscribe', clientId, channel);
|
||||
});
|
||||
this._redis.sadd(this._ns + '/channels' + channel, clientId, function() {
|
||||
self._server.debug('Subscribed client ? to channel ?', clientId, channel);
|
||||
if (callback) callback.call(scope);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
},
|
||||
|
||||
unsubscribe: function(clientId, channel, callback, scope) {
|
||||
|
||||
unsubscribe: function(clientId, channel, callback, context) {
|
||||
var self = this;
|
||||
this._redis.srem(this._ns + '/clients/' + clientId + '/channels', channel, function(error, removed) {
|
||||
if (removed === 1) self._server.trigger('unsubscribe', clientId, channel);
|
||||
});
|
||||
this._redis.srem(this._ns + '/channels' + channel, clientId, function() {
|
||||
self._server.debug('Unsubscribed client ? from channel ?', clientId, channel);
|
||||
if (callback) callback.call(scope);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
},
|
||||
|
||||
|
||||
publish: function(message, channels) {
|
||||
this._server.debug('Publishing message ?', message);
|
||||
|
||||
|
||||
var self = this,
|
||||
jsonMessage = JSON.stringify(message),
|
||||
keys = channels.map(function(c) { return self._ns + '/channels' + c });
|
||||
|
||||
|
||||
var notify = function(error, clients) {
|
||||
clients.forEach(function(clientId) {
|
||||
var queue = self._ns + '/clients/' + clientId + '/messages';
|
||||
|
||||
self._server.debug('Queueing for client ?: ?', clientId, message);
|
||||
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
|
||||
self._redis.publish(self._ns + '/notifications', clientId);
|
||||
self._redis.rpush(queue, jsonMessage);
|
||||
self._redis.publish(self._messageChannel, clientId);
|
||||
|
||||
self.clientExists(clientId, function(exists) {
|
||||
if (!exists) self._redis.del(queue);
|
||||
});
|
||||
});
|
||||
};
|
||||
keys.push(notify);
|
||||
this._redis.sunion.apply(this._redis, keys);
|
||||
|
||||
|
||||
this._server.trigger('publish', message.clientId, message.channel, message.data);
|
||||
},
|
||||
|
||||
|
||||
emptyQueue: function(clientId) {
|
||||
if (!this._server.hasConnection(clientId)) return;
|
||||
|
||||
|
||||
var key = this._ns + '/clients/' + clientId + '/messages',
|
||||
multi = this._redis.multi(),
|
||||
self = this;
|
||||
|
||||
|
||||
multi.lrange(key, 0, -1, function(error, jsonMessages) {
|
||||
if (!jsonMessages) return;
|
||||
var messages = jsonMessages.map(function(json) { return JSON.parse(json) });
|
||||
self._server.deliver(clientId, messages);
|
||||
});
|
||||
multi.del(key);
|
||||
multi.exec();
|
||||
},
|
||||
|
||||
|
||||
gc: function() {
|
||||
var timeout = this._server.timeout;
|
||||
if (typeof timeout !== 'number') return;
|
||||
|
||||
|
||||
this._withLock('gc', function(releaseLock) {
|
||||
var cutoff = new Date().getTime() - 1000 * 2 * timeout,
|
||||
self = this;
|
||||
|
||||
|
||||
this._redis.zrangebyscore(this._ns + '/clients', 0, cutoff, function(error, clients) {
|
||||
var i = 0, n = clients.length;
|
||||
if (i === n) return releaseLock();
|
||||
|
||||
|
||||
clients.forEach(function(clientId) {
|
||||
this.destroyClient(clientId, function() {
|
||||
i += 1;
|
||||
@@ -183,29 +204,29 @@ Engine.prototype = {
|
||||
});
|
||||
}, this);
|
||||
},
|
||||
|
||||
_withLock: function(lockName, callback, scope) {
|
||||
|
||||
_withLock: function(lockName, callback, context) {
|
||||
var lockKey = this._ns + '/locks/' + lockName,
|
||||
currentTime = new Date().getTime(),
|
||||
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
|
||||
self = this;
|
||||
|
||||
|
||||
var releaseLock = function() {
|
||||
if (new Date().getTime() < expiry) self._redis.del(lockKey);
|
||||
};
|
||||
|
||||
|
||||
this._redis.setnx(lockKey, expiry, function(error, set) {
|
||||
if (set === 1) return callback.call(scope, releaseLock);
|
||||
|
||||
if (set === 1) return callback.call(context, releaseLock);
|
||||
|
||||
self._redis.get(lockKey, function(error, timeout) {
|
||||
if (!timeout) return;
|
||||
|
||||
|
||||
var lockTimeout = parseInt(timeout, 10);
|
||||
if (currentTime < lockTimeout) return;
|
||||
|
||||
|
||||
self._redis.getset(lockKey, expiry, function(error, oldValue) {
|
||||
if (oldValue !== timeout) return;
|
||||
callback.call(scope, releaseLock);
|
||||
callback.call(context, releaseLock);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
+7
-13
@@ -3,25 +3,19 @@
|
||||
, "homepage" : "http://github.com/faye/faye-redis-node"
|
||||
, "author" : "James Coglan <jcoglan@gmail.com> (http://jcoglan.com/)"
|
||||
, "keywords" : ["pubsub", "bayeux"]
|
||||
, "license" : "MIT"
|
||||
|
||||
, "version" : "0.1.1"
|
||||
, "version" : "0.2.0"
|
||||
, "engines" : {"node": ">=0.4.0"}
|
||||
, "main" : "./faye-redis"
|
||||
, "dependencies" : {"hiredis": "", "redis": ""}
|
||||
, "devDependencies" : {"jsclass": ""}
|
||||
, "dependencies" : {"redis": ""}
|
||||
, "devDependencies" : {"jstest": ""}
|
||||
|
||||
, "scripts" : {"test": "node spec/runner.js"}
|
||||
|
||||
, "bugs" : "http://github.com/faye/faye-redis-node/issues"
|
||||
|
||||
, "licenses" : [ { "type" : "MIT"
|
||||
, "url" : "http://www.opensource.org/licenses/mit-license.php"
|
||||
}
|
||||
]
|
||||
|
||||
, "repositories" : [ { "type" : "git"
|
||||
, "url" : "git://github.com/faye/faye-redis-node.git"
|
||||
}
|
||||
]
|
||||
, "repository" : { "type" : "git"
|
||||
, "url" : "git://github.com/faye/faye-redis-node.git"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+13
-15
@@ -1,36 +1,34 @@
|
||||
var RedisEngine = require('../faye-redis')
|
||||
|
||||
JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this) {
|
||||
JS.Test.describe("Redis engine", function() { with(this) {
|
||||
before(function() {
|
||||
var pw = process.env.TRAVIS ? undefined : "foobared"
|
||||
this.engineOpts = {type: RedisEngine, password: pw, namespace: new Date().getTime().toString()}
|
||||
})
|
||||
|
||||
|
||||
after(function(resume) { with(this) {
|
||||
sync(function() {
|
||||
engine.disconnect()
|
||||
var redis = require('redis').createClient(6379, 'localhost', {no_ready_check: true})
|
||||
redis.auth(engineOpts.password)
|
||||
redis.flushall(function() {
|
||||
redis.end()
|
||||
resume()
|
||||
})
|
||||
disconnect_engine()
|
||||
var redis = require('redis').createClient(6379, 'localhost', {no_ready_check: true})
|
||||
redis.auth(engineOpts.password)
|
||||
redis.flushall(function() {
|
||||
redis.end()
|
||||
resume()
|
||||
})
|
||||
}})
|
||||
|
||||
|
||||
itShouldBehaveLike("faye engine")
|
||||
|
||||
|
||||
describe("distribution", function() { with(this) {
|
||||
itShouldBehaveLike("distributed engine")
|
||||
}})
|
||||
|
||||
|
||||
if (process.env.TRAVIS) return
|
||||
|
||||
|
||||
describe("using a Unix socket", function() { with(this) {
|
||||
before(function() { with(this) {
|
||||
this.engineOpts.socket = "/tmp/redis.sock"
|
||||
}})
|
||||
|
||||
|
||||
itShouldBehaveLike("faye engine")
|
||||
}})
|
||||
}})
|
||||
|
||||
@@ -39,4 +39,3 @@ list-max-ziplist-value 64
|
||||
set-max-intset-entries 512
|
||||
|
||||
activerehashing yes
|
||||
|
||||
|
||||
+2
-3
@@ -1,7 +1,6 @@
|
||||
require('jsclass')
|
||||
JS.require('JS.Range', 'JS.Test')
|
||||
JS = require('jstest')
|
||||
|
||||
Faye = require('../vendor/faye/build/faye-node')
|
||||
Faye = require('../vendor/faye/build/node/faye-node')
|
||||
require('../vendor/faye/spec/javascript/engine_spec')
|
||||
require('./faye_redis_spec')
|
||||
|
||||
|
||||
Vendored
+1
-1
Submodule vendor/faye updated: 6ebaa51670...aab4127591
Reference in New Issue
Block a user