Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 69ef5f8be2 | |||
| a2a76ce200 | |||
| 9e0c198013 | |||
| 19cd96eed9 | |||
| f894a1deb5 | |||
| f893c15efd | |||
| 2de09943e0 | |||
| 9d682fb177 | |||
| 7cdbb1ceaf | |||
| af32a60460 | |||
| 2efea591fd | |||
| f2ab1a54a5 | |||
| c00e1cd15e | |||
| 6520c49ffe | |||
| 7c8b0160cf | |||
| 579920543d | |||
| 4d2ad5c83d | |||
| fb1134e15e | |||
| ccecb0212e | |||
| 6be7bb1e61 | |||
| 5b47a87519 | |||
| 4c90ababed | |||
| 0e64286120 |
+5
-3
@@ -1,13 +1,15 @@
|
||||
language: ruby
|
||||
|
||||
rvm:
|
||||
- 1.8.7
|
||||
- 1.9.2
|
||||
- 1.9.3
|
||||
- 2.0.0
|
||||
- jruby-19mode
|
||||
- rbx-19mode
|
||||
|
||||
before_script:
|
||||
- git submodule update --init --recursive
|
||||
|
||||
script: bundle exec rspec spec/
|
||||
script: bundle exec rspec -c spec/
|
||||
|
||||
env: TRAVIS=1
|
||||
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
### 0.2.0 / 2013-10-01
|
||||
|
||||
* Migrate from Yajl to MultiJson to support JRuby
|
||||
* Trigger `close` event as required by Faye 1.0
|
||||
|
||||
|
||||
### 0.1.1 / 2013-04-28
|
||||
|
||||
* Improve garbage collection to avoid leaking Redis memory
|
||||
|
||||
|
||||
### 0.1.0 / 2012-02-26
|
||||
|
||||
* Initial release: Redis backend for Faye 0.8
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
source "http://rubygems.org/"
|
||||
source "https://rubygems.org/"
|
||||
gemspec
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
# Faye::Redis [](https://travis-ci.org/faye/faye-redis-ruby)
|
||||
|
||||
This plugin provides a Redis-based backend for the
|
||||
[Faye](http://faye.jcoglan.com) messaging server. It allows a single Faye
|
||||
service to be distributed across many front-end web servers by storing state and
|
||||
routing messages through a [Redis](http://redis.io) database server.
|
||||
|
||||
|
||||
## Usage
|
||||
|
||||
Pass in the engine and any settings you need when setting up your Faye server.
|
||||
|
||||
```rb
|
||||
require 'faye'
|
||||
require 'faye/redis'
|
||||
|
||||
bayeux = Faye::RackAdapter.new(
|
||||
:mount => '/',
|
||||
:timeout => 25,
|
||||
:engine => {
|
||||
:type => Faye::Redis,
|
||||
:host => 'redis.example.com',
|
||||
# more options
|
||||
}
|
||||
)
|
||||
```
|
||||
|
||||
The full list of settings is as follows.
|
||||
|
||||
* <b>`:uri`</b> - redis URL (example: `redis://:secretpassword@example.com:9000/4`)
|
||||
* <b>`:host`</b> - hostname of your Redis instance
|
||||
* <b>`:port`</b> - port number, default is `6379`
|
||||
* <b>`:password`</b> - password, if `requirepass` is set
|
||||
* <b>`:database`</b> - number of database to use, default is `0`
|
||||
* <b>`:namespace`</b> - prefix applied to all keys, default is `''`
|
||||
* <b>`:socket`</b> - path to Unix socket if `unixsocket` is set
|
||||
|
||||
|
||||
## License
|
||||
|
||||
(The MIT License)
|
||||
|
||||
Copyright (c) 2011-2013 James Coglan
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the 'Software'), to deal in
|
||||
the Software without restriction, including without limitation the rights to
|
||||
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
-58
@@ -1,58 +0,0 @@
|
||||
= Faye::Redis {<img src="https://secure.travis-ci.org/faye/faye-redis-ruby.png?branch=master" alt="Build Status" />}[http://travis-ci.org/faye/faye-redis-ruby]
|
||||
|
||||
This plugin provides a Redis-based backend for the {Faye}[http://faye.jcoglan.com]
|
||||
messaging server. It allows a single Faye service to be distributed across many
|
||||
front-end web servers by storing state and routing messages through a
|
||||
{Redis}[http://redis.io] database server.
|
||||
|
||||
|
||||
== Usage
|
||||
|
||||
Pass in the engine and any settings you need when setting up your Faye server.
|
||||
|
||||
require 'faye'
|
||||
require 'faye/redis'
|
||||
|
||||
bayeux = Faye::RackAdapter.new(
|
||||
:mount => '/',
|
||||
:timeout => 25,
|
||||
:engine => {
|
||||
:type => Faye::Redis,
|
||||
:host => 'redis.example.com',
|
||||
# more options
|
||||
}
|
||||
)
|
||||
|
||||
The full list of settings is as follows.
|
||||
|
||||
* <b><tt>:host</tt></b> - hostname of your Redis instance
|
||||
* <b><tt>:port</tt></b> - port number, default is +6379+
|
||||
* <b><tt>:password</tt></b> - password, if +requirepass+ is set
|
||||
* <b><tt>:database</tt></b> - number of database to use, default is +0+
|
||||
* <b><tt>:namespace</tt></b> - prefix applied to all keys, default is <tt>''</tt>
|
||||
* <b><tt>:socket</tt></b> - path to Unix socket if +unixsocket+ is set
|
||||
|
||||
|
||||
== License
|
||||
|
||||
(The MIT License)
|
||||
|
||||
Copyright (c) 2011-2012 James Coglan
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the 'Software'), to deal in
|
||||
the Software without restriction, including without limitation the rights to use,
|
||||
copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
Software, and to permit persons to whom the Software is furnished to do so,
|
||||
subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
+15
-13
@@ -1,20 +1,22 @@
|
||||
Gem::Specification.new do |s|
|
||||
s.name = "faye-redis"
|
||||
s.version = "0.1.0"
|
||||
s.summary = "Redis backend engine for Faye"
|
||||
s.author = "James Coglan"
|
||||
s.email = "jcoglan@gmail.com"
|
||||
s.homepage = "http://github.com/faye/faye-redis-ruby"
|
||||
s.name = 'faye-redis'
|
||||
s.version = '0.2.0'
|
||||
s.summary = 'Redis backend engine for Faye'
|
||||
s.author = 'James Coglan'
|
||||
s.email = 'jcoglan@gmail.com'
|
||||
s.homepage = 'http://github.com/faye/faye-redis-ruby'
|
||||
|
||||
s.extra_rdoc_files = %w[README.rdoc]
|
||||
s.rdoc_options = %w[--main README.rdoc]
|
||||
s.extra_rdoc_files = %w[README.md]
|
||||
s.rdoc_options = %w[--main README.md --markup markdown]
|
||||
s.require_paths = %w[lib]
|
||||
|
||||
s.files = %w[README.rdoc] + Dir.glob("{spec,lib}/**/*")
|
||||
s.files = %w[CHANGELOG.md README.md] +
|
||||
Dir.glob('lib/**/*.rb')
|
||||
|
||||
s.add_dependency "eventmachine", ">= 0.12.0"
|
||||
s.add_dependency "em-hiredis", ">= 0.0.1"
|
||||
s.add_dependency "yajl-ruby", ">= 1.0.0"
|
||||
s.add_dependency 'eventmachine', '>= 0.12.0'
|
||||
s.add_dependency 'em-hiredis', '>= 0.2.0'
|
||||
s.add_dependency 'multi_json', '>= 1.0.0'
|
||||
|
||||
s.add_development_dependency "rspec", "~> 2.8.0"
|
||||
s.add_development_dependency 'rspec'
|
||||
s.add_development_dependency 'websocket-driver'
|
||||
end
|
||||
|
||||
+93
-77
@@ -1,62 +1,65 @@
|
||||
require 'em-hiredis'
|
||||
require 'yajl'
|
||||
require 'multi_json'
|
||||
|
||||
module Faye
|
||||
class Redis
|
||||
|
||||
|
||||
DEFAULT_HOST = 'localhost'
|
||||
DEFAULT_PORT = 6379
|
||||
DEFAULT_DATABASE = 0
|
||||
DEFAULT_GC = 60
|
||||
LOCK_TIMEOUT = 120
|
||||
|
||||
|
||||
def self.create(server, options)
|
||||
new(server, options)
|
||||
end
|
||||
|
||||
|
||||
def initialize(server, options)
|
||||
@server = server
|
||||
@options = options
|
||||
end
|
||||
|
||||
|
||||
def init
|
||||
return if @redis
|
||||
|
||||
|
||||
uri = @options[:uri] || nil
|
||||
host = @options[:host] || DEFAULT_HOST
|
||||
port = @options[:port] || DEFAULT_PORT
|
||||
db = @options[:database] || DEFAULT_DATABASE
|
||||
auth = @options[:password]
|
||||
auth = @options[:password] || nil
|
||||
gc = @options[:gc] || DEFAULT_GC
|
||||
@ns = @options[:namespace] || ''
|
||||
socket = @options[:socket]
|
||||
|
||||
if socket
|
||||
@redis = EventMachine::Hiredis::Client.connect(socket, nil)
|
||||
@subscriber = EventMachine::Hiredis::Client.connect(socket, nil)
|
||||
socket = @options[:socket] || nil
|
||||
|
||||
if uri
|
||||
@redis = EventMachine::Hiredis.connect(uri)
|
||||
elsif socket
|
||||
@redis = EventMachine::Hiredis::Client.new(socket, nil, auth, db).connect
|
||||
else
|
||||
@redis = EventMachine::Hiredis::Client.connect(host, port)
|
||||
@subscriber = EventMachine::Hiredis::Client.connect(host, port)
|
||||
@redis = EventMachine::Hiredis::Client.new(host, port, auth, db).connect
|
||||
end
|
||||
if auth
|
||||
@redis.auth(auth)
|
||||
@subscriber.auth(auth)
|
||||
end
|
||||
@redis.select(db)
|
||||
@subscriber.select(db)
|
||||
|
||||
@subscriber.subscribe(@ns + '/notifications')
|
||||
@subscriber = @redis.pubsub
|
||||
|
||||
@message_channel = @ns + '/notifications/messages'
|
||||
@close_channel = @ns + '/notifications/close'
|
||||
|
||||
@subscriber.subscribe(@message_channel)
|
||||
@subscriber.subscribe(@close_channel)
|
||||
@subscriber.on(:message) do |topic, message|
|
||||
empty_queue(message) if topic == @ns + '/notifications'
|
||||
empty_queue(message) if topic == @message_channel
|
||||
@server.trigger(:close, message) if topic == @close_channel
|
||||
end
|
||||
|
||||
|
||||
@gc = EventMachine.add_periodic_timer(gc, &method(:gc))
|
||||
end
|
||||
|
||||
|
||||
def disconnect
|
||||
@subscriber.unsubscribe(@ns + '/notifications')
|
||||
return unless @redis
|
||||
@subscriber.unsubscribe(@message_channel)
|
||||
@subscriber.unsubscribe(@close_channel)
|
||||
EventMachine.cancel_timer(@gc)
|
||||
end
|
||||
|
||||
|
||||
def create_client(&callback)
|
||||
init
|
||||
client_id = @server.generate_id
|
||||
@@ -68,48 +71,54 @@ module Faye
|
||||
callback.call(client_id)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def client_exists(client_id, &callback)
|
||||
init
|
||||
cutoff = get_current_time - (1000 * 1.6 * @server.timeout)
|
||||
|
||||
@redis.zscore(@ns + '/clients', client_id) do |score|
|
||||
callback.call(score.to_i > cutoff)
|
||||
end
|
||||
end
|
||||
|
||||
def destroy_client(client_id, &callback)
|
||||
init
|
||||
@redis.zrem(@ns + '/clients', client_id)
|
||||
@redis.del(@ns + "/clients/#{client_id}/messages")
|
||||
|
||||
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
|
||||
i, n = 0, channels.size
|
||||
next after_destroy(client_id, &callback) if i == n
|
||||
|
||||
channels.each do |channel|
|
||||
unsubscribe(client_id, channel) do
|
||||
i += 1
|
||||
after_destroy(client_id, &callback) if i == n
|
||||
@redis.zadd(@ns + '/clients', 0, client_id) do
|
||||
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
|
||||
i, n = 0, channels.size
|
||||
next after_subscriptions_removed(client_id, &callback) if i == n
|
||||
|
||||
channels.each do |channel|
|
||||
unsubscribe(client_id, channel) do
|
||||
i += 1
|
||||
after_subscriptions_removed(client_id, &callback) if i == n
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def after_destroy(client_id, &callback)
|
||||
@server.debug 'Destroyed client ?', client_id
|
||||
@server.trigger(:disconnect, client_id)
|
||||
callback.call if callback
|
||||
end
|
||||
|
||||
def client_exists(client_id, &callback)
|
||||
init
|
||||
@redis.zscore(@ns + '/clients', client_id) do |score|
|
||||
callback.call(score != nil)
|
||||
|
||||
def after_subscriptions_removed(client_id, &callback)
|
||||
@redis.del(@ns + "/clients/#{client_id}/messages") do
|
||||
@redis.zrem(@ns + '/clients', client_id) do
|
||||
@server.debug 'Destroyed client ?', client_id
|
||||
@server.trigger(:disconnect, client_id)
|
||||
@redis.publish(@close_channel, client_id)
|
||||
callback.call if callback
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def ping(client_id)
|
||||
init
|
||||
timeout = @server.timeout
|
||||
return unless Numeric === timeout
|
||||
|
||||
|
||||
time = get_current_time
|
||||
@server.debug 'Ping ?, ?', client_id, time
|
||||
@redis.zadd(@ns + '/clients', time, client_id)
|
||||
end
|
||||
|
||||
|
||||
def subscribe(client_id, channel, &callback)
|
||||
init
|
||||
@redis.sadd(@ns + "/clients/#{client_id}/channels", channel) do |added|
|
||||
@@ -120,7 +129,7 @@ module Faye
|
||||
callback.call if callback
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def unsubscribe(client_id, channel, &callback)
|
||||
init
|
||||
@redis.srem(@ns + "/clients/#{client_id}/channels", channel) do |removed|
|
||||
@@ -131,57 +140,64 @@ module Faye
|
||||
callback.call if callback
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def publish(message, channels)
|
||||
init
|
||||
@server.debug 'Publishing message ?', message
|
||||
|
||||
json_message = Yajl::Encoder.encode(message)
|
||||
|
||||
json_message = MultiJson.dump(message)
|
||||
channels = Channel.expand(message['channel'])
|
||||
keys = channels.map { |c| @ns + "/channels#{c}" }
|
||||
|
||||
|
||||
@redis.sunion(*keys) do |clients|
|
||||
clients.each do |client_id|
|
||||
queue = @ns + "/clients/#{client_id}/messages"
|
||||
|
||||
@server.debug 'Queueing for client ?: ?', client_id, message
|
||||
@redis.rpush(@ns + "/clients/#{client_id}/messages", json_message)
|
||||
@redis.publish(@ns + '/notifications', client_id)
|
||||
@redis.rpush(queue, json_message)
|
||||
@redis.publish(@message_channel, client_id)
|
||||
|
||||
client_exists(client_id) do |exists|
|
||||
@redis.del(queue) unless exists
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@server.trigger(:publish, message['clientId'], message['channel'], message['data'])
|
||||
end
|
||||
|
||||
|
||||
def empty_queue(client_id)
|
||||
return unless @server.has_connection?(client_id)
|
||||
init
|
||||
|
||||
|
||||
key = @ns + "/clients/#{client_id}/messages"
|
||||
|
||||
|
||||
@redis.multi
|
||||
@redis.lrange(key, 0, -1)
|
||||
@redis.del(key)
|
||||
@redis.exec.callback do |json_messages, deleted|
|
||||
messages = json_messages.map { |json| Yajl::Parser.parse(json) }
|
||||
next unless json_messages
|
||||
messages = json_messages.map { |json| MultiJson.load(json) }
|
||||
@server.deliver(client_id, messages)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
private
|
||||
|
||||
|
||||
def get_current_time
|
||||
(Time.now.to_f * 1000).to_i
|
||||
end
|
||||
|
||||
|
||||
def gc
|
||||
timeout = @server.timeout
|
||||
return unless Numeric === timeout
|
||||
|
||||
|
||||
with_lock 'gc' do |release_lock|
|
||||
cutoff = get_current_time - 1000 * 2 * timeout
|
||||
@redis.zrangebyscore(@ns + '/clients', 0, cutoff) do |clients|
|
||||
i, n = 0, clients.size
|
||||
next release_lock.call if i == n
|
||||
|
||||
|
||||
clients.each do |client_id|
|
||||
destroy_client(client_id) do
|
||||
i += 1
|
||||
@@ -191,32 +207,32 @@ module Faye
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def with_lock(lock_name, &block)
|
||||
lock_key = @ns + '/locks/' + lock_name
|
||||
current_time = get_current_time
|
||||
expiry = current_time + LOCK_TIMEOUT * 1000 + 1
|
||||
|
||||
|
||||
release_lock = lambda do
|
||||
@redis.del(lock_key) if get_current_time < expiry
|
||||
end
|
||||
|
||||
|
||||
@redis.setnx(lock_key, expiry) do |set|
|
||||
next block.call(release_lock) if set == 1
|
||||
|
||||
|
||||
@redis.get(lock_key) do |timeout|
|
||||
next unless timeout
|
||||
|
||||
|
||||
lock_timeout = timeout.to_i(10)
|
||||
next if current_time < lock_timeout
|
||||
|
||||
|
||||
@redis.getset(lock_key, expiry) do |old_value|
|
||||
block.call(release_lock) if old_value == timeout
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -5,19 +5,19 @@ describe Faye::Redis do
|
||||
pw = ENV["TRAVIS"] ? nil : "foobared"
|
||||
{:type => Faye::Redis, :password => pw, :namespace => Time.now.to_i.to_s}
|
||||
end
|
||||
|
||||
|
||||
after do
|
||||
engine.disconnect
|
||||
redis = EM::Hiredis::Client.connect('localhost', 6379)
|
||||
redis.auth(engine_opts[:password])
|
||||
redis.flushall
|
||||
end
|
||||
|
||||
|
||||
it_should_behave_like "faye engine"
|
||||
it_should_behave_like "distributed engine"
|
||||
|
||||
|
||||
next if ENV["TRAVIS"]
|
||||
|
||||
|
||||
describe "using a Unix socket" do
|
||||
before { engine_opts[:socket] = "/tmp/redis.sock" }
|
||||
it_should_behave_like "faye engine"
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
require File.expand_path('../../lib/faye/redis', __FILE__)
|
||||
require 'websocket/driver'
|
||||
require File.expand_path('../../vendor/faye/spec/ruby/engine_examples', __FILE__)
|
||||
|
||||
class << Faye
|
||||
attr_accessor :logger
|
||||
end
|
||||
|
||||
|
||||
Vendored
+1
-1
Submodule vendor/faye updated: 6ebaa51670...b7dd8015ed
Reference in New Issue
Block a user