30 Commits

Author SHA1 Message Date
James Coglan 24f800c134 Ignore Makefile. 2013-10-01 17:07:04 +01:00
James Coglan 8e181f9801 Trigger the 'close' event as required by Faye 1.0. 2013-10-01 16:08:22 +01:00
James Coglan 47d793757e Replace Rake with Make. 2013-10-01 16:00:58 +01:00
James Coglan 8f2aca6d5c Bring tests up to date with Faye 1.0. 2013-10-01 15:59:55 +01:00
James Coglan 75578ffdd2 Update the changelog. 2013-05-11 18:19:10 +01:00
James Coglan 86ae4baa3d Prepare for release 0.1.3. 2013-05-11 15:13:55 +01:00
James Coglan e84e20bba4 Test on Node 0.11. 2013-05-11 14:59:52 +01:00
James Coglan 4fdafd8877 Merge pull request #8 from bitium/self_vs_this
self vs this fix for inside a callback
2013-05-11 06:55:54 -07:00
Andrew Arrow 01b84b338c self vs this fix for inside a callback 2013-05-06 12:51:50 -07:00
James Coglan fa4cd75dba Bump copyright date. 2013-04-28 12:27:30 +01:00
James Coglan 66354e556d Bump version to 0.1.2. 2013-04-28 12:13:33 +01:00
James Coglan 1f37edc011 Treat the result of ZSCORE as an int. 2013-04-28 12:00:05 +01:00
James Coglan 865f0e13c7 Don't duplicate code to construct the queue key. 2013-04-28 11:55:27 +01:00
James Coglan 5330ad8da7 Declare Node versions properly in .travis.yml. 2013-04-28 11:51:34 +01:00
James Coglan 5cc6fc1633 Apply some of @jpignata's patches to reduce leaked memory. 2013-04-28 11:49:39 +01:00
James Coglan f3336fed86 Chech the output of LRANGE before calling map() on it. 2013-04-20 23:08:47 +01:00
James Coglan 4f685b38e0 Remove trailing whitespace. 2013-04-11 23:33:11 +01:00
James Coglan eb6f67a222 Test on Node 0.10. 2013-04-11 23:30:19 +01:00
James Coglan 3325d53d31 Bump Faye module to 0.8.9. 2013-04-11 23:29:59 +01:00
James Coglan 8c694087ce Bump Faye submodule, remove hiredis dependency, test on Node 0.9 on Travis. 2012-12-22 20:50:14 +00:00
James Coglan d9c65ddc61 Don't test on Node 0.9, hiredis does not build there. 2012-09-14 07:53:07 +01:00
James Coglan 3d4fcec51b Fix path to faye library. 2012-09-14 07:48:30 +01:00
James Coglan 4a9b293891 Change the order of operations in destroyClient() so that the ID is not unregistered until its subscriptions and message queue have been deleted. 2012-09-14 07:45:11 +01:00
James Coglan 3cb9589712 Update Node versions in .travis.yml. 2012-09-14 07:36:03 +01:00
James Coglan 6722edb41b Bump Faye submodule to 0.8.3. 2012-09-14 07:35:27 +01:00
James Coglan 10a7d59898 Test on Node 0.8 with Travis. 2012-07-15 13:14:05 +01:00
James Coglan 002e55ae6c Bump version to 0.1.1. 2012-07-15 13:13:21 +01:00
James Coglan 1489e32d1f Merge pull request #1 from pselden4/master
Fix implicit variable creation.
2012-05-11 07:53:55 -07:00
Paul Selden 0b70daed71 Fix implicit variable creation. 2012-05-11 10:47:23 -04:00
James Coglan efaed17004 Remove branch identifier from Travis image. 2012-03-28 00:55:38 +01:00
11 changed files with 163 additions and 134 deletions
+1 -2
View File
@@ -2,10 +2,9 @@
.gitignore
.gitmodules
.npmignore
.redcar
.travis.yml
dump.rdb
Rakefile
Makefile
node_modules
spec
vendor
+6 -4
View File
@@ -1,11 +1,13 @@
language: node_js
node_js:
- 0.4
- 0.6
- 0.7
- "0.6"
- "0.8"
- "0.10"
- "0.11"
before_script:
- rake prepare
- make
env: TRAVIS=1
+24
View File
@@ -0,0 +1,24 @@
### 0.2.0 / 2013-10-01
* Trigger the `close` event as required by Faye 1.0
### 0.1.3 / 2013-05-11
* Fix a bug due to a misuse of `this`
### 0.1.2 / 2013-04-28
* Improve garbage collection to avoid leaking Redis memory
### 0.1.1 / 2012-07-15
* Fix an implicit global variable leak (missing semicolon)
### 0.1.0 / 2012-02-26
* Initial release: Redis backend for Faye 0.8
+6
View File
@@ -0,0 +1,6 @@
prepare:
git submodule update --init --recursive
cd vendor/faye && npm install
cd vendor/faye && ./node_modules/.bin/wake
npm install
+15 -15
View File
@@ -1,9 +1,9 @@
# faye-redis [![Build Status](https://secure.travis-ci.org/faye/faye-redis-node.png?branch=master)](http://travis-ci.org/faye/faye-redis-node)
# faye-redis [![Build Status](https://secure.travis-ci.org/faye/faye-redis-node.png)](http://travis-ci.org/faye/faye-redis-node)
This plugin provides a Redis-based backend for the [Faye](http://faye.jcoglan.com)
messaging server. It allows a single Faye service to be distributed across many
front-end web servers by storing state and routing messages through a
[Redis](http://redis.io) database server.
This plugin provides a Redis-based backend for the
[Faye](http://faye.jcoglan.com) messaging server. It allows a single Faye
service to be distributed across many front-end web servers by storing state and
routing messages through a [Redis](http://redis.io) database server.
## Usage
@@ -33,25 +33,25 @@ server.listen(8000);
The full list of settings is as follows.
* <b><tt>host</tt></b> - hostname of your Redis instance
* <b><tt>port</tt></b> - port number, default is `6379`
* <b><tt>password</tt></b> - password, if `requirepass` is set
* <b><tt>database</tt></b> - number of database to use, default is `0`
* <b><tt>namespace</tt></b> - prefix applied to all keys, default is `''`
* <b><tt>socket</tt></b> - path to Unix socket if `unixsocket` is set
* <b>`host`</b> - hostname of your Redis instance
* <b>`port`</b> - port number, default is `6379`
* <b>`password`</b> - password, if `requirepass` is set
* <b>`database`</b> - number of database to use, default is `0`
* <b>`namespace`</b> - prefix applied to all keys, default is `''`
* <b>`socket`</b> - path to Unix socket if `unixsocket` is set
## License
(The MIT License)
Copyright (c) 2011-2012 James Coglan
Copyright (c) 2011-2013 James Coglan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the 'Software'), to deal in
the Software without restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
Software, and to permit persons to whom the Software is furnished to do so,
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
-15
View File
@@ -1,15 +0,0 @@
require 'fileutils'
task :prepare do
`git submodule update --init --recursive`
`gem install jake`
FileUtils.cd 'vendor/faye' do
`bundle install`
`npm install`
`jake`
end
`npm install`
end
task :default => :prepare
+87 -67
View File
@@ -9,9 +9,9 @@ var Engine = function(server, options) {
auth = this._options.password,
gc = this._options.gc || this.DEFAULT_GC,
socket = this._options.socket;
this._ns = this._options.namespace || '';
if (socket) {
this._redis = redis.createClient(socket, {no_ready_check: true});
this._subscriber = redis.createClient(socket, {no_ready_check: true});
@@ -19,20 +19,25 @@ var Engine = function(server, options) {
this._redis = redis.createClient(port, host, {no_ready_check: true});
this._subscriber = redis.createClient(port, host, {no_ready_check: true});
}
if (auth) {
this._redis.auth(auth);
this._subscriber.auth(auth);
}
this._redis.select(db);
this._subscriber.select(db);
this._messageChannel = this._ns + '/notifications/messages';
this._closeChannel = this._ns + '/notifications/close';
var self = this;
this._subscriber.subscribe(this._ns + '/notifications');
this._subscriber.subscribe(this._messageChannel);
this._subscriber.subscribe(this._closeChannel);
this._subscriber.on('message', function(topic, message) {
self.emptyQueue(message);
if (topic === self._messageChannel) self.emptyQueue(message);
if (topic === self._closeChannel) self._server.trigger('close', message);
});
this._gc = setInterval(function() { self.gc() }, gc * 1000);
};
@@ -46,134 +51,149 @@ Engine.prototype = {
DEFAULT_DATABASE: 0,
DEFAULT_GC: 60,
LOCK_TIMEOUT: 120,
disconnect: function() {
this._redis.end();
this._subscriber.unsubscribe();
this._subscriber.end();
clearInterval(this._gc);
},
createClient: function(callback, scope) {
createClient: function(callback, context) {
var clientId = this._server.generateId(), self = this;
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
if (added === 0) return self.createClient(callback, scope);
if (added === 0) return self.createClient(callback, context);
self._server.debug('Created new client ?', clientId);
self.ping(clientId);
self._server.trigger('handshake', clientId);
callback.call(scope, clientId);
callback.call(context, clientId);
});
},
destroyClient: function(clientId, callback, scope) {
clientExists: function(clientId, callback, context) {
var cutoff = new Date().getTime() - (1000 * 1.6 * this._server.timeout);
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
callback.call(context, parseInt(score, 10) > cutoff);
});
},
destroyClient: function(clientId, callback, context) {
var self = this;
this._redis.zrem(this._ns + '/clients', clientId);
this._redis.del(this._ns + '/clients/' + clientId + '/messages');
this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
var n = channels.length, i = 0;
if (i === n) return self.afterDestroy(clientId, callback, scope);
channels.forEach(function(channel) {
self.unsubscribe(clientId, channel, function() {
i += 1;
if (i === n) self.afterDestroy(clientId, callback, scope);
this._redis.zadd(this._ns + '/clients', 0, clientId, function() {
self._redis.smembers(self._ns + '/clients/' + clientId + '/channels', function(error, channels) {
var n = channels.length, i = 0;
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
channels.forEach(function(channel) {
self.unsubscribe(clientId, channel, function() {
i += 1;
if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
});
});
});
});
},
afterDestroy: function(clientId, callback, scope) {
this._server.debug('Destroyed client ?', clientId);
this._server.trigger('disconnect', clientId);
if (callback) callback.call(scope);
},
clientExists: function(clientId, callback, scope) {
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
callback.call(scope, score !== null);
_afterSubscriptionsRemoved: function(clientId, callback, context) {
var self = this;
this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
self._redis.zrem(self._ns + '/clients', clientId, function() {
self._server.debug('Destroyed client ?', clientId);
self._server.trigger('disconnect', clientId);
self._redis.publish(self._closeChannel, clientId);
if (callback) callback.call(context);
});
});
},
ping: function(clientId) {
var timeout = this._server.timeout;
if (typeof timeout !== 'number') return;
var time = new Date().getTime();
this._server.debug('Ping ?, ?', clientId, time);
this._redis.zadd(this._ns + '/clients', time, clientId);
},
subscribe: function(clientId, channel, callback, scope) {
subscribe: function(clientId, channel, callback, context) {
var self = this;
this._redis.sadd(this._ns + '/clients/' + clientId + '/channels', channel, function(error, added) {
if (added === 1) self._server.trigger('subscribe', clientId, channel);
});
this._redis.sadd(this._ns + '/channels' + channel, clientId, function() {
self._server.debug('Subscribed client ? to channel ?', clientId, channel);
if (callback) callback.call(scope);
if (callback) callback.call(context);
});
},
unsubscribe: function(clientId, channel, callback, scope) {
unsubscribe: function(clientId, channel, callback, context) {
var self = this;
this._redis.srem(this._ns + '/clients/' + clientId + '/channels', channel, function(error, removed) {
if (removed === 1) self._server.trigger('unsubscribe', clientId, channel);
});
this._redis.srem(this._ns + '/channels' + channel, clientId, function() {
self._server.debug('Unsubscribed client ? from channel ?', clientId, channel);
if (callback) callback.call(scope);
if (callback) callback.call(context);
});
},
publish: function(message, channels) {
this._server.debug('Publishing message ?', message);
var self = this,
jsonMessage = JSON.stringify(message),
keys = channels.map(function(c) { return self._ns + '/channels' + c });
var notify = function(error, clients) {
clients.forEach(function(clientId) {
var queue = self._ns + '/clients/' + clientId + '/messages';
self._server.debug('Queueing for client ?: ?', clientId, message);
self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
self._redis.publish(self._ns + '/notifications', clientId);
self._redis.rpush(queue, jsonMessage);
self._redis.publish(self._messageChannel, clientId);
self.clientExists(clientId, function(exists) {
if (!exists) self._redis.del(queue);
});
});
};
keys.push(notify);
this._redis.sunion.apply(this._redis, keys);
this._server.trigger('publish', message.clientId, message.channel, message.data);
},
emptyQueue: function(clientId) {
if (!this._server.hasConnection(clientId)) return;
var key = this._ns + '/clients/' + clientId + '/messages',
multi = this._redis.multi(),
self = this;
multi.lrange(key, 0, -1, function(error, jsonMessages) {
if (!jsonMessages) return;
var messages = jsonMessages.map(function(json) { return JSON.parse(json) });
self._server.deliver(clientId, messages);
});
multi.del(key);
multi.exec();
},
gc: function() {
var timeout = this._server.timeout;
if (typeof timeout !== 'number') return;
this._withLock('gc', function(releaseLock) {
var cutoff = new Date().getTime() - 1000 * 2 * timeout,
self = this;
this._redis.zrangebyscore(this._ns + '/clients', 0, cutoff, function(error, clients) {
var i = 0, n = clients.length;
if (i === n) return releaseLock();
clients.forEach(function(clientId) {
this.destroyClient(clientId, function() {
i += 1;
@@ -183,29 +203,29 @@ Engine.prototype = {
});
}, this);
},
_withLock: function(lockName, callback, scope) {
_withLock: function(lockName, callback, context) {
var lockKey = this._ns + '/locks/' + lockName,
currentTime = new Date().getTime(),
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1;
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
self = this;
var releaseLock = function() {
if (new Date().getTime() < expiry) self._redis.del(lockKey);
};
this._redis.setnx(lockKey, expiry, function(error, set) {
if (set === 1) return callback.call(scope, releaseLock);
if (set === 1) return callback.call(context, releaseLock);
self._redis.get(lockKey, function(error, timeout) {
if (!timeout) return;
var lockTimeout = parseInt(timeout, 10);
if (currentTime < lockTimeout) return;
self._redis.getset(lockKey, expiry, function(error, oldValue) {
if (oldValue !== timeout) return;
callback.call(scope, releaseLock);
callback.call(context, releaseLock);
});
});
});
+7 -12
View File
@@ -3,25 +3,20 @@
, "homepage" : "http://github.com/faye/faye-redis-node"
, "author" : "James Coglan <jcoglan@gmail.com> (http://jcoglan.com/)"
, "keywords" : ["pubsub", "bayeux"]
, "license" : "MIT"
, "version" : "0.1.0"
, "version" : "0.2.0"
, "engines" : {"node": ">=0.4.0"}
, "main" : "./faye-redis"
, "dependencies" : {"hiredis": "", "redis": ""}
, "devDependencies" : {"jsclass": ""}
, "dependencies" : {"redis": ""}
, "devDependencies" : {"jstest": ""}
, "scripts" : {"test": "node spec/runner.js"}
, "bugs" : "http://github.com/faye/faye-redis-node/issues"
, "licenses" : [ { "type" : "MIT"
, "url" : "http://www.opensource.org/licenses/mit-license.php"
}
]
, "repositories" : [ { "type" : "git"
, "url" : "git://github.com/faye/faye-redis-node.git"
}
]
, "repository" : { "type" : "git"
, "url" : "git://github.com/faye/faye-redis-node.git"
}
}
+13 -15
View File
@@ -1,36 +1,34 @@
var RedisEngine = require('../faye-redis')
JS.ENV.FayeRedisSpec = JS.Test.describe("Redis engine", function() { with(this) {
JS.Test.describe("Redis engine", function() { with(this) {
before(function() {
var pw = process.env.TRAVIS ? undefined : "foobared"
this.engineOpts = {type: RedisEngine, password: pw, namespace: new Date().getTime().toString()}
})
after(function(resume) { with(this) {
sync(function() {
engine.disconnect()
var redis = require('redis').createClient(6379, 'localhost', {no_ready_check: true})
redis.auth(engineOpts.password)
redis.flushall(function() {
redis.end()
resume()
})
engine.disconnect()
var redis = require('redis').createClient(6379, 'localhost', {no_ready_check: true})
redis.auth(engineOpts.password)
redis.flushall(function() {
redis.end()
resume()
})
}})
itShouldBehaveLike("faye engine")
describe("distribution", function() { with(this) {
itShouldBehaveLike("distributed engine")
}})
if (process.env.TRAVIS) return
describe("using a Unix socket", function() { with(this) {
before(function() { with(this) {
this.engineOpts.socket = "/tmp/redis.sock"
}})
itShouldBehaveLike("faye engine")
}})
}})
+3 -3
View File
@@ -1,8 +1,8 @@
require('jsclass')
JS.require('JS.Range', 'JS.Test')
JS = require('jstest')
Faye = require('../vendor/faye/build/faye-node')
Faye = require('../vendor/faye/build/node/faye-node')
require('../vendor/faye/spec/javascript/engine_spec')
require('./faye_redis_spec')
JS.Test.autorun()
Vendored
+1 -1