Make sure all event registration and input processing happens on the EventMachine thread, so that we don't get race conditions where incoming messages are processed before the socket's onmessage event handler has been registered.

This commit is contained in:
James Coglan
2013-03-19 19:24:21 +00:00
parent c05e0903f5
commit a65f5d9455
4 changed files with 29 additions and 13 deletions
+3
View File
@@ -48,3 +48,6 @@ App = lambda do |env|
end
end
def App.log(message)
end
+1 -7
View File
@@ -7,12 +7,6 @@ secure = ARGV[1] == 'ssl'
engine = ARGV[2] || 'thin'
spec = File.expand_path('../../spec', __FILE__)
module Logger
def self.log(message)
$stdout.puts(message)
end
end
require File.expand_path('../app', __FILE__)
if %[goliath thin].include?(engine)
Faye::WebSocket.load_adapter(engine)
@@ -32,7 +26,7 @@ when 'goliath'
when 'puma'
events = Puma::Events.new($stdout, $stderr)
binder = Puma::Binder.new(events)
binder.parse(["tcp://0.0.0.0:#{port}"], Logger)
binder.parse(["tcp://0.0.0.0:#{port}"], App)
server = Puma::Server.new(App, events)
server.binder = binder
server.run.join
+6 -3
View File
@@ -23,7 +23,10 @@ module Faye
if socket_object.env['rack.hijack?']
socket_object.env['rack.hijack'].call
@rack_hijack_io = socket_object.env['rack.hijack_io']
EventMachine.attach(@rack_hijack_io, Reader) { |r| r.stream = self }
EventMachine.attach(@rack_hijack_io, Reader) do |reader|
@rack_hijack_io_reader = reader
reader.stream = self
end
end
@connection.socket_stream = self if @connection.respond_to?(:socket_stream)
@@ -31,8 +34,8 @@ module Faye
def clean_rack_hijack
return unless @rack_hijack_io
@rack_hijack_io.close
@rack_hijack_io = nil
@rack_hijack_io_reader.close_connection_after_writing
@rack_hijack_io = @rack_hijack_io_reader = nil
end
def close_connection
+19 -3
View File
@@ -1,7 +1,18 @@
module Faye::WebSocket::API
module EventTarget
attr_accessor :onopen, :onmessage, :onerror, :onclose
events = %w[open message error close]
events.each do |event_type|
define_method "on#{event_type}=" do |handler|
EventMachine.next_tick do
if buffer = @buffers && @buffers.delete(event_type)
buffer.each { |event| handler.call(event) }
end
instance_variable_set("@on#{event_type}", handler)
end
end
end
def add_event_listener(event_type, listener, use_capture = false)
@listeners ||= {}
@@ -20,8 +31,13 @@ module Faye::WebSocket::API
event.target = event.current_target = self
event.event_phase = Event::AT_TARGET
callback = __send__("on#{ event.type }")
callback.call(event) if callback
callback = instance_variable_get("@on#{ event.type }")
if callback
callback.call(event)
else
@buffers ||= Hash.new { |k,v| k[v] = [] }
@buffers[event.type].push(event)
end
return unless @listeners and @listeners[event.type]
@listeners[event.type].each do |listener|