From ef4fc7c57a4a8494caaf7b66dd14865b8fb8bdd4 Mon Sep 17 00:00:00 2001 From: James Coglan Date: Tue, 2 Dec 2014 22:51:42 +0000 Subject: [PATCH] Migrate the Hybi driver's internals to use Frame and Message objects compatible with websocket-extensions. --- .travis.yml | 2 - lib/websocket/driver.rb | 8 +- lib/websocket/driver/client.rb | 17 +- lib/websocket/driver/hybi.rb | 289 +++++++++++++++------------ lib/websocket/driver/hybi/frame.rb | 20 ++ lib/websocket/driver/hybi/message.rb | 21 ++ lib/websocket/driver/server.rb | 5 +- lib/websocket/http/headers.rb | 10 +- websocket-driver.gemspec | 5 +- 9 files changed, 236 insertions(+), 141 deletions(-) create mode 100644 lib/websocket/driver/hybi/frame.rb create mode 100644 lib/websocket/driver/hybi/message.rb diff --git a/.travis.yml b/.travis.yml index 2f93181..632adc0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,6 @@ language: ruby rvm: - - 1.8.7 - - 1.9.2 - 1.9.3 - 2.0.0 - 2.1.3 diff --git a/lib/websocket/driver.rb b/lib/websocket/driver.rb index 9e2da65..77994be 100644 --- a/lib/websocket/driver.rb +++ b/lib/websocket/driver.rb @@ -7,9 +7,11 @@ require 'base64' require 'digest/md5' require 'digest/sha1' +require 'securerandom' require 'set' require 'stringio' require 'uri' +require 'websocket/extensions' module WebSocket autoload :HTTP, File.expand_path('../http', __FILE__) @@ -73,6 +75,10 @@ module WebSocket STATES[@ready_state] end + def add_extension(extension) + false + end + def set_header(name, value) return false unless @ready_state <= 0 @headers[name] = value @@ -175,7 +181,7 @@ module WebSocket upgrade = env['HTTP_UPGRADE'] || '' env['REQUEST_METHOD'] == 'GET' and - connection.downcase.split(/\s*,\s*/).include?('upgrade') and + connection.downcase.split(/ *, */).include?('upgrade') and upgrade.downcase == 'websocket' end diff --git a/lib/websocket/driver/client.rb b/lib/websocket/driver/client.rb index 8a817d6..d46a5aa 100644 --- a/lib/websocket/driver/client.rb +++ b/lib/websocket/driver/client.rb @@ -57,14 +57,21 @@ module WebSocket @http.parse(buffer) return fail_handshake('Invalid HTTP response') if @http.error? + return unless @http.complete? - validate_handshake if @http.complete? - parse(@http.body) if @ready_state == 1 + validate_handshake + return if @ready_state == 3 + + open + parse(@http.body) end private def handshake_request + extensions = @extensions.generate_offer + @headers['Sec-WebSocket-Extensions'] = extensions if extensions + start = "GET #{@pathname} HTTP/1.1" headers = [start, @headers.to_s, ''] headers.join("\r\n") @@ -114,7 +121,11 @@ module WebSocket end end - open + begin + @extensions.activate(@headers['Sec-WebSocket-Extensions']) + rescue ::WebSocket::Extensions::ExtensionError => e + return fail_handshake(e.message) + end end end diff --git a/lib/websocket/driver/hybi.rb b/lib/websocket/driver/hybi.rb index e4de895..ccbb58e 100644 --- a/lib/websocket/driver/hybi.rb +++ b/lib/websocket/driver/hybi.rb @@ -3,6 +3,9 @@ module WebSocket class Hybi < Driver root = File.expand_path('../hybi', __FILE__) + + autoload :Frame, root + '/frame' + autoload :Message, root + '/message' autoload :StreamReader, root + '/stream_reader' def self.generate_accept(key) @@ -28,9 +31,9 @@ module WebSocket :pong => 10 } - OPCODE_CODES = OPCODES.values - FRAGMENTED_OPCODES = OPCODES.values_at(:continuation, :text, :binary) - OPENING_OPCODES = OPCODES.values_at(:text, :binary) + OPCODE_CODES = OPCODES.values + MESSAGE_OPCODES = OPCODES.values_at(:continuation, :text, :binary) + OPENING_OPCODES = OPCODES.values_at(:text, :binary) ERRORS = { :normal_closure => 1000, @@ -50,13 +53,13 @@ module WebSocket def initialize(socket, options = {}) super - reset + @extensions = ::WebSocket::Extensions.new @reader = StreamReader.new @stage = 0 @masking = options[:masking] @protocols = options[:protocols] || [] - @protocols = @protocols.strip.split(/\s*,\s*/) if String === @protocols + @protocols = @protocols.strip.split(/ *, */) if String === @protocols @require_masking = options[:require_masking] @ping_callbacks = {} @@ -70,7 +73,7 @@ module WebSocket @headers['Sec-WebSocket-Accept'] = Hybi.generate_accept(sec_key) if protos = @socket.env['HTTP_SEC_WEBSOCKET_PROTOCOL'] - protos = protos.split(/\s*,\s*/) if String === protos + protos = protos.split(/ *, */) if String === protos @protocol = protos.find { |p| @protocols.include?(p) } @headers['Sec-WebSocket-Protocol'] = @protocol if @protocol end @@ -80,6 +83,11 @@ module WebSocket "hybi-#{@socket.env['HTTP_SEC_WEBSOCKET_VERSION']}" end + def add_extension(extension) + @extensions.add(extension) + true + end + def parse(data) data = data.bytes.to_a if data.respond_to?(:bytes) @reader.put(data) @@ -95,20 +103,19 @@ module WebSocket parse_length(buffer[0]) if buffer when 2 then - buffer = @reader.read(@length_size) + buffer = @reader.read(@frame.length_bytes) parse_extended_length(buffer) if buffer when 3 then buffer = @reader.read(4) if buffer - @mask = buffer + @frame.masking_key = buffer @stage = 4 end when 4 then - buffer = @reader.read(@length) + buffer = @reader.read(@frame.length) if buffer - @payload = buffer emit_frame(buffer) @stage = 0 end @@ -119,59 +126,6 @@ module WebSocket end end - def frame(data, type = nil, code = nil) - return queue([data, type, code]) if @ready_state <= 0 - return false unless @ready_state == 1 - - data = data.to_s unless Array === data - data = Driver.encode(data, :utf8) if String === data - - is_text = (String === data) - opcode = OPCODES[type || (is_text ? :text : :binary)] - buffer = data.respond_to?(:bytes) ? data.bytes.to_a : data - insert = code ? 2 : 0 - length = buffer.size + insert - header = (length <= 125) ? 2 : (length <= 65535 ? 4 : 10) - offset = header + (@masking ? 4 : 0) - masked = @masking ? MASK : 0 - frame = Array.new(offset) - - frame[0] = FIN | opcode - - if length <= 125 - frame[1] = masked | length - elsif length <= 65535 - frame[1] = masked | 126 - frame[2] = (length >> 8) & BYTE - frame[3] = length & BYTE - else - frame[1] = masked | 127 - frame[2] = (length >> 56) & BYTE - frame[3] = (length >> 48) & BYTE - frame[4] = (length >> 40) & BYTE - frame[5] = (length >> 32) & BYTE - frame[6] = (length >> 24) & BYTE - frame[7] = (length >> 16) & BYTE - frame[8] = (length >> 8) & BYTE - frame[9] = length & BYTE - end - - if code - buffer = [(code >> 8) & BYTE, code & BYTE] + buffer - end - - if @masking - mask = [rand(256), rand(256), rand(256), rand(256)] - frame[header...offset] = mask - buffer = Mask.mask(buffer, mask) - end - - frame.concat(buffer) - - @socket.write(Driver.encode(frame, :binary)) - true - end - def text(message) frame(message, :text) end @@ -202,9 +156,89 @@ module WebSocket end end + def frame(data, type = nil, code = nil) + return queue([data, type, code]) if @ready_state <= 0 + return false unless @ready_state == 1 + + data = data.to_s unless Array === data + data = Driver.encode(data, :utf8) if String === data + + frame = Frame.new + is_text = (String === data) + + frame.final = true + frame.rsv1 = frame.rsv2 = frame.rsv3 = false + frame.opcode = OPCODES[type || (is_text ? :text : :binary)] + frame.masked = !!@masking + + frame.masking_key = SecureRandom.random_bytes(4).bytes.to_a if frame.masked + + payload = data.respond_to?(:bytes) ? data.bytes.to_a : data + if code + payload = [(code >> 8) & BYTE, code & BYTE] + payload + end + frame.length = payload.size + frame.payload = payload + + send_frame(frame, true) + true + end + private + def send_frame(frame, run_extensions = false) + if run_extensions and MESSAGE_OPCODES.include?(frame.opcode) + message = Message.new + message << frame + + @extensions.process_outgoing_message(message).frames.each do |frame| + send_frame(frame) + end + return + end + + length = frame.length + header = (length <= 125) ? 2 : (length <= 65535 ? 4 : 10) + offset = header + (frame.masked ? 4 : 0) + masked = frame.masked ? MASK : 0 + + buffer = [FIN | frame.opcode] + + if length <= 125 + buffer[1] = masked | length + elsif length <= 65535 + buffer[1] = masked | 126 + buffer[2] = (length >> 8) & BYTE + buffer[3] = length & BYTE + else + buffer[1] = masked | 127 + buffer[2] = (length >> 56) & BYTE + buffer[3] = (length >> 48) & BYTE + buffer[4] = (length >> 40) & BYTE + buffer[5] = (length >> 32) & BYTE + buffer[6] = (length >> 24) & BYTE + buffer[7] = (length >> 16) & BYTE + buffer[8] = (length >> 8) & BYTE + buffer[9] = length & BYTE + end + + if frame.masked + buffer.concat(frame.masking_key) + buffer.concat(Mask.mask(frame.payload, frame.masking_key)) + else + buffer.concat(frame.payload) + end + + @socket.write(Driver.encode(buffer, :binary)) + + rescue ::WebSocket::Extensions::ExtensionError => e + fail(:extension_error, e.message) + end + def handshake_response + extensions = @extensions.generate_response(@socket.env['HTTP_SEC_WEBSOCKET_EXTENSIONS']) + @headers['Sec-WebSocket-Extensions'] = extensions if extensions + start = 'HTTP/1.1 101 Switching Protocols' headers = [start, @headers.to_s, ''] headers.join("\r\n") @@ -212,9 +246,11 @@ module WebSocket def shutdown(code, reason) frame(reason, :close, code) + @frame = @message = nil @ready_state = 3 @stage = 5 emit(:close, CloseEvent.new(code, reason)) + @extensions.close end def fail(type, message) @@ -225,25 +261,30 @@ module WebSocket def parse_opcode(data) rsvs = [RSV1, RSV2, RSV3].map { |rsv| (data & rsv) == rsv } - if rsvs.any? + @frame = Frame.new + + @frame.final = (data & FIN) == FIN + @frame.rsv1 = rsvs[0] + @frame.rsv2 = rsvs[1] + @frame.rsv3 = rsvs[2] + @frame.opcode = (data & OPCODE) + + unless @extensions.valid_frame_rsv?(@frame) return fail(:protocol_error, - "One or more reserved bits are on: reserved1 = #{rsvs[0] ? 1 : 0}" + - ", reserved2 = #{rsvs[1] ? 1 : 0 }" + - ", reserved3 = #{rsvs[2] ? 1 : 0 }") + "One or more reserved bits are on: reserved1 = #{@frame.rsv1 ? 1 : 0}" + + ", reserved2 = #{@frame.rsv2 ? 1 : 0 }" + + ", reserved3 = #{@frame.rsv3 ? 1 : 0 }") end - @final = (data & FIN) == FIN - @opcode = (data & OPCODE) - - unless OPCODES.values.include?(@opcode) - return fail(:protocol_error, "Unrecognized frame opcode: #{@opcode}") + unless OPCODES.values.include?(@frame.opcode) + return fail(:protocol_error, "Unrecognized frame opcode: #{@frame.opcode}") end - unless FRAGMENTED_OPCODES.include?(@opcode) or @final - return fail(:protocol_error, "Received fragmented control frame: opcode = #{@opcode}") + unless MESSAGE_OPCODES.include?(@frame.opcode) or @frame.final + return fail(:protocol_error, "Received fragmented control frame: opcode = #{@frame.opcode}") end - if @mode and OPENING_OPCODES.include?(@opcode) + if @message and OPENING_OPCODES.include?(@frame.opcode) return fail(:protocol_error, 'Received new data frame but previous continuous frame is unfinished') end @@ -251,36 +292,38 @@ module WebSocket end def parse_length(data) - @masked = (data & MASK) == MASK - if @require_masking and not @masked + @frame.masked = (data & MASK) == MASK + if @require_masking and not @frame.masked return fail(:unacceptable, 'Received unmasked frame but masking is required') end - @length = (data & LENGTH) + @frame.length = (data & LENGTH) - if @length >= 0 and @length <= 125 + if @frame.length >= 0 and @frame.length <= 125 return unless check_frame_length - @stage = @masked ? 3 : 4 + @stage = @frame.masked ? 3 : 4 else - @length_size = (@length == 126) ? 2 : 8 - @stage = 2 + @frame.length_bytes = (@frame.length == 126) ? 2 : 8 + @stage = 2 end end def parse_extended_length(buffer) - @length = integer(buffer) + @frame.length = integer(buffer) - unless FRAGMENTED_OPCODES.include?(@opcode) or @length <= 125 - return fail(:protocol_error, "Received control frame having too long payload: #{@length}") + unless MESSAGE_OPCODES.include?(@frame.opcode) or @frame.length <= 125 + return fail(:protocol_error, "Received control frame having too long payload: #{@frame.length}") end return unless check_frame_length - @stage = @masked ? 3 : 4 + @stage = @frame.masked ? 3 : 4 end def check_frame_length - if @buffer.size + @length > @max_length + length = @message ? @message.data.size : 0 + + if length + @frame.length > @max_length fail(:too_large, 'WebSocket frame length too large') false else @@ -289,50 +332,24 @@ module WebSocket end def emit_frame(buffer) - payload = Mask.mask(buffer, @mask) - is_final = @final - opcode = @opcode + frame = @frame + payload = frame.payload = Mask.mask(buffer, @frame.masking_key) + opcode = frame.opcode - @final = @opcode = @length = @length_size = @masked = @mask = nil + @frame = nil case opcode when OPCODES[:continuation] then - return fail(:protocol_error, 'Received unexpected continuation frame') unless @mode - @buffer.concat(payload) - if is_final - message = @buffer - message = Driver.encode(message, :utf8) if @mode == :text - reset - if message - emit(:message, MessageEvent.new(message)) - else - fail(:encoding_error, 'Could not decode a text frame as UTF-8') - end - end + return fail(:protocol_error, 'Received unexpected continuation frame') unless @message + @message << frame - when OPCODES[:text] then - if is_final - message = Driver.encode(payload, :utf8) - if message - emit(:message, MessageEvent.new(message)) - else - fail(:encoding_error, 'Could not decode a text frame as UTF-8') - end - else - @mode = :text - @buffer.concat(payload) - end - - when OPCODES[:binary] then - if is_final - emit(:message, MessageEvent.new(payload)) - else - @mode = :binary - @buffer.concat(payload) - end + when OPCODES[:text], OPCODES[:binary] then + @message = Message.new + @message << frame when OPCODES[:close] then - code = (payload.size >= 2) ? 256 * payload[0] + payload[1] : nil + code = (payload.size >= 2) ? 256 * payload[0] + payload[1] : nil + reason = (payload.size > 2) ? Driver.encode(payload[2..-1] || [], :utf8) : nil unless (payload.size == 0) or (code && code >= MIN_RESERVED_ERROR && code <= MAX_RESERVED_ERROR) or @@ -340,13 +357,10 @@ module WebSocket code = ERRORS[:protocol_error] end - message = Driver.encode(payload[2..-1] || [], :utf8) - - if payload.size > 125 or message.nil? + if payload.size > 125 or (payload.size > 2 and reason.nil?) code = ERRORS[:protocol_error] end - reason = (payload.size > 2) ? message : '' shutdown(code, reason || '') when OPCODES[:ping] then @@ -358,11 +372,24 @@ module WebSocket @ping_callbacks.delete(message) callback.call if callback end + + emit_message if frame.final and MESSAGE_OPCODES.include?(opcode) end - def reset - @buffer = [] - @mode = nil + def emit_message + message = @extensions.process_incoming_message(@message) + @message = nil + + payload = message.data + payload = Driver.encode(payload, :utf8) if message.frames.first.opcode == OPCODES[:text] + + if payload + emit(:message, MessageEvent.new(payload)) + else + fail(:encoding_error, 'Could not decode a text frame as UTF-8') + end + rescue ::WebSocket::Extensions::ExtensionError => e + fail(:extension_error, e.message) end def integer(bytes) diff --git a/lib/websocket/driver/hybi/frame.rb b/lib/websocket/driver/hybi/frame.rb new file mode 100644 index 0000000..dfe69c6 --- /dev/null +++ b/lib/websocket/driver/hybi/frame.rb @@ -0,0 +1,20 @@ +module WebSocket + class Driver + class Hybi + + class Frame + attr_accessor :final, + :rsv1, + :rsv2, + :rsv3, + :opcode, + :masked, + :masking_key, + :length_bytes, + :length, + :payload + end + + end + end +end diff --git a/lib/websocket/driver/hybi/message.rb b/lib/websocket/driver/hybi/message.rb new file mode 100644 index 0000000..0f1c142 --- /dev/null +++ b/lib/websocket/driver/hybi/message.rb @@ -0,0 +1,21 @@ +module WebSocket + class Driver + class Hybi + + class Message + attr_accessor :data, :frames + + def initialize + @data = [] + @frames = [] + end + + def <<(frame) + @data.concat(frame.payload) + @frames << frame + end + end + + end + end +end diff --git a/lib/websocket/driver/server.rb b/lib/websocket/driver/server.rb index 692e691..26a041d 100644 --- a/lib/websocket/driver/server.rb +++ b/lib/websocket/driver/server.rb @@ -22,7 +22,7 @@ module WebSocket url end - %w[set_header start state frame text binary ping close].each do |method| + %w[add_extension set_header start frame text binary ping close].each do |method| define_method(method) do |*args, &block| if @delegate @delegate.__send__(method, *args, &block) @@ -47,7 +47,8 @@ module WebSocket return unless @http.complete? @delegate = Driver.rack(self, @options) - @delegate.on(:open) { open } + open + EVENTS.each do |event| @delegate.on(event) { |e| emit(event, e) } end diff --git a/lib/websocket/http/headers.rb b/lib/websocket/http/headers.rb index 68ddb33..9a64bf6 100644 --- a/lib/websocket/http/headers.rb +++ b/lib/websocket/http/headers.rb @@ -90,7 +90,15 @@ module WebSocket def header_line(line) return false unless parsed = line.scan(HEADER_LINE).first - @headers[HTTP.normalize_header(parsed[0])] = parsed[1].strip + + key = HTTP.normalize_header(parsed[0]) + value = parsed[1].strip + + if @headers.has_key?(key) + @headers[key] << ', ' << value + else + @headers[key] = value + end true end diff --git a/websocket-driver.gemspec b/websocket-driver.gemspec index e3f6c1f..8ad6ca2 100644 --- a/websocket-driver.gemspec +++ b/websocket-driver.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'websocket-driver' - s.version = '0.4.0' + s.version = '0.5.0' s.summary = 'WebSocket protocol handler with pluggable I/O' s.author = 'James Coglan' s.email = 'jcoglan@gmail.com' @@ -24,7 +24,10 @@ Gem::Specification.new do |s| s.files = files + s.add_dependency 'websocket-extensions', '>= 0.1.0' + s.add_development_dependency 'eventmachine' + s.add_development_dependency 'permessage-deflate' s.add_development_dependency 'rake-compiler', '~> 0.8.0' s.add_development_dependency 'rspec' end