23 Commits

Author SHA1 Message Date
James Coglan 69ef5f8be2 Update the changelog for 0.2.0. 2013-10-01 15:46:54 +01:00
James Coglan a2a76ce200 Migrate readme and changelog to markdown. 2013-10-01 15:43:50 +01:00
James Coglan 9e0c198013 Trigger the :close event as required by Faye 1.0. 2013-10-01 15:37:00 +01:00
James Coglan 19cd96eed9 Bump faye submodule to fix JRuby tests. 2013-10-01 15:24:36 +01:00
James Coglan f894a1deb5 Don't test on 1.8.7. 2013-10-01 15:14:36 +01:00
James Coglan f893c15efd Bring things up to date: update the faye submodule, replace Yajl with MultiJson. 2013-10-01 15:08:43 +01:00
James Coglan 2de09943e0 Bump copyright date. 2013-04-28 12:27:00 +01:00
James Coglan 9d682fb177 Include Changelog in the distribution. 2013-04-28 12:17:31 +01:00
James Coglan 7cdbb1ceaf Apply some of @jpignata's patches to reduce leaked memory. 2013-04-28 12:04:13 +01:00
James Coglan af32a60460 Don't test on 1.9.2. 2013-04-28 10:39:21 +01:00
James Coglan 2efea591fd Test on Rubinius. 2013-04-28 10:32:42 +01:00
James Coglan f2ab1a54a5 Bump version 0.1.1. 2013-04-28 10:19:51 +01:00
James Coglan c00e1cd15e Use --color flag when running tests on Travis. 2013-04-28 10:18:22 +01:00
James Coglan 6520c49ffe Test on MRI 2.0.0. 2013-04-28 10:15:19 +01:00
James Coglan 7c8b0160cf Change the order of operations in destroy_client to avoid leaking /messages keys. 2013-04-28 10:11:20 +01:00
James Coglan 579920543d Bump Faye submodule to 0.8.9. 2013-04-14 19:16:40 +01:00
James Coglan 4d2ad5c83d Fetch gems from https://rubygems.org. 2013-04-14 19:13:16 +01:00
James Coglan fb1134e15e Remove trailing whitespace. 2013-04-14 19:11:58 +01:00
James Coglan ccecb0212e Refactor client creation code. 2013-04-14 19:10:50 +01:00
James Coglan 6be7bb1e61 Merge pull request #6 from le0pard/master
Support new em-hiredis v0.2.0
2013-04-14 11:08:46 -07:00
Alexey Vasiliev 5b47a87519 support new em-hiredis v0.2.0 2013-04-14 21:06:00 +03:00
James Coglan 4c90ababed Bump faye submodule with better em-rspec. 2012-12-03 21:39:30 +00:00
James Coglan 0e64286120 Remove branch identifier from Travis image. 2012-03-28 00:55:30 +01:00
10 changed files with 200 additions and 157 deletions
+5 -3
View File
@@ -1,13 +1,15 @@
language: ruby
rvm:
- 1.8.7
- 1.9.2
- 1.9.3
- 2.0.0
- jruby-19mode
- rbx-19mode
before_script:
- git submodule update --init --recursive
script: bundle exec rspec spec/
script: bundle exec rspec -c spec/
env: TRAVIS=1
+15
View File
@@ -0,0 +1,15 @@
### 0.2.0 / 2013-10-01
* Migrate from Yajl to MultiJson to support JRuby
* Trigger `close` event as required by Faye 1.0
### 0.1.1 / 2013-04-28
* Improve garbage collection to avoid leaking Redis memory
### 0.1.0 / 2012-02-26
* Initial release: Redis backend for Faye 0.8
+1 -1
View File
@@ -1,2 +1,2 @@
source "http://rubygems.org/"
source "https://rubygems.org/"
gemspec
+61
View File
@@ -0,0 +1,61 @@
# Faye::Redis [![Build Status](https://travis-ci.org/faye/faye-redis-ruby.png)](https://travis-ci.org/faye/faye-redis-ruby)
This plugin provides a Redis-based backend for the
[Faye](http://faye.jcoglan.com) messaging server. It allows a single Faye
service to be distributed across many front-end web servers by storing state and
routing messages through a [Redis](http://redis.io) database server.
## Usage
Pass in the engine and any settings you need when setting up your Faye server.
```rb
require 'faye'
require 'faye/redis'
bayeux = Faye::RackAdapter.new(
:mount => '/',
:timeout => 25,
:engine => {
:type => Faye::Redis,
:host => 'redis.example.com',
# more options
}
)
```
The full list of settings is as follows.
* <b>`:uri`</b> - redis URL (example: `redis://:secretpassword@example.com:9000/4`)
* <b>`:host`</b> - hostname of your Redis instance
* <b>`:port`</b> - port number, default is `6379`
* <b>`:password`</b> - password, if `requirepass` is set
* <b>`:database`</b> - number of database to use, default is `0`
* <b>`:namespace`</b> - prefix applied to all keys, default is `''`
* <b>`:socket`</b> - path to Unix socket if `unixsocket` is set
## License
(The MIT License)
Copyright (c) 2011-2013 James Coglan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the 'Software'), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-58
View File
@@ -1,58 +0,0 @@
= Faye::Redis {<img src="https://secure.travis-ci.org/faye/faye-redis-ruby.png?branch=master" alt="Build Status" />}[http://travis-ci.org/faye/faye-redis-ruby]
This plugin provides a Redis-based backend for the {Faye}[http://faye.jcoglan.com]
messaging server. It allows a single Faye service to be distributed across many
front-end web servers by storing state and routing messages through a
{Redis}[http://redis.io] database server.
== Usage
Pass in the engine and any settings you need when setting up your Faye server.
require 'faye'
require 'faye/redis'
bayeux = Faye::RackAdapter.new(
:mount => '/',
:timeout => 25,
:engine => {
:type => Faye::Redis,
:host => 'redis.example.com',
# more options
}
)
The full list of settings is as follows.
* <b><tt>:host</tt></b> - hostname of your Redis instance
* <b><tt>:port</tt></b> - port number, default is +6379+
* <b><tt>:password</tt></b> - password, if +requirepass+ is set
* <b><tt>:database</tt></b> - number of database to use, default is +0+
* <b><tt>:namespace</tt></b> - prefix applied to all keys, default is <tt>''</tt>
* <b><tt>:socket</tt></b> - path to Unix socket if +unixsocket+ is set
== License
(The MIT License)
Copyright (c) 2011-2012 James Coglan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the 'Software'), to deal in
the Software without restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+15 -13
View File
@@ -1,20 +1,22 @@
Gem::Specification.new do |s|
s.name = "faye-redis"
s.version = "0.1.0"
s.summary = "Redis backend engine for Faye"
s.author = "James Coglan"
s.email = "jcoglan@gmail.com"
s.homepage = "http://github.com/faye/faye-redis-ruby"
s.name = 'faye-redis'
s.version = '0.2.0'
s.summary = 'Redis backend engine for Faye'
s.author = 'James Coglan'
s.email = 'jcoglan@gmail.com'
s.homepage = 'http://github.com/faye/faye-redis-ruby'
s.extra_rdoc_files = %w[README.rdoc]
s.rdoc_options = %w[--main README.rdoc]
s.extra_rdoc_files = %w[README.md]
s.rdoc_options = %w[--main README.md --markup markdown]
s.require_paths = %w[lib]
s.files = %w[README.rdoc] + Dir.glob("{spec,lib}/**/*")
s.files = %w[CHANGELOG.md README.md] +
Dir.glob('lib/**/*.rb')
s.add_dependency "eventmachine", ">= 0.12.0"
s.add_dependency "em-hiredis", ">= 0.0.1"
s.add_dependency "yajl-ruby", ">= 1.0.0"
s.add_dependency 'eventmachine', '>= 0.12.0'
s.add_dependency 'em-hiredis', '>= 0.2.0'
s.add_dependency 'multi_json', '>= 1.0.0'
s.add_development_dependency "rspec", "~> 2.8.0"
s.add_development_dependency 'rspec'
s.add_development_dependency 'websocket-driver'
end
+93 -77
View File
@@ -1,62 +1,65 @@
require 'em-hiredis'
require 'yajl'
require 'multi_json'
module Faye
class Redis
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 6379
DEFAULT_DATABASE = 0
DEFAULT_GC = 60
LOCK_TIMEOUT = 120
def self.create(server, options)
new(server, options)
end
def initialize(server, options)
@server = server
@options = options
end
def init
return if @redis
uri = @options[:uri] || nil
host = @options[:host] || DEFAULT_HOST
port = @options[:port] || DEFAULT_PORT
db = @options[:database] || DEFAULT_DATABASE
auth = @options[:password]
auth = @options[:password] || nil
gc = @options[:gc] || DEFAULT_GC
@ns = @options[:namespace] || ''
socket = @options[:socket]
if socket
@redis = EventMachine::Hiredis::Client.connect(socket, nil)
@subscriber = EventMachine::Hiredis::Client.connect(socket, nil)
socket = @options[:socket] || nil
if uri
@redis = EventMachine::Hiredis.connect(uri)
elsif socket
@redis = EventMachine::Hiredis::Client.new(socket, nil, auth, db).connect
else
@redis = EventMachine::Hiredis::Client.connect(host, port)
@subscriber = EventMachine::Hiredis::Client.connect(host, port)
@redis = EventMachine::Hiredis::Client.new(host, port, auth, db).connect
end
if auth
@redis.auth(auth)
@subscriber.auth(auth)
end
@redis.select(db)
@subscriber.select(db)
@subscriber.subscribe(@ns + '/notifications')
@subscriber = @redis.pubsub
@message_channel = @ns + '/notifications/messages'
@close_channel = @ns + '/notifications/close'
@subscriber.subscribe(@message_channel)
@subscriber.subscribe(@close_channel)
@subscriber.on(:message) do |topic, message|
empty_queue(message) if topic == @ns + '/notifications'
empty_queue(message) if topic == @message_channel
@server.trigger(:close, message) if topic == @close_channel
end
@gc = EventMachine.add_periodic_timer(gc, &method(:gc))
end
def disconnect
@subscriber.unsubscribe(@ns + '/notifications')
return unless @redis
@subscriber.unsubscribe(@message_channel)
@subscriber.unsubscribe(@close_channel)
EventMachine.cancel_timer(@gc)
end
def create_client(&callback)
init
client_id = @server.generate_id
@@ -68,48 +71,54 @@ module Faye
callback.call(client_id)
end
end
def client_exists(client_id, &callback)
init
cutoff = get_current_time - (1000 * 1.6 * @server.timeout)
@redis.zscore(@ns + '/clients', client_id) do |score|
callback.call(score.to_i > cutoff)
end
end
def destroy_client(client_id, &callback)
init
@redis.zrem(@ns + '/clients', client_id)
@redis.del(@ns + "/clients/#{client_id}/messages")
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
i, n = 0, channels.size
next after_destroy(client_id, &callback) if i == n
channels.each do |channel|
unsubscribe(client_id, channel) do
i += 1
after_destroy(client_id, &callback) if i == n
@redis.zadd(@ns + '/clients', 0, client_id) do
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
i, n = 0, channels.size
next after_subscriptions_removed(client_id, &callback) if i == n
channels.each do |channel|
unsubscribe(client_id, channel) do
i += 1
after_subscriptions_removed(client_id, &callback) if i == n
end
end
end
end
end
def after_destroy(client_id, &callback)
@server.debug 'Destroyed client ?', client_id
@server.trigger(:disconnect, client_id)
callback.call if callback
end
def client_exists(client_id, &callback)
init
@redis.zscore(@ns + '/clients', client_id) do |score|
callback.call(score != nil)
def after_subscriptions_removed(client_id, &callback)
@redis.del(@ns + "/clients/#{client_id}/messages") do
@redis.zrem(@ns + '/clients', client_id) do
@server.debug 'Destroyed client ?', client_id
@server.trigger(:disconnect, client_id)
@redis.publish(@close_channel, client_id)
callback.call if callback
end
end
end
def ping(client_id)
init
timeout = @server.timeout
return unless Numeric === timeout
time = get_current_time
@server.debug 'Ping ?, ?', client_id, time
@redis.zadd(@ns + '/clients', time, client_id)
end
def subscribe(client_id, channel, &callback)
init
@redis.sadd(@ns + "/clients/#{client_id}/channels", channel) do |added|
@@ -120,7 +129,7 @@ module Faye
callback.call if callback
end
end
def unsubscribe(client_id, channel, &callback)
init
@redis.srem(@ns + "/clients/#{client_id}/channels", channel) do |removed|
@@ -131,57 +140,64 @@ module Faye
callback.call if callback
end
end
def publish(message, channels)
init
@server.debug 'Publishing message ?', message
json_message = Yajl::Encoder.encode(message)
json_message = MultiJson.dump(message)
channels = Channel.expand(message['channel'])
keys = channels.map { |c| @ns + "/channels#{c}" }
@redis.sunion(*keys) do |clients|
clients.each do |client_id|
queue = @ns + "/clients/#{client_id}/messages"
@server.debug 'Queueing for client ?: ?', client_id, message
@redis.rpush(@ns + "/clients/#{client_id}/messages", json_message)
@redis.publish(@ns + '/notifications', client_id)
@redis.rpush(queue, json_message)
@redis.publish(@message_channel, client_id)
client_exists(client_id) do |exists|
@redis.del(queue) unless exists
end
end
end
@server.trigger(:publish, message['clientId'], message['channel'], message['data'])
end
def empty_queue(client_id)
return unless @server.has_connection?(client_id)
init
key = @ns + "/clients/#{client_id}/messages"
@redis.multi
@redis.lrange(key, 0, -1)
@redis.del(key)
@redis.exec.callback do |json_messages, deleted|
messages = json_messages.map { |json| Yajl::Parser.parse(json) }
next unless json_messages
messages = json_messages.map { |json| MultiJson.load(json) }
@server.deliver(client_id, messages)
end
end
private
def get_current_time
(Time.now.to_f * 1000).to_i
end
def gc
timeout = @server.timeout
return unless Numeric === timeout
with_lock 'gc' do |release_lock|
cutoff = get_current_time - 1000 * 2 * timeout
@redis.zrangebyscore(@ns + '/clients', 0, cutoff) do |clients|
i, n = 0, clients.size
next release_lock.call if i == n
clients.each do |client_id|
destroy_client(client_id) do
i += 1
@@ -191,32 +207,32 @@ module Faye
end
end
end
def with_lock(lock_name, &block)
lock_key = @ns + '/locks/' + lock_name
current_time = get_current_time
expiry = current_time + LOCK_TIMEOUT * 1000 + 1
release_lock = lambda do
@redis.del(lock_key) if get_current_time < expiry
end
@redis.setnx(lock_key, expiry) do |set|
next block.call(release_lock) if set == 1
@redis.get(lock_key) do |timeout|
next unless timeout
lock_timeout = timeout.to_i(10)
next if current_time < lock_timeout
@redis.getset(lock_key, expiry) do |old_value|
block.call(release_lock) if old_value == timeout
end
end
end
end
end
end
+4 -4
View File
@@ -5,19 +5,19 @@ describe Faye::Redis do
pw = ENV["TRAVIS"] ? nil : "foobared"
{:type => Faye::Redis, :password => pw, :namespace => Time.now.to_i.to_s}
end
after do
engine.disconnect
redis = EM::Hiredis::Client.connect('localhost', 6379)
redis.auth(engine_opts[:password])
redis.flushall
end
it_should_behave_like "faye engine"
it_should_behave_like "distributed engine"
next if ENV["TRAVIS"]
describe "using a Unix socket" do
before { engine_opts[:socket] = "/tmp/redis.sock" }
it_should_behave_like "faye engine"
+5
View File
@@ -1,3 +1,8 @@
require File.expand_path('../../lib/faye/redis', __FILE__)
require 'websocket/driver'
require File.expand_path('../../vendor/faye/spec/ruby/engine_examples', __FILE__)
class << Faye
attr_accessor :logger
end
Vendored
+1 -1