From d8d38e54e6ee8449d067a0b48bbcc73aefc52a3f Mon Sep 17 00:00:00 2001 From: James Coglan Date: Fri, 8 Sep 2017 21:18:07 +0100 Subject: [PATCH] Catch synchronous errors thrown by extensions. --- lib/pipeline/functor.js | 10 ++- spec/websocket_extensions_spec.js | 134 +++++++++++++++++------------- 2 files changed, 83 insertions(+), 61 deletions(-) diff --git a/lib/pipeline/functor.js b/lib/pipeline/functor.js index a36a0aa..f6c5f3b 100644 --- a/lib/pipeline/functor.js +++ b/lib/pipeline/functor.js @@ -27,7 +27,7 @@ Functor.prototype.call = function(error, message, callback, context) { return this._flushQueue(); } - this._session[this._method](message, function(err, msg) { + var handler = function(err, msg) { if (!(called ^ (called = true))) return; if (err) { @@ -40,7 +40,13 @@ Functor.prototype.call = function(error, message, callback, context) { record.done = true; self._flushQueue(); - }); + }; + + try { + this._session[this._method](message, handler); + } catch (err) { + handler(err); + } }; Functor.prototype._stop = function() { diff --git a/spec/websocket_extensions_spec.js b/spec/websocket_extensions_spec.js index d1a032c..9988f47 100644 --- a/spec/websocket_extensions_spec.js +++ b/spec/websocket_extensions_spec.js @@ -226,73 +226,89 @@ test.describe("Extensions", function() { with(this) { describe("error handling", function() { with(this) { include(FakeClock) - before(function() { with(this) { - clock.stub() - extensions.activate("deflate, reverse") + sharedExamplesFor("handles errors", function() { with(this) { + before(function() { with(this) { + clock.stub() + extensions.activate("deflate, reverse") - stub(session, "processOutgoingMessage", function(message, callback) { - setTimeout(function() { callback(null, message.concat("a")) }, 100) - }) + stub(session, "processOutgoingMessage", function(message, callback) { + setTimeout(function() { callback(null, message.concat("a")) }, 100) + }) - stub(nonconflictSession, "processOutgoingMessage", function(message, callback) { - setTimeout(function() { callback(null, message.concat("b")) }, 100) - }) + stub(nonconflictSession, "processOutgoingMessage", function(message, callback) { + setTimeout(function() { callback(null, message.concat("b")) }, 100) + }) - stub(nonconflictSession, "processIncomingMessage", function(message, callback) { - if (message[0] === 5) - setTimeout(function() { callback(new Error(""), null) }, 10) - else + stub(nonconflictSession, "processIncomingMessage", function(message, callback) { + if (message[0] === 5) return emitError(callback) setTimeout(function() { callback(null, message.concat("c")) }, 50) + }) + + stub(session, "processIncomingMessage", function(message, callback) { + setTimeout(function() { callback(null, message.concat("d")) }, 100) + }) + + stub(session, "close") + stub(nonconflictSession, "close") + + this.messages = [] + + var push = function(error, message) { + if (error) extensions.close(function() { messages.push("close") }) + messages.push(message) + } + + ;[1, 2, 3].forEach(function(n) { + extensions.processOutgoingMessage([n], push) + }) + + ;[4, 5, 6].forEach(function(n, i) { + setTimeout(function() { + extensions.processIncomingMessage([n], push) + }, 20 * i) + }) + + clock.tick(200) + }}) + + it("allows the message before the error through to the end", function() { with(this) { + assertEqual( [4, "c", "d"], messages[0] ) + }}) + + it("yields the error to the end of the pipeline", function() { with(this) { + assertNull( messages[1] ) + }}) + + it("does not yield the message after the error", function() { with(this) { + assertNotEqual( arrayIncluding([6, "c", "d"]), messages ) + }}) + + it("yields all the messages in the direction unaffected by the error", function() { with(this) { + assertEqual( [1, "a", "b"], messages[2] ) + assertEqual( [2, "a", "b"], messages[3] ) + assertEqual( [3, "a", "b"], messages[4] ) + }}) + + it("closes after all messages are processed", function() { with(this) { + assertEqual( "close", messages[5] ) + assertEqual( 6, messages.length ) + }}) + }}) + + describe("with a sync error", function() { with(this) { + define("emitError", function(callback) { + throw new Error("sync error") }) - stub(session, "processIncomingMessage", function(message, callback) { - setTimeout(function() { callback(null, message.concat("d")) }, 100) + itShouldBehaveLike("handles errors") + }}) + + describe("with an async error", function() { with(this) { + define("emitError", function(callback) { + setTimeout(function() { callback(new Error("async error"), null) }, 10) }) - stub(session, "close") - stub(nonconflictSession, "close") - - this.messages = [] - - var push = function(error, message) { - if (error) extensions.close(function() { messages.push("close") }) - messages.push(message) - } - - ;[1, 2, 3].forEach(function(n) { - extensions.processOutgoingMessage([n], push) - }) - - ;[4, 5, 6].forEach(function(n, i) { - setTimeout(function() { - extensions.processIncomingMessage([n], push) - }, 20 * i) - }) - - clock.tick(200) - }}) - - it("allows the message before the error through to the end", function() { with(this) { - assertEqual( [4, "c", "d"], messages[0] ) - }}) - - it("yields the error to the end of the pipeline", function() { with(this) { - assertNull( messages[1] ) - }}) - - it("does not yield the message after the error", function() { with(this) { - assertNotEqual( arrayIncluding([6, "c", "d"]), messages ) - }}) - - it("yields all the messages in the direction unaffected by the error", function() { with(this) { - assertEqual( [1, "a", "b"], messages[2] ) - assertEqual( [2, "a", "b"], messages[3] ) - assertEqual( [3, "a", "b"], messages[4] ) - }}) - - it("closes after all messages are processed", function() { with(this) { - assertEqual( "close", messages[5] ) - assertEqual( 6, messages.length ) + itShouldBehaveLike("handles errors") }}) }})