16 Commits

Author SHA1 Message Date
James Coglan 9d682fb177 Include Changelog in the distribution. 2013-04-28 12:17:31 +01:00
James Coglan 7cdbb1ceaf Apply some of @jpignata's patches to reduce leaked memory. 2013-04-28 12:04:13 +01:00
James Coglan af32a60460 Don't test on 1.9.2. 2013-04-28 10:39:21 +01:00
James Coglan 2efea591fd Test on Rubinius. 2013-04-28 10:32:42 +01:00
James Coglan f2ab1a54a5 Bump version 0.1.1. 2013-04-28 10:19:51 +01:00
James Coglan c00e1cd15e Use --color flag when running tests on Travis. 2013-04-28 10:18:22 +01:00
James Coglan 6520c49ffe Test on MRI 2.0.0. 2013-04-28 10:15:19 +01:00
James Coglan 7c8b0160cf Change the order of operations in destroy_client to avoid leaking /messages keys. 2013-04-28 10:11:20 +01:00
James Coglan 579920543d Bump Faye submodule to 0.8.9. 2013-04-14 19:16:40 +01:00
James Coglan 4d2ad5c83d Fetch gems from https://rubygems.org. 2013-04-14 19:13:16 +01:00
James Coglan fb1134e15e Remove trailing whitespace. 2013-04-14 19:11:58 +01:00
James Coglan ccecb0212e Refactor client creation code. 2013-04-14 19:10:50 +01:00
James Coglan 6be7bb1e61 Merge pull request #6 from le0pard/master
Support new em-hiredis v0.2.0
2013-04-14 11:08:46 -07:00
Alexey Vasiliev 5b47a87519 support new em-hiredis v0.2.0 2013-04-14 21:06:00 +03:00
James Coglan 4c90ababed Bump faye submodule with better em-rspec. 2012-12-03 21:39:30 +00:00
James Coglan 0e64286120 Remove branch identifier from Travis image. 2012-03-28 00:55:30 +01:00
8 changed files with 106 additions and 84 deletions
+5 -2
View File
@@ -1,13 +1,16 @@
language: ruby
rvm:
- 1.8.7
- 1.9.2
- 1.9.3
- 2.0.0
- rbx-18mode
- rbx-19mode
before_script:
- git submodule update --init --recursive
script: bundle exec rspec spec/
script: bundle exec rspec -c spec/
env: TRAVIS=1
+9
View File
@@ -0,0 +1,9 @@
=== 0.1.1 / 2013-04-28
* Improve garbage collection to avoid leaking Redis memory
=== 0.1.0 / 2012-02-26
* Initial release: Redis backend for Faye 0.8
+1 -1
View File
@@ -1,2 +1,2 @@
source "http://rubygems.org/"
source "https://rubygems.org/"
gemspec
+3 -2
View File
@@ -1,4 +1,4 @@
= Faye::Redis {<img src="https://secure.travis-ci.org/faye/faye-redis-ruby.png?branch=master" alt="Build Status" />}[http://travis-ci.org/faye/faye-redis-ruby]
= Faye::Redis {<img src="https://secure.travis-ci.org/faye/faye-redis-ruby.png" alt="Build Status" />}[http://travis-ci.org/faye/faye-redis-ruby]
This plugin provides a Redis-based backend for the {Faye}[http://faye.jcoglan.com]
messaging server. It allows a single Faye service to be distributed across many
@@ -12,7 +12,7 @@ Pass in the engine and any settings you need when setting up your Faye server.
require 'faye'
require 'faye/redis'
bayeux = Faye::RackAdapter.new(
:mount => '/',
:timeout => 25,
@@ -25,6 +25,7 @@ Pass in the engine and any settings you need when setting up your Faye server.
The full list of settings is as follows.
* <b><tt>:uri</tt></b> - redis URL (example: redis://:secretpassword@example.com:9000/4)
* <b><tt>:host</tt></b> - hostname of your Redis instance
* <b><tt>:port</tt></b> - port number, default is +6379+
* <b><tt>:password</tt></b> - password, if +requirepass+ is set
+4 -4
View File
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = "faye-redis"
s.version = "0.1.0"
s.version = "0.1.1"
s.summary = "Redis backend engine for Faye"
s.author = "James Coglan"
s.email = "jcoglan@gmail.com"
@@ -10,11 +10,11 @@ Gem::Specification.new do |s|
s.rdoc_options = %w[--main README.rdoc]
s.require_paths = %w[lib]
s.files = %w[README.rdoc] + Dir.glob("{spec,lib}/**/*")
s.files = %w[README.rdoc CHANGELOG.txt] + Dir.glob("lib/**/*.rb")
s.add_dependency "eventmachine", ">= 0.12.0"
s.add_dependency "em-hiredis", ">= 0.0.1"
s.add_dependency "em-hiredis", ">= 0.2.0"
s.add_dependency "yajl-ruby", ">= 1.0.0"
s.add_development_dependency "rspec", "~> 2.8.0"
s.add_development_dependency "rspec"
end
+79 -70
View File
@@ -3,60 +3,57 @@ require 'yajl'
module Faye
class Redis
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 6379
DEFAULT_DATABASE = 0
DEFAULT_GC = 60
LOCK_TIMEOUT = 120
def self.create(server, options)
new(server, options)
end
def initialize(server, options)
@server = server
@options = options
end
def init
return if @redis
uri = @options[:uri] || nil
host = @options[:host] || DEFAULT_HOST
port = @options[:port] || DEFAULT_PORT
db = @options[:database] || DEFAULT_DATABASE
auth = @options[:password]
auth = @options[:password] || nil
gc = @options[:gc] || DEFAULT_GC
@ns = @options[:namespace] || ''
socket = @options[:socket]
if socket
@redis = EventMachine::Hiredis::Client.connect(socket, nil)
@subscriber = EventMachine::Hiredis::Client.connect(socket, nil)
socket = @options[:socket] || nil
if uri
@redis = EventMachine::Hiredis.connect(uri)
elsif socket
@redis = EventMachine::Hiredis::Client.new(socket, nil, auth, db).connect
else
@redis = EventMachine::Hiredis::Client.connect(host, port)
@subscriber = EventMachine::Hiredis::Client.connect(host, port)
@redis = EventMachine::Hiredis::Client.new(host, port, auth, db).connect
end
if auth
@redis.auth(auth)
@subscriber.auth(auth)
end
@redis.select(db)
@subscriber.select(db)
@subscriber = @redis.pubsub
@subscriber.subscribe(@ns + '/notifications')
@subscriber.on(:message) do |topic, message|
empty_queue(message) if topic == @ns + '/notifications'
end
@gc = EventMachine.add_periodic_timer(gc, &method(:gc))
end
def disconnect
return unless @redis
@subscriber.unsubscribe(@ns + '/notifications')
EventMachine.cancel_timer(@gc)
end
def create_client(&callback)
init
client_id = @server.generate_id
@@ -68,48 +65,53 @@ module Faye
callback.call(client_id)
end
end
def client_exists(client_id, &callback)
init
cutoff = get_current_time - (1000 * 1.6 * @server.timeout)
@redis.zscore(@ns + '/clients', client_id) do |score|
callback.call(score.to_i > cutoff)
end
end
def destroy_client(client_id, &callback)
init
@redis.zrem(@ns + '/clients', client_id)
@redis.del(@ns + "/clients/#{client_id}/messages")
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
i, n = 0, channels.size
next after_destroy(client_id, &callback) if i == n
channels.each do |channel|
unsubscribe(client_id, channel) do
i += 1
after_destroy(client_id, &callback) if i == n
@redis.zadd(@ns + '/clients', 0, client_id) do
@redis.smembers(@ns + "/clients/#{client_id}/channels") do |channels|
i, n = 0, channels.size
next after_subscriptions_removed(client_id, &callback) if i == n
channels.each do |channel|
unsubscribe(client_id, channel) do
i += 1
after_subscriptions_removed(client_id, &callback) if i == n
end
end
end
end
end
def after_destroy(client_id, &callback)
@server.debug 'Destroyed client ?', client_id
@server.trigger(:disconnect, client_id)
callback.call if callback
end
def client_exists(client_id, &callback)
init
@redis.zscore(@ns + '/clients', client_id) do |score|
callback.call(score != nil)
def after_subscriptions_removed(client_id, &callback)
@redis.del(@ns + "/clients/#{client_id}/messages") do
@redis.zrem(@ns + '/clients', client_id) do
@server.debug 'Destroyed client ?', client_id
@server.trigger(:disconnect, client_id)
callback.call if callback
end
end
end
def ping(client_id)
init
timeout = @server.timeout
return unless Numeric === timeout
time = get_current_time
@server.debug 'Ping ?, ?', client_id, time
@redis.zadd(@ns + '/clients', time, client_id)
end
def subscribe(client_id, channel, &callback)
init
@redis.sadd(@ns + "/clients/#{client_id}/channels", channel) do |added|
@@ -120,7 +122,7 @@ module Faye
callback.call if callback
end
end
def unsubscribe(client_id, channel, &callback)
init
@redis.srem(@ns + "/clients/#{client_id}/channels", channel) do |removed|
@@ -131,57 +133,64 @@ module Faye
callback.call if callback
end
end
def publish(message, channels)
init
@server.debug 'Publishing message ?', message
json_message = Yajl::Encoder.encode(message)
channels = Channel.expand(message['channel'])
keys = channels.map { |c| @ns + "/channels#{c}" }
@redis.sunion(*keys) do |clients|
clients.each do |client_id|
queue = @ns + "/clients/#{client_id}/messages"
@server.debug 'Queueing for client ?: ?', client_id, message
@redis.rpush(@ns + "/clients/#{client_id}/messages", json_message)
@redis.rpush(queue, json_message)
@redis.publish(@ns + '/notifications', client_id)
client_exists(client_id) do |exists|
@redis.del(queue) unless exists
end
end
end
@server.trigger(:publish, message['clientId'], message['channel'], message['data'])
end
def empty_queue(client_id)
return unless @server.has_connection?(client_id)
init
key = @ns + "/clients/#{client_id}/messages"
@redis.multi
@redis.lrange(key, 0, -1)
@redis.del(key)
@redis.exec.callback do |json_messages, deleted|
next unless json_messages
messages = json_messages.map { |json| Yajl::Parser.parse(json) }
@server.deliver(client_id, messages)
end
end
private
def get_current_time
(Time.now.to_f * 1000).to_i
end
def gc
timeout = @server.timeout
return unless Numeric === timeout
with_lock 'gc' do |release_lock|
cutoff = get_current_time - 1000 * 2 * timeout
@redis.zrangebyscore(@ns + '/clients', 0, cutoff) do |clients|
i, n = 0, clients.size
next release_lock.call if i == n
clients.each do |client_id|
destroy_client(client_id) do
i += 1
@@ -191,32 +200,32 @@ module Faye
end
end
end
def with_lock(lock_name, &block)
lock_key = @ns + '/locks/' + lock_name
current_time = get_current_time
expiry = current_time + LOCK_TIMEOUT * 1000 + 1
release_lock = lambda do
@redis.del(lock_key) if get_current_time < expiry
end
@redis.setnx(lock_key, expiry) do |set|
next block.call(release_lock) if set == 1
@redis.get(lock_key) do |timeout|
next unless timeout
lock_timeout = timeout.to_i(10)
next if current_time < lock_timeout
@redis.getset(lock_key, expiry) do |old_value|
block.call(release_lock) if old_value == timeout
end
end
end
end
end
end
+4 -4
View File
@@ -5,19 +5,19 @@ describe Faye::Redis do
pw = ENV["TRAVIS"] ? nil : "foobared"
{:type => Faye::Redis, :password => pw, :namespace => Time.now.to_i.to_s}
end
after do
engine.disconnect
redis = EM::Hiredis::Client.connect('localhost', 6379)
redis.auth(engine_opts[:password])
redis.flushall
end
it_should_behave_like "faye engine"
it_should_behave_like "distributed engine"
next if ENV["TRAVIS"]
describe "using a Unix socket" do
before { engine_opts[:socket] = "/tmp/redis.sock" }
it_should_behave_like "faye engine"
Vendored
+1 -1