Catch synchronous errors thrown by extensions.

This commit is contained in:
James Coglan
2017-09-08 21:18:07 +01:00
parent fb84d36546
commit d8d38e54e6
2 changed files with 83 additions and 61 deletions
+8 -2
View File
@@ -27,7 +27,7 @@ Functor.prototype.call = function(error, message, callback, context) {
return this._flushQueue();
}
this._session[this._method](message, function(err, msg) {
var handler = function(err, msg) {
if (!(called ^ (called = true))) return;
if (err) {
@@ -40,7 +40,13 @@ Functor.prototype.call = function(error, message, callback, context) {
record.done = true;
self._flushQueue();
});
};
try {
this._session[this._method](message, handler);
} catch (err) {
handler(err);
}
};
Functor.prototype._stop = function() {
+75 -59
View File
@@ -226,73 +226,89 @@ test.describe("Extensions", function() { with(this) {
describe("error handling", function() { with(this) {
include(FakeClock)
before(function() { with(this) {
clock.stub()
extensions.activate("deflate, reverse")
sharedExamplesFor("handles errors", function() { with(this) {
before(function() { with(this) {
clock.stub()
extensions.activate("deflate, reverse")
stub(session, "processOutgoingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("a")) }, 100)
})
stub(session, "processOutgoingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("a")) }, 100)
})
stub(nonconflictSession, "processOutgoingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("b")) }, 100)
})
stub(nonconflictSession, "processOutgoingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("b")) }, 100)
})
stub(nonconflictSession, "processIncomingMessage", function(message, callback) {
if (message[0] === 5)
setTimeout(function() { callback(new Error(""), null) }, 10)
else
stub(nonconflictSession, "processIncomingMessage", function(message, callback) {
if (message[0] === 5) return emitError(callback)
setTimeout(function() { callback(null, message.concat("c")) }, 50)
})
stub(session, "processIncomingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("d")) }, 100)
})
stub(session, "close")
stub(nonconflictSession, "close")
this.messages = []
var push = function(error, message) {
if (error) extensions.close(function() { messages.push("close") })
messages.push(message)
}
;[1, 2, 3].forEach(function(n) {
extensions.processOutgoingMessage([n], push)
})
;[4, 5, 6].forEach(function(n, i) {
setTimeout(function() {
extensions.processIncomingMessage([n], push)
}, 20 * i)
})
clock.tick(200)
}})
it("allows the message before the error through to the end", function() { with(this) {
assertEqual( [4, "c", "d"], messages[0] )
}})
it("yields the error to the end of the pipeline", function() { with(this) {
assertNull( messages[1] )
}})
it("does not yield the message after the error", function() { with(this) {
assertNotEqual( arrayIncluding([6, "c", "d"]), messages )
}})
it("yields all the messages in the direction unaffected by the error", function() { with(this) {
assertEqual( [1, "a", "b"], messages[2] )
assertEqual( [2, "a", "b"], messages[3] )
assertEqual( [3, "a", "b"], messages[4] )
}})
it("closes after all messages are processed", function() { with(this) {
assertEqual( "close", messages[5] )
assertEqual( 6, messages.length )
}})
}})
describe("with a sync error", function() { with(this) {
define("emitError", function(callback) {
throw new Error("sync error")
})
stub(session, "processIncomingMessage", function(message, callback) {
setTimeout(function() { callback(null, message.concat("d")) }, 100)
itShouldBehaveLike("handles errors")
}})
describe("with an async error", function() { with(this) {
define("emitError", function(callback) {
setTimeout(function() { callback(new Error("async error"), null) }, 10)
})
stub(session, "close")
stub(nonconflictSession, "close")
this.messages = []
var push = function(error, message) {
if (error) extensions.close(function() { messages.push("close") })
messages.push(message)
}
;[1, 2, 3].forEach(function(n) {
extensions.processOutgoingMessage([n], push)
})
;[4, 5, 6].forEach(function(n, i) {
setTimeout(function() {
extensions.processIncomingMessage([n], push)
}, 20 * i)
})
clock.tick(200)
}})
it("allows the message before the error through to the end", function() { with(this) {
assertEqual( [4, "c", "d"], messages[0] )
}})
it("yields the error to the end of the pipeline", function() { with(this) {
assertNull( messages[1] )
}})
it("does not yield the message after the error", function() { with(this) {
assertNotEqual( arrayIncluding([6, "c", "d"]), messages )
}})
it("yields all the messages in the direction unaffected by the error", function() { with(this) {
assertEqual( [1, "a", "b"], messages[2] )
assertEqual( [2, "a", "b"], messages[3] )
assertEqual( [3, "a", "b"], messages[4] )
}})
it("closes after all messages are processed", function() { with(this) {
assertEqual( "close", messages[5] )
assertEqual( 6, messages.length )
itShouldBehaveLike("handles errors")
}})
}})