fb84d36546
While improving error handling, I found two situations where Pipeline fails to close() correctly because it believes there are more messages to emit. The first is fixed by the change to `Cell.pending()`. If another message is pushed into the pipeline after one of the cells has stopped, all the cells get their pending count bumped. However, this new message will never enter the queue inside the stopped cell, so its pending count will never reach zero. So, we only increment the pending count for cells that are not stopped. The second is fixed by the change to `Functor._flushQueue()`. Say a cell processes two messages in turn, M1 and M2. Both begin being processed before either returns a result. M1 generates an error, while processing of M2 never completes. In this situation, the error should indicate the end of the stream but because M2 never completes, the pending count never reaches zero. So, if we see a record with an error, we should truncate the queue and this point and set pending=0 so the functor is considered complete.
67 lines
1.6 KiB
JavaScript
67 lines
1.6 KiB
JavaScript
'use strict';
|
|
|
|
var RingBuffer = function(bufferSize) {
|
|
this._bufferSize = bufferSize;
|
|
this.clear();
|
|
};
|
|
|
|
RingBuffer.prototype.clear = function() {
|
|
this._buffer = new Array(this._bufferSize);
|
|
this._ringOffset = 0;
|
|
this._ringSize = this._bufferSize;
|
|
this._head = 0;
|
|
this._tail = 0;
|
|
this.length = 0;
|
|
};
|
|
|
|
RingBuffer.prototype.push = function(value) {
|
|
var expandBuffer = false,
|
|
expandRing = false;
|
|
|
|
if (this._ringSize < this._bufferSize) {
|
|
expandBuffer = (this._tail === 0);
|
|
} else if (this._ringOffset === this._ringSize) {
|
|
expandBuffer = true;
|
|
expandRing = (this._tail === 0);
|
|
}
|
|
|
|
if (expandBuffer) {
|
|
this._tail = this._bufferSize;
|
|
this._buffer = this._buffer.concat(new Array(this._bufferSize));
|
|
this._bufferSize = this._buffer.length;
|
|
|
|
if (expandRing)
|
|
this._ringSize = this._bufferSize;
|
|
}
|
|
|
|
this._buffer[this._tail] = value;
|
|
this.length += 1;
|
|
if (this._tail < this._ringSize) this._ringOffset += 1;
|
|
this._tail = (this._tail + 1) % this._bufferSize;
|
|
};
|
|
|
|
RingBuffer.prototype.peek = function() {
|
|
if (this.length === 0) return void 0;
|
|
return this._buffer[this._head];
|
|
};
|
|
|
|
RingBuffer.prototype.shift = function() {
|
|
if (this.length === 0) return void 0;
|
|
|
|
var value = this._buffer[this._head];
|
|
this._buffer[this._head] = void 0;
|
|
this.length -= 1;
|
|
this._ringOffset -= 1;
|
|
|
|
if (this._ringOffset === 0 && this.length > 0) {
|
|
this._head = this._ringSize;
|
|
this._ringOffset = this.length;
|
|
this._ringSize = this._bufferSize;
|
|
} else {
|
|
this._head = (this._head + 1) % this._ringSize;
|
|
}
|
|
return value;
|
|
};
|
|
|
|
module.exports = RingBuffer;
|