4 Commits

6 changed files with 122 additions and 65 deletions
+10
View File
@@ -0,0 +1,10 @@
### 0.1.1 / 2014-12-15
* Fix race condition when using context takeover, where adjacent messages have
data listeners bound at the same time and end up duplicating output
* Use `DeflateRaw.flush()` correctly on v0.10 so that optimal compression is
achieved
### 0.1.0 / 2014-12-13
* Initial release
+2 -2
View File
@@ -41,12 +41,12 @@ exts.add(deflate);
The set of available options can be split into two sets: those that control the
session's compressor for outgoing messages and do not need to be communicated to
the peer, and those that are negoatiated as part of the protocol. The settings
the peer, and those that are negotiated as part of the protocol. The settings
only affecting the compressor are described fully in the [zlib
documentation](http://nodejs.org/api/zlib.html#zlib_options):
* `level`: sets the compression level, can be an integer from `0` to `9`, or one
of the contants `zlib.Z_NO_COMPRESSION`, `zlib.Z_BEST_SPEED`,
of the constants `zlib.Z_NO_COMPRESSION`, `zlib.Z_BEST_SPEED`,
`zlib.Z_BEST_COMPRESSION`, or `zlib.Z_DEFAULT_COMPRESSION`
* `memLevel`: sets how much memory the compressor allocates, can be an integer
from `1` to `9`, or one of the constants `zlib.Z_MIN_MEMLEVEL`,
+86 -41
View File
@@ -3,6 +3,8 @@
var zlib = require('zlib'),
common = require('./common');
var VERSION = process.version.match(/\d+/g).map(function(n) { return parseInt(n, 10) });
var Session = function(options) {
this._level = common.fetch(options, 'level', zlib.Z_DEFAULT_LEVEL);
this._memLevel = common.fetch(options, 'memLevel', zlib.Z_DEFAULT_MEMLEVEL);
@@ -12,34 +14,106 @@ var Session = function(options) {
this._acceptMaxWindowBits = common.fetch(options, 'maxWindowBits', undefined);
this._requestNoContextTakeover = common.fetch(options, 'requestNoContextTakeover', false);
this._requestMaxWindowBits = common.fetch(options, 'requestMaxWindowBits', undefined);
this._queueIn = [];
this._queueOut = [];
};
Session.prototype.processIncomingMessage = function(message, callback) {
if (!message.rsv1) return callback(null, message);
if (this._lockIn) return this._queueIn.push([message, callback]);
var inflate = this._getInflate(),
chunks = [message.data, new Buffer([0x00, 0x00, 0xff, 0xff])],
chunks = [],
length = 0,
self = this;
this._processMessage(inflate, chunks, function(error, data) {
if (this._inflate) this._lockIn = true;
var return_ = function(error, message) {
return_ = function() {};
inflate.removeListener('data', onData);
inflate.removeListener('error', onError);
if (!self._inflate && inflate.close) inflate.close();
if (error) return callback(error, null);
message.data = data;
callback(null, message);
self._lockIn = false;
var next = self._queueIn.shift();
if (next) self.processIncomingMessage.apply(self, next);
callback(error, message);
};
var onData = function(data) {
chunks.push(data);
length += data.length;
};
var onError = function(error) {
return_(error, null);
};
inflate.on('data', onData);
inflate.on('error', onError);
inflate.write(message.data);
inflate.write(new Buffer([0x00, 0x00, 0xff, 0xff]));
inflate.flush(function() {
message.data = common.concat(chunks, length);
return_(null, message);
});
};
Session.prototype.processOutgoingMessage = function(message, callback) {
if (this._lockOut) return this._queueOut.push([message, callback]);
var deflate = this._getDeflate(),
chunks = [],
length = 0,
self = this;
this._processMessage(deflate, [message.data], function(error, data) {
if (this._deflate) this._lockOut = true;
var return_ = function(error, message) {
return_ = function() {};
deflate.removeListener('data', onData);
deflate.removeListener('error', onError);
if (!self._deflate && deflate.close) deflate.close();
if (error) return callback(error, null);
message.rsv1 = true;
message.data = data.slice(0, data.length - 4);
callback(null, message);
});
self._lockOut = false;
var next = self._queueOut.shift();
if (next) self.processOutgoingMessage.apply(self, next);
callback(error, message);
};
var onData = function(data) {
var tail = data.slice(Math.max(data.length - 4, 0), data.length),
isEnd = (tail[0] === 0x00 && tail[1] === 0x00 && tail[2] === 0xff && tail[3] === 0xff);
if (isEnd) data = data.slice(0, data.length - 4);
chunks.push(data);
length += data.length;
if (isEnd) {
message.rsv1 = true;
message.data = common.concat(chunks, length);
return_(null, message);
}
};
var onError = function(error) {
return_(error, null);
};
deflate.on('data', onData);
deflate.on('error', onError);
deflate.write(message.data);
if (VERSION[0] === 0 && VERSION[1] < 10) deflate.flush();
};
Session.prototype.close = function() {
@@ -61,6 +135,7 @@ Session.prototype._getDeflate = function() {
if (this._deflate) return this._deflate;
var deflate = zlib.createDeflateRaw({
flush: zlib.Z_SYNC_FLUSH,
windowBits: this._ownWindowBits,
level: this._level,
memLevel: this._memLevel,
@@ -70,34 +145,4 @@ Session.prototype._getDeflate = function() {
return deflate;
};
Session.prototype._processMessage = function(codec, data, callback) {
var chunks = [],
length = 0;
var return_ = function(error, result) {
return_ = function() {};
codec.removeListener('data', onData);
codec.removeListener('error', onError);
codec = null;
callback(error, result);
};
var onData = function(chunk) {
chunks.push(chunk);
length += chunk.length;
};
var onError = function(error) {
return_(error, null);
};
codec.on('data', onData);
codec.on('error', onError);
data.forEach(function(c) { codec.write(c) });
codec.flush(function() {
return_(null, common.concat(chunks, length));
});
};
module.exports = Session;
+1 -1
View File
@@ -5,7 +5,7 @@
, "keywords" : ["websocket", "compression", "deflate"]
, "license" : "MIT"
, "version" : "0.1.0"
, "version" : "0.1.1"
, "engines" : {"node": ">=0.6.0"}
, "main" : "./lib/permessage_deflate"
, "devDependencies" : {"jstest": ""}
+11 -10
View File
@@ -9,6 +9,7 @@ test.describe("ClientSession", function() { with(this) {
this.deflate = zlibMock()
this.inflate = zlibMock()
this.flush = zlib.Z_SYNC_FLUSH
this.level = zlib.Z_DEFAULT_LEVEL
this.memLevel = zlib.Z_DEFAULT_MEMLEVEL
this.strategy = zlib.Z_DEFAULT_STRATEGY
@@ -24,7 +25,7 @@ test.describe("ClientSession", function() { with(this) {
this.stub(stream, "removeListener")
this.stub(stream, "write")
this.stub(stream, "flush").yields([])
this.stub(stream, "flush", function(cb) { if(cb) cb() });
this.stub(stream, "close").raises(new Error("unexpected close()"))
return stream
@@ -70,7 +71,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -101,7 +102,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses no context takeover and 15 window bits to deflate outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(deflate, "close").exactly(2)
processOutgoingMessage()
processOutgoingMessage()
@@ -140,7 +141,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 8 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -169,7 +170,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(deflate, "close").exactly(2)
processOutgoingMessage()
processOutgoingMessage()
@@ -191,7 +192,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 9 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 9, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 9, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -214,7 +215,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 8 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -309,7 +310,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the level of the deflate stream", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
processOutgoingMessage()
}})
}})
@@ -319,7 +320,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the memLevel of the deflate stream", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
processOutgoingMessage()
}})
}})
@@ -329,7 +330,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the strategy of the deflate stream", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
processOutgoingMessage()
}})
}})
+12 -11
View File
@@ -9,6 +9,7 @@ test.describe("ClientSession", function() { with(this) {
this.deflate = zlibMock()
this.inflate = zlibMock()
this.flush = zlib.Z_SYNC_FLUSH
this.level = zlib.Z_DEFAULT_LEVEL
this.memLevel = zlib.Z_DEFAULT_MEMLEVEL
this.strategy = zlib.Z_DEFAULT_STRATEGY
@@ -24,7 +25,7 @@ test.describe("ClientSession", function() { with(this) {
this.stub(stream, "removeListener")
this.stub(stream, "write")
this.stub(stream, "flush").yields([])
this.stub(stream, "flush", function(cb) { if(cb) cb() });
this.stub(stream, "close").raises(new Error("unexpected close()"))
return stream
@@ -61,7 +62,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -76,7 +77,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(deflate, "close").exactly(2)
processOutgoingMessage()
processOutgoingMessage()
@@ -108,7 +109,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 13 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 13, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 13, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -171,7 +172,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(deflate, "close").exactly(2)
processOutgoingMessage()
processOutgoingMessage()
@@ -189,7 +190,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 12 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -204,7 +205,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 12 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -219,7 +220,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 11 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 11, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 11, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -311,7 +312,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the level of the deflate stream", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
processOutgoingMessage()
}})
}})
@@ -321,7 +322,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the memLevel of the deflate stream", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
processOutgoingMessage()
}})
}})
@@ -331,7 +332,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the strategy of the deflate stream", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
processOutgoingMessage()
}})
}})