Files
James Coglan fb84d36546 Fix a couple of race conditions in Pipeline.
While improving error handling, I found two situations where Pipeline
fails to close() correctly because it believes there are more messages
to emit.

The first is fixed by the change to `Cell.pending()`. If another message
is pushed into the pipeline after one of the cells has stopped, all the
cells get their pending count bumped. However, this new message will
never enter the queue inside the stopped cell, so its pending count will
never reach zero. So, we only increment the pending count for cells that
are not stopped.

The second is fixed by the change to `Functor._flushQueue()`. Say a cell
processes two messages in turn, M1 and M2. Both begin being processed
before either returns a result. M1 generates an error, while processing
of M2 never completes. In this situation, the error should indicate the
end of the stream but because M2 never completes, the pending count
never reaches zero. So, if we see a record with an error, we should
truncate the queue at this point and set pending=0 so the functor is
considered complete.
2017-09-08 20:59:13 +01:00

54 lines
1.4 KiB
JavaScript

'use strict';
var Functor = require('./functor'),
Pledge = require('./pledge');
/**
 * One stage of the pipeline: wraps a single two-element tuple and creates
 * one Functor per message direction.
 *
 * @param {Array} tuple - tuple[0] is stored as `_ext` and tuple[1] as
 *   `_session`. Elsewhere in this file `_ext.name` is read and
 *   `_session.close()` is called, so presumably this is an
 *   [extension, session] pair -- confirm against the caller.
 */
var Cell = function(tuple) {
  var extension = tuple[0],
      session   = tuple[1];

  this._ext     = extension;
  this._session = session;

  // Each functor is given the session and the name of the session method
  // that handles messages travelling in that direction.
  this._functors = {
    incoming: new Functor(session, 'processIncomingMessage'),
    outgoing: new Functor(session, 'processOutgoingMessage')
  };
};
/**
 * Records that one more message is expected in the given direction by
 * bumping that functor's `pending` counter.
 *
 * A functor flagged as `_stopped` is left untouched: `_doClose` only
 * completes once both counters sum to zero, so a count added to a stopped
 * functor that will never process the message would block closing.
 *
 * @param {String} direction - key into `_functors` ('incoming' or 'outgoing')
 */
Cell.prototype.pending = function(direction) {
  var target = this._functors[direction];
  if (target._stopped) return;
  target.pending = target.pending + 1;
};
/**
 * Runs a message through this cell in the 'incoming' direction by
 * delegating to `_exec`.
 *
 * @param {*} error - forwarded unchanged to `_exec`
 * @param {*} message - forwarded unchanged to `_exec`
 * @param {Function} callback - forwarded unchanged to `_exec`
 * @param {*} context - forwarded unchanged to `_exec`
 */
Cell.prototype.incoming = function(error, message, callback, context) {
  var direction = 'incoming';
  this._exec(direction, error, message, callback, context);
};
/**
 * Runs a message through this cell in the 'outgoing' direction by
 * delegating to `_exec`.
 *
 * @param {*} error - forwarded unchanged to `_exec`
 * @param {*} message - forwarded unchanged to `_exec`
 * @param {Function} callback - forwarded unchanged to `_exec`
 * @param {*} context - forwarded unchanged to `_exec`
 */
Cell.prototype.outgoing = function(error, message, callback, context) {
  var direction = 'outgoing';
  this._exec(direction, error, message, callback, context);
};
/**
 * Requests that this cell be closed.
 *
 * The first call creates the Pledge stored in `_closed`; later calls reuse
 * it. Every call then gives `_doClose` a chance to finish immediately.
 *
 * @returns {Pledge} the object stored in `_closed`
 */
Cell.prototype.close = function() {
  if (!this._closed) {
    this._closed = new Pledge();
  }
  this._doClose();
  return this._closed;
};
/**
 * Hands a message to the functor for `direction` and relays the functor's
 * result to the caller.
 *
 * @param {String} direction - key into `_functors` ('incoming' or 'outgoing')
 * @param {*} error - passed as the first argument to the functor's `call`
 * @param {*} message - passed as the second argument to the functor's `call`
 * @param {Function} callback - invoked as callback.call(context, err, msg)
 * @param {*} context - `this` value used when invoking `callback`
 */
Cell.prototype._exec = function(direction, error, message, callback, context) {
  var functor = this._functors[direction];

  // This cell is passed as the final argument to functor.call();
  // presumably the functor invokes the handler with it as `this`, which the
  // handler relies on to reach `_ext` and `_doClose`.
  var onResult = function(err, msg) {
    if (err) {
      // Prefix the error with the extension name so its origin is visible.
      var prefix = this._ext.name + ': ';
      err.message = prefix + err.message;
    }
    callback.call(context, err, msg);
    // A finished message may have been the last thing holding up close().
    this._doClose();
  };

  functor.call(error, message, onResult, this);
};
/**
 * Completes a requested close once no messages remain in flight.
 *
 * Does nothing unless `close()` has created `_closed` and the incoming and
 * outgoing `pending` counters sum to zero. Otherwise it closes the session
 * if one is still held, drops the reference, and calls `done()` on
 * `_closed`.
 */
Cell.prototype._doClose = function() {
  var functors = this._functors,
      inbound  = functors.incoming,
      outbound = functors.outgoing;

  // close() has not been requested yet.
  if (!this._closed) return;

  // Work is still outstanding in at least one direction.
  if (inbound.pending + outbound.pending !== 0) return;

  var session = this._session;
  if (session) session.close();

  // Clearing the reference ensures the session is closed at most once.
  this._session = null;
  this._closed.done();
};
// The Cell constructor is this module's sole export.
module.exports = Cell;