21 Commits

Author SHA1 Message Date
James Coglan 2af2c18251 Bump version to 0.1.3. 2017-11-11 01:24:29 +00:00
James Coglan 50bcedda78 Test on Node v9. 2017-11-11 01:05:57 +00:00
James Coglan e3aa5246d6 Avoid errors caused by extension names or parameters having names that clash with things in Object.prototype. 2017-11-11 00:45:16 +00:00
James Coglan 1e58c148cb Header parser should accept uppercase letters. 2017-11-11 00:44:44 +00:00
James Coglan 5f040a15af Bump version to 0.1.2. 2017-09-10 17:48:15 +01:00
James Coglan 654d9b0acc Move the license into its own file. 2017-09-10 17:44:46 +01:00
James Coglan c37d0611c7 Use package.json instead of .npmignore to set files in the package. 2017-09-10 17:44:00 +01:00
James Coglan d8d38e54e6 Catch synchronous errors thrown by extensions. 2017-09-08 21:18:07 +01:00
James Coglan fb84d36546 Fix a couple of race conditions in Pipeline.
While improving error handling, I found two situations where Pipeline
fails to close() correctly because it believes there are more messages
to emit.

The first is fixed by the change to `Cell.pending()`. If another message
is pushed into the pipeline after one of the cells has stopped, all the
cells get their pending count bumped. However, this new message will
never enter the queue inside the stopped cell, so its pending count will
never reach zero. So, we only increment the pending count for cells that
are not stopped.

