Migrate the Hybi driver's internals to use Frame and Message objects compatible with websocket-extensions.

This commit is contained in:
James Coglan
2014-12-02 22:51:42 +00:00
parent 93c7cf16b3
commit ef4fc7c57a
9 changed files with 236 additions and 141 deletions
-2
View File
@@ -1,8 +1,6 @@
language: ruby
rvm:
- 1.8.7
- 1.9.2
- 1.9.3
- 2.0.0
- 2.1.3
+7 -1
View File
@@ -7,9 +7,11 @@
require 'base64'
require 'digest/md5'
require 'digest/sha1'
require 'securerandom'
require 'set'
require 'stringio'
require 'uri'
require 'websocket/extensions'
module WebSocket
autoload :HTTP, File.expand_path('../http', __FILE__)
@@ -73,6 +75,10 @@ module WebSocket
STATES[@ready_state]
end
def add_extension(extension)
false
end
def set_header(name, value)
return false unless @ready_state <= 0
@headers[name] = value
@@ -175,7 +181,7 @@ module WebSocket
upgrade = env['HTTP_UPGRADE'] || ''
env['REQUEST_METHOD'] == 'GET' and
connection.downcase.split(/\s*,\s*/).include?('upgrade') and
connection.downcase.split(/ *, */).include?('upgrade') and
upgrade.downcase == 'websocket'
end
+14 -3
View File
@@ -57,14 +57,21 @@ module WebSocket
@http.parse(buffer)
return fail_handshake('Invalid HTTP response') if @http.error?
return unless @http.complete?
validate_handshake if @http.complete?
parse(@http.body) if @ready_state == 1
validate_handshake
return if @ready_state == 3
open
parse(@http.body)
end
private
def handshake_request
extensions = @extensions.generate_offer
@headers['Sec-WebSocket-Extensions'] = extensions if extensions
start = "GET #{@pathname} HTTP/1.1"
headers = [start, @headers.to_s, '']
headers.join("\r\n")
@@ -114,7 +121,11 @@ module WebSocket
end
end
open
begin
@extensions.activate(@headers['Sec-WebSocket-Extensions'])
rescue ::WebSocket::Extensions::ExtensionError => e
return fail_handshake(e.message)
end
end
end
+158 -131
View File
@@ -3,6 +3,9 @@ module WebSocket
class Hybi < Driver
root = File.expand_path('../hybi', __FILE__)
autoload :Frame, root + '/frame'
autoload :Message, root + '/message'
autoload :StreamReader, root + '/stream_reader'
def self.generate_accept(key)
@@ -28,9 +31,9 @@ module WebSocket
:pong => 10
}
OPCODE_CODES = OPCODES.values
FRAGMENTED_OPCODES = OPCODES.values_at(:continuation, :text, :binary)
OPENING_OPCODES = OPCODES.values_at(:text, :binary)
OPCODE_CODES = OPCODES.values
MESSAGE_OPCODES = OPCODES.values_at(:continuation, :text, :binary)
OPENING_OPCODES = OPCODES.values_at(:text, :binary)
ERRORS = {
:normal_closure => 1000,
@@ -50,13 +53,13 @@ module WebSocket
def initialize(socket, options = {})
super
reset
@extensions = ::WebSocket::Extensions.new
@reader = StreamReader.new
@stage = 0
@masking = options[:masking]
@protocols = options[:protocols] || []
@protocols = @protocols.strip.split(/\s*,\s*/) if String === @protocols
@protocols = @protocols.strip.split(/ *, */) if String === @protocols
@require_masking = options[:require_masking]
@ping_callbacks = {}
@@ -70,7 +73,7 @@ module WebSocket
@headers['Sec-WebSocket-Accept'] = Hybi.generate_accept(sec_key)
if protos = @socket.env['HTTP_SEC_WEBSOCKET_PROTOCOL']
protos = protos.split(/\s*,\s*/) if String === protos
protos = protos.split(/ *, */) if String === protos
@protocol = protos.find { |p| @protocols.include?(p) }
@headers['Sec-WebSocket-Protocol'] = @protocol if @protocol
end
@@ -80,6 +83,11 @@ module WebSocket
"hybi-#{@socket.env['HTTP_SEC_WEBSOCKET_VERSION']}"
end
def add_extension(extension)
@extensions.add(extension)
true
end
def parse(data)
data = data.bytes.to_a if data.respond_to?(:bytes)
@reader.put(data)
@@ -95,20 +103,19 @@ module WebSocket
parse_length(buffer[0]) if buffer
when 2 then
buffer = @reader.read(@length_size)
buffer = @reader.read(@frame.length_bytes)
parse_extended_length(buffer) if buffer
when 3 then
buffer = @reader.read(4)
if buffer
@mask = buffer
@frame.masking_key = buffer
@stage = 4
end
when 4 then
buffer = @reader.read(@length)
buffer = @reader.read(@frame.length)
if buffer
@payload = buffer
emit_frame(buffer)
@stage = 0
end
@@ -119,59 +126,6 @@ module WebSocket
end
end
def frame(data, type = nil, code = nil)
return queue([data, type, code]) if @ready_state <= 0
return false unless @ready_state == 1
data = data.to_s unless Array === data
data = Driver.encode(data, :utf8) if String === data
is_text = (String === data)
opcode = OPCODES[type || (is_text ? :text : :binary)]
buffer = data.respond_to?(:bytes) ? data.bytes.to_a : data
insert = code ? 2 : 0
length = buffer.size + insert
header = (length <= 125) ? 2 : (length <= 65535 ? 4 : 10)
offset = header + (@masking ? 4 : 0)
masked = @masking ? MASK : 0
frame = Array.new(offset)
frame[0] = FIN | opcode
if length <= 125
frame[1] = masked | length
elsif length <= 65535
frame[1] = masked | 126
frame[2] = (length >> 8) & BYTE
frame[3] = length & BYTE
else
frame[1] = masked | 127
frame[2] = (length >> 56) & BYTE
frame[3] = (length >> 48) & BYTE
frame[4] = (length >> 40) & BYTE
frame[5] = (length >> 32) & BYTE
frame[6] = (length >> 24) & BYTE
frame[7] = (length >> 16) & BYTE
frame[8] = (length >> 8) & BYTE
frame[9] = length & BYTE
end
if code
buffer = [(code >> 8) & BYTE, code & BYTE] + buffer
end
if @masking
mask = [rand(256), rand(256), rand(256), rand(256)]
frame[header...offset] = mask
buffer = Mask.mask(buffer, mask)
end
frame.concat(buffer)
@socket.write(Driver.encode(frame, :binary))
true
end
def text(message)
frame(message, :text)
end
@@ -202,9 +156,89 @@ module WebSocket
end
end
def frame(data, type = nil, code = nil)
return queue([data, type, code]) if @ready_state <= 0
return false unless @ready_state == 1
data = data.to_s unless Array === data
data = Driver.encode(data, :utf8) if String === data
frame = Frame.new
is_text = (String === data)
frame.final = true
frame.rsv1 = frame.rsv2 = frame.rsv3 = false
frame.opcode = OPCODES[type || (is_text ? :text : :binary)]
frame.masked = !!@masking
frame.masking_key = SecureRandom.random_bytes(4).bytes.to_a if frame.masked
payload = data.respond_to?(:bytes) ? data.bytes.to_a : data
if code
payload = [(code >> 8) & BYTE, code & BYTE] + payload
end
frame.length = payload.size
frame.payload = payload
send_frame(frame, true)
true
end
private
def send_frame(frame, run_extensions = false)
if run_extensions and MESSAGE_OPCODES.include?(frame.opcode)
message = Message.new
message << frame
@extensions.process_outgoing_message(message).frames.each do |frame|
send_frame(frame)
end
return
end
length = frame.length
header = (length <= 125) ? 2 : (length <= 65535 ? 4 : 10)
offset = header + (frame.masked ? 4 : 0)
masked = frame.masked ? MASK : 0
buffer = [FIN | frame.opcode]
if length <= 125
buffer[1] = masked | length
elsif length <= 65535
buffer[1] = masked | 126
buffer[2] = (length >> 8) & BYTE
buffer[3] = length & BYTE
else
buffer[1] = masked | 127
buffer[2] = (length >> 56) & BYTE
buffer[3] = (length >> 48) & BYTE
buffer[4] = (length >> 40) & BYTE
buffer[5] = (length >> 32) & BYTE
buffer[6] = (length >> 24) & BYTE
buffer[7] = (length >> 16) & BYTE
buffer[8] = (length >> 8) & BYTE
buffer[9] = length & BYTE
end
if frame.masked
buffer.concat(frame.masking_key)
buffer.concat(Mask.mask(frame.payload, frame.masking_key))
else
buffer.concat(frame.payload)
end
@socket.write(Driver.encode(buffer, :binary))
rescue ::WebSocket::Extensions::ExtensionError => e
fail(:extension_error, e.message)
end
def handshake_response
extensions = @extensions.generate_response(@socket.env['HTTP_SEC_WEBSOCKET_EXTENSIONS'])
@headers['Sec-WebSocket-Extensions'] = extensions if extensions
start = 'HTTP/1.1 101 Switching Protocols'
headers = [start, @headers.to_s, '']
headers.join("\r\n")
@@ -212,9 +246,11 @@ module WebSocket
def shutdown(code, reason)
frame(reason, :close, code)
@frame = @message = nil
@ready_state = 3
@stage = 5
emit(:close, CloseEvent.new(code, reason))
@extensions.close
end
def fail(type, message)
@@ -225,25 +261,30 @@ module WebSocket
def parse_opcode(data)
rsvs = [RSV1, RSV2, RSV3].map { |rsv| (data & rsv) == rsv }
if rsvs.any?
@frame = Frame.new
@frame.final = (data & FIN) == FIN
@frame.rsv1 = rsvs[0]
@frame.rsv2 = rsvs[1]
@frame.rsv3 = rsvs[2]
@frame.opcode = (data & OPCODE)
unless @extensions.valid_frame_rsv?(@frame)
return fail(:protocol_error,
"One or more reserved bits are on: reserved1 = #{rsvs[0] ? 1 : 0}" +
", reserved2 = #{rsvs[1] ? 1 : 0 }" +
", reserved3 = #{rsvs[2] ? 1 : 0 }")
"One or more reserved bits are on: reserved1 = #{@frame.rsv1 ? 1 : 0}" +
", reserved2 = #{@frame.rsv2 ? 1 : 0 }" +
", reserved3 = #{@frame.rsv3 ? 1 : 0 }")
end
@final = (data & FIN) == FIN
@opcode = (data & OPCODE)
unless OPCODES.values.include?(@opcode)
return fail(:protocol_error, "Unrecognized frame opcode: #{@opcode}")
unless OPCODES.values.include?(@frame.opcode)
return fail(:protocol_error, "Unrecognized frame opcode: #{@frame.opcode}")
end
unless FRAGMENTED_OPCODES.include?(@opcode) or @final
return fail(:protocol_error, "Received fragmented control frame: opcode = #{@opcode}")
unless MESSAGE_OPCODES.include?(@frame.opcode) or @frame.final
return fail(:protocol_error, "Received fragmented control frame: opcode = #{@frame.opcode}")
end
if @mode and OPENING_OPCODES.include?(@opcode)
if @message and OPENING_OPCODES.include?(@frame.opcode)
return fail(:protocol_error, 'Received new data frame but previous continuous frame is unfinished')
end
@@ -251,36 +292,38 @@ module WebSocket
end
def parse_length(data)
@masked = (data & MASK) == MASK
if @require_masking and not @masked
@frame.masked = (data & MASK) == MASK
if @require_masking and not @frame.masked
return fail(:unacceptable, 'Received unmasked frame but masking is required')
end
@length = (data & LENGTH)
@frame.length = (data & LENGTH)
if @length >= 0 and @length <= 125
if @frame.length >= 0 and @frame.length <= 125
return unless check_frame_length
@stage = @masked ? 3 : 4
@stage = @frame.masked ? 3 : 4
else
@length_size = (@length == 126) ? 2 : 8
@stage = 2
@frame.length_bytes = (@frame.length == 126) ? 2 : 8
@stage = 2
end
end
def parse_extended_length(buffer)
@length = integer(buffer)
@frame.length = integer(buffer)
unless FRAGMENTED_OPCODES.include?(@opcode) or @length <= 125
return fail(:protocol_error, "Received control frame having too long payload: #{@length}")
unless MESSAGE_OPCODES.include?(@frame.opcode) or @frame.length <= 125
return fail(:protocol_error, "Received control frame having too long payload: #{@frame.length}")
end
return unless check_frame_length
@stage = @masked ? 3 : 4
@stage = @frame.masked ? 3 : 4
end
def check_frame_length
if @buffer.size + @length > @max_length
length = @message ? @message.data.size : 0
if length + @frame.length > @max_length
fail(:too_large, 'WebSocket frame length too large')
false
else
@@ -289,50 +332,24 @@ module WebSocket
end
def emit_frame(buffer)
payload = Mask.mask(buffer, @mask)
is_final = @final
opcode = @opcode
frame = @frame
payload = frame.payload = Mask.mask(buffer, @frame.masking_key)
opcode = frame.opcode
@final = @opcode = @length = @length_size = @masked = @mask = nil
@frame = nil
case opcode
when OPCODES[:continuation] then
return fail(:protocol_error, 'Received unexpected continuation frame') unless @mode
@buffer.concat(payload)
if is_final
message = @buffer
message = Driver.encode(message, :utf8) if @mode == :text
reset
if message
emit(:message, MessageEvent.new(message))
else
fail(:encoding_error, 'Could not decode a text frame as UTF-8')
end
end
return fail(:protocol_error, 'Received unexpected continuation frame') unless @message
@message << frame
when OPCODES[:text] then
if is_final
message = Driver.encode(payload, :utf8)
if message
emit(:message, MessageEvent.new(message))
else
fail(:encoding_error, 'Could not decode a text frame as UTF-8')
end
else
@mode = :text
@buffer.concat(payload)
end
when OPCODES[:binary] then
if is_final
emit(:message, MessageEvent.new(payload))
else
@mode = :binary
@buffer.concat(payload)
end
when OPCODES[:text], OPCODES[:binary] then
@message = Message.new
@message << frame
when OPCODES[:close] then
code = (payload.size >= 2) ? 256 * payload[0] + payload[1] : nil
code = (payload.size >= 2) ? 256 * payload[0] + payload[1] : nil
reason = (payload.size > 2) ? Driver.encode(payload[2..-1] || [], :utf8) : nil
unless (payload.size == 0) or
(code && code >= MIN_RESERVED_ERROR && code <= MAX_RESERVED_ERROR) or
@@ -340,13 +357,10 @@ module WebSocket
code = ERRORS[:protocol_error]
end
message = Driver.encode(payload[2..-1] || [], :utf8)
if payload.size > 125 or message.nil?
if payload.size > 125 or (payload.size > 2 and reason.nil?)
code = ERRORS[:protocol_error]
end
reason = (payload.size > 2) ? message : ''
shutdown(code, reason || '')
when OPCODES[:ping] then
@@ -358,11 +372,24 @@ module WebSocket
@ping_callbacks.delete(message)
callback.call if callback
end
emit_message if frame.final and MESSAGE_OPCODES.include?(opcode)
end
def reset
@buffer = []
@mode = nil
def emit_message
message = @extensions.process_incoming_message(@message)
@message = nil
payload = message.data
payload = Driver.encode(payload, :utf8) if message.frames.first.opcode == OPCODES[:text]
if payload
emit(:message, MessageEvent.new(payload))
else
fail(:encoding_error, 'Could not decode a text frame as UTF-8')
end
rescue ::WebSocket::Extensions::ExtensionError => e
fail(:extension_error, e.message)
end
def integer(bytes)
+20
View File
@@ -0,0 +1,20 @@
module WebSocket
class Driver
class Hybi
class Frame
attr_accessor :final,
:rsv1,
:rsv2,
:rsv3,
:opcode,
:masked,
:masking_key,
:length_bytes,
:length,
:payload
end
end
end
end
+21
View File
@@ -0,0 +1,21 @@
module WebSocket
class Driver
class Hybi
class Message
attr_accessor :data, :frames
def initialize
@data = []
@frames = []
end
def <<(frame)
@data.concat(frame.payload)
@frames << frame
end
end
end
end
end
+3 -2
View File
@@ -22,7 +22,7 @@ module WebSocket
url
end
%w[set_header start state frame text binary ping close].each do |method|
%w[add_extension set_header start frame text binary ping close].each do |method|
define_method(method) do |*args, &block|
if @delegate
@delegate.__send__(method, *args, &block)
@@ -47,7 +47,8 @@ module WebSocket
return unless @http.complete?
@delegate = Driver.rack(self, @options)
@delegate.on(:open) { open }
open
EVENTS.each do |event|
@delegate.on(event) { |e| emit(event, e) }
end
+9 -1
View File
@@ -90,7 +90,15 @@ module WebSocket
def header_line(line)
return false unless parsed = line.scan(HEADER_LINE).first
@headers[HTTP.normalize_header(parsed[0])] = parsed[1].strip
key = HTTP.normalize_header(parsed[0])
value = parsed[1].strip
if @headers.has_key?(key)
@headers[key] << ', ' << value
else
@headers[key] = value
end
true
end
+4 -1
View File
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'websocket-driver'
s.version = '0.4.0'
s.version = '0.5.0'
s.summary = 'WebSocket protocol handler with pluggable I/O'
s.author = 'James Coglan'
s.email = 'jcoglan@gmail.com'
@@ -24,7 +24,10 @@ Gem::Specification.new do |s|
s.files = files
s.add_dependency 'websocket-extensions', '>= 0.1.0'
s.add_development_dependency 'eventmachine'
s.add_development_dependency 'permessage-deflate'
s.add_development_dependency 'rake-compiler', '~> 0.8.0'
s.add_development_dependency 'rspec'
end