The second is fixed by the change to `Functor._flushQueue()`. Say a cell
processes two messages in turn, M1 and M2. Both begin being processed
before either returns a result. M1 generates an error, while processing
of M2 never completes. In this situation, the error should indicate the
end of the stream but because M2 never completes, the pending count
never reaches zero. So, if we see a record with an error, we should
truncate the queue and this point and set pending=0 so the functor is
considered complete.
2017-09-08 20:59:13 +01:00
James Coglan 36cc2c5c73 Correct a spelling error in the spec. 2017-09-02 12:18:05 +01:00
James Coglan b315aa08d6 Drop testing for io.js releases, which barely anybody is still using. 2017-08-01 23:48:09 +01:00
James Coglan b23eb5b890 Drop support for Node 0.6, add Node 7 and 8. 2017-08-01 00:53:15 +01:00
James Coglan 7319766a5e Remove non-breaking spaces from README. 2016-10-08 03:09:55 +01:00
James Coglan 9418affa01 Test on Node 6.0. 2016-04-30 13:08:34 +01:00
James Coglan b27d4cebf8 Create CODE_OF_CONDUCT.md. 2015-11-08 12:16:15 +00:00
James Coglan 2792339b4d Add a missing semicolon. 2015-11-06 22:10:26 +00:00
James Coglan 5e5f1f454f Test on Node 5. 2015-11-05 21:22:19 +00:00
James Coglan 2365c0aef2 Use Travis containers. 2015-10-17 12:54:29 +01:00
James Coglan 6669e323c3 Test on major versions of iojs and node 4. 2015-10-17 12:50:54 +01:00
James Coglan c8f31cc1c7 Reversing the previous commit; generateResponse() should throw on invalid heders (as should activate()), because the server should fail the connection in this event. 2015-03-26 08:30:23 +00:00
James Coglan 62ac506b80 If the header from the client is invalid, just ignore it and build a pipeline with no sessions. 2015-03-14 12:56:41 +00:00
15 changed files with 183 additions and 109 deletions
-6
View File
@@ -1,6 +0,0 @@
.git
.gitignore
.npmignore
.travis.yml
node_modules
spec
+8 -3
View File
@@ -1,11 +1,16 @@
sudo: false
language: node_js
node_js:
- "0.6"
- "0.8"
- "0.10"
- "0.12"
- "iojs"
- "4"
- "5"
- "6"
- "7"
- "8"
- "9"
before_install:
- '[ "${TRAVIS_NODE_VERSION}" = "0.6" ] && npm conf set strict-ssl false || true'
- '[ "${TRAVIS_NODE_VERSION}" = "0.8" ] && npm install -g npm@~1.4.0 || true'
+15 -1
View File
@@ -1,7 +1,21 @@
### 0.1.3 / 2017-11-11
* Accept extension names and parameters including uppercase letters
* Handle extension names that clash with `Object.prototype` properties
### 0.1.2 / 2017-09-10
* Catch synchronous exceptions thrown when calling an extension
* Fix race condition caused when a message is pushed after a cell has stopped
due to an error
* Fix failure of `close()` to return if a message that's queued after one that
produces an error never finishes being processed
### 0.1.1 / 2015-02-19
* Prevent sessions being closed before they have finished processing messages
* Add a callback to `Extensions.close()` so the caller can tell when it's safe to close the socket
* Add a callback to `Extensions.close()` so the caller can tell when it's safe
to close the socket
### 0.1.0 / 2014-12-12
+4
View File
@@ -0,0 +1,4 @@
# Code of Conduct
All projects under the [Faye](https://github.com/faye) umbrella are covered by
the [Code of Conduct](https://github.com/faye/code-of-conduct).
+20
View File
@@ -0,0 +1,20 @@
# The MIT License
Copyright (c) 2014-2017 James Coglan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the 'Software'), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
-23
View File
@@ -329,26 +329,3 @@ the session to release any resources it's using.
* Consumer: [websocket-driver](https://github.com/faye/websocket-driver-node)
* Provider: [permessage-deflate](https://github.com/faye/permessage-deflate-node)
## License
(The MIT License)
Copyright (c) 2014-2015 James Coglan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the 'Software'), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+8 -4
View File
@@ -1,13 +1,15 @@
'use strict';
var TOKEN = /([!#\$%&'\*\+\-\.\^_`\|~0-9a-z]+)/,
NOTOKEN = /([^!#\$%&'\*\+\-\.\^_`\|~0-9a-z])/g,
var TOKEN = /([!#\$%&'\*\+\-\.\^_`\|~0-9A-Za-z]+)/,
NOTOKEN = /([^!#\$%&'\*\+\-\.\^_`\|~0-9A-Za-z])/g,
QUOTED = /"((?:\\[\x00-\x7f]|[^\x00-\x08\x0a-\x1f\x7f"])*)"/,
PARAM = new RegExp(TOKEN.source + '(?:=(?:' + TOKEN.source + '|' + QUOTED.source + '))?'),
EXT = new RegExp(TOKEN.source + '(?: *; *' + PARAM.source + ')*', 'g'),
EXT_LIST = new RegExp('^' + EXT.source + '(?: *, *' + EXT.source + ')*$'),
NUMBER = /^-?(0|[1-9][0-9]*)(\.[0-9]+)?$/;
var hasOwnProperty = Object.prototype.hasOwnProperty;
var Parser = {
parseHeader: function(header) {
var offers = new Offers();
@@ -35,7 +37,7 @@ var Parser = {
}
if (NUMBER.test(data)) data = parseFloat(data);
if (offer.hasOwnProperty(key)) {
if (hasOwnProperty.call(offer, key)) {
offer[key] = [].concat(offer[key]);
offer[key].push(data);
} else {
@@ -77,7 +79,9 @@ var Offers = function() {
};
Offers.prototype.push = function(name, params) {
this._byName[name] = this._byName[name] || [];
if (!hasOwnProperty.call(this._byName, name))
this._byName[name] = [];
this._byName[name].push(params);
this._inOrder.push({name: name, params: params});
};
+2 -2
View File
@@ -228,7 +228,7 @@ is a mechanism to reorder the output so that message order is preserved for the
next session in line.
## Solution
## Solution
We now describe the model implemented here and how it meets the above design
goals. The above diagram where a stack of extensions sit between the driver and
@@ -592,7 +592,7 @@ pipeline following *m2* will be processed by `A`, since it's upstream of the
error. Those messages will be dropped by `B`.
## Alternative ideas
## Alternative ideas
I am considering implementing `Functor` as an object-mode transform stream
rather than what is essentially an async function. Being object-mode, a stream
+2 -1
View File
@@ -14,7 +14,8 @@ var Cell = function(tuple) {
};
Cell.prototype.pending = function(direction) {
this._functors[direction].pending += 1;
var functor = this._functors[direction];
if (!functor._stopped) functor.pending += 1;
};
Cell.prototype.incoming = function(error, message, callback, context) {
+14 -3
View File
@@ -27,7 +27,7 @@ Functor.prototype.call = function(error, message, callback, context) {
return this._flushQueue();
}
this._session[this._method](message, function(err, msg) {
var handler = function(err, msg) {
if (!(called ^ (called = true))) return;
if (err) {
@@ -40,7 +40,13 @@ Functor.prototype.call = function(error, message, callback, context) {
record.done = true;
self._flushQueue();
});
};
try {
this._session[this._method](message, handler);
} catch (err) {
handler(err);
}
};
Functor.prototype._stop = function() {
@@ -52,8 +58,13 @@ Functor.prototype._flushQueue = function() {
var queue = this._queue, record;
while (queue.length > 0 && queue.peek().done) {
this.pending -= 1;
record = queue.shift();
if (record.error) {
this.pending = 0;
queue.clear();
} else {
this.pending -= 1;
}
record.callback.call(record.context, record.error, record.message);
}
};
+6 -2
View File
@@ -1,10 +1,14 @@
'use strict';
var RingBuffer = function(bufferSize) {
this._buffer = new Array(bufferSize);
this._bufferSize = bufferSize;
this.clear();
};
RingBuffer.prototype.clear = function() {
this._buffer = new Array(this._bufferSize);
this._ringOffset = 0;
this._ringSize = bufferSize;
this._ringSize = this._bufferSize;
this._head = 0;
this._tail = 0;
this.length = 0;
+4 -4
View File
@@ -9,7 +9,7 @@ var Extensions = function() {
this._byName = {};
this._inOrder = [];
this._sessions = [];
this._index = {}
this._index = {};
};
Extensions.MESSAGE_OPCODES = [1, 2];
@@ -89,9 +89,9 @@ var instance = {
},
generateResponse: function(header) {
var offers = Parser.parseHeader(header),
sessions = [],
response = [];
var sessions = [],
response = [],
offers = Parser.parseHeader(header);
this._inOrder.forEach(function(ext) {
var offer = offers.byName(ext.name);
+6 -4
View File
@@ -5,12 +5,14 @@
, "keywords" : ["websocket"]
, "license" : "MIT"
, "version" : "0.1.1"
, "engines" : {"node": ">=0.6.0"}
, "version" : "0.1.3"
, "engines" : { "node": ">=0.8.0" }
, "files" : ["lib"]
, "main" : "./lib/websocket_extensions"
, "devDependencies" : {"jstest": ""}
, "scripts" : {"test": "jstest spec/runner.js"}
, "devDependencies" : { "jstest": "*" }
, "scripts" : { "test": "jstest spec/runner.js" }
, "repository" : { "type" : "git"
, "url" : "git://github.com/faye/websocket-extensions-node.git"
+11
View File
@@ -67,6 +67,17 @@ test.describe("Parser", function() { with(this) {
{name: "a", params: {b: true}}],
parse('a; b=1, c, b; d, c; e="hi, there"; e, a; b') )
}})
it("parses an extension name that shadows an Object property", function() { with(this) {
assertEqual( [{name: "hasOwnProperty", params: {}}],
parse('hasOwnProperty') )
}})
it("parses an extension param that shadows an Object property", function() { with(this) {
var result = parse('foo; hasOwnProperty; x')[0]
assertEqual( result.params.hasOwnProperty, true )
}})
}})
describe("serializeParams", function() { with(this) {
+83 -56
View File
@@ -181,7 +181,7 @@ test.describe("Extensions", function() { with(this) {
extensions.activate("deflate, reverse")
extensions.processIncomingMessage({frames: []}, function(error, message) {
assertNull(error)
assertNull( error )
assertEqual( ["reverse", "deflate"], message.frames )
})
}})
@@ -226,66 +226,89 @@ test.describe("Extensions", function() { with(this) {
describe("error handling", function() { with(this) {
include(FakeClock)
before(function() { with(this) {
clock.stub()
extensions.activate("deflate, reverse")
sharedExamplesFor("handles errors", function() { with(this) {
before(function() { with(this) {
clock.stub()
extensions.activate("deflate, reverse")
stub(session, "processOutgoingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("a")) }, 100)
})
stub(session, "processOutgoingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("a")) }, 100)
})
stub(nonconflictSession, "processOutgoingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("b")) }, 100)
})
stub(nonconflictSession, "processOutgoingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("b")) }, 100)
})
stub(nonconflictSession, "processIncomingMessage", function(message, callback) {
if (message[0] === 5)
setTimeout(function() { callback(new Error(""), null) }, 10)
else
stub(nonconflictSession, "processIncomingMessage", function(message, callback) {
if (message[0] === 5) return emitError(callback)
setTimeout(function() { callback(null, message.concat("c")) }, 50)
})
stub(session, "processIncomingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("d")) }, 100)
})
stub(session, "close")
stub(nonconflictSession, "close")
this.messages = []
var push = function(error, message) {
if (error) extensions.close(function() { messages.push("close") })
messages.push(message)
}
;[1, 2, 3].forEach(function(n) {
extensions.processOutgoingMessage([n], push)
})
;[4, 5, 6].forEach(function(n, i) {
setTimeout(function() {
extensions.processIncomingMessage([n], push)
}, 20 * i)
})
clock.tick(200)
}})
it("allows the message before the error through to the end", function() { with(this) {
assertEqual( [4, "c", "d"], messages[0] )
}})
it("yields the error to the end of the pipeline", function() { with(this) {
assertNull( messages[1] )
}})
it("does not yield the message after the error", function() { with(this) {
assertNotEqual( arrayIncluding([6, "c", "d"]), messages )
}})
it("yields all the messages in the direction unaffected by the error", function() { with(this) {
assertEqual( [1, "a", "b"], messages[2] )
assertEqual( [2, "a", "b"], messages[3] )
assertEqual( [3, "a", "b"], messages[4] )
}})
it("closes after all messages are processed", function() { with(this) {
assertEqual( "close", messages[5] )
assertEqual( 6, messages.length )
}})
}})
describe("with a sync error", function() { with(this) {
define("emitError", function(callback) {
throw new Error("sync error")
})
stub(session, "processIncomingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("d")) }, 100)
itShouldBehaveLike("handles errors")
}})
describe("with an async error", function() { with(this) {
define("emitError", function(callback) {
setTimeout(function() { callback(new Error("async error"), null) }, 10)
})
stub(session, "close")
stub(nonconflictSession, "close")
this.messages = []
var push = function(error, message) {
if (error) extensions.close(function() { messages.push("close") })
messages.push(message)
}
;[1, 2, 3].forEach(function(n) { extensions.processOutgoingMessage([n], push) })
;[4, 5, 6].forEach(function(n) { extensions.processIncomingMessage([n], push) })
clock.tick(200)
}})
it("allows the message before the error through to the end", function() { with(this) {
assertEqual( [4, "c", "d"], messages[0] )
}})
it("yields the error to the end of the pipeline", function() { with(this) {
assertNull( messages[1] )
}})
it("does not yield the message after the error", function() { with(this) {
assertNotEqual( arrayIncluding([6, "c", "d"]), messages )
}})
it("yields all the messages in the direction unaffected by the error", function() { with(this) {
assertEqual( [1, "a", "b"], messages[2] )
assertEqual( [2, "a", "b"], messages[3] )
assertEqual( [3, "a", "b"], messages[4] )
}})
it("closes after all messages are processed", function() { with(this) {
assertEqual( "close", messages[5] )
assertEqual( 6, messages.length )
itShouldBehaveLike("handles errors")
}})
}})
@@ -375,7 +398,7 @@ test.describe("Extensions", function() { with(this) {
extensions.activate("deflate, reverse")
extensions.processOutgoingMessage({frames: []}, function(error, message) {
assertNull(error)
assertNull( error )
assertEqual( ["deflate", "reverse"], message.frames )
})
}})
@@ -384,7 +407,7 @@ test.describe("Extensions", function() { with(this) {
extensions.activate("reverse, deflate")
extensions.processOutgoingMessage({frames: []}, function(error, message) {
assertNull(error)
assertNull( error )
assertEqual( ["reverse", "deflate"], message.frames )
})
}})
@@ -489,7 +512,11 @@ test.describe("Extensions", function() { with(this) {
assertEqual( "deflate; mode=compress", extensions.generateResponse("deflate, tar") )
}})
it("returns a response for potentially conflicting extensions if their preceeding extensions don't build a session", function() { with(this) {
it("throws an error if the header is invalid", function() { with(this) {
assertThrows(SyntaxError, function() { extensions.generateResponse("x-webkit- -frame") })
}})
it("returns a response for potentially conflicting extensions if their preceding extensions don't build a session", function() { with(this) {
stub(ext, "createServerSession").returns(null)
assertEqual( "tar; gzip", extensions.generateResponse("deflate, tar") )
}})