6 Commits

8 changed files with 146 additions and 66 deletions
+14
View File
@@ -0,0 +1,14 @@
### 0.1.2 / 2014-12-18
* Don't allow configure() to be called with unrecognized options
### 0.1.1 / 2014-12-15
* Fix race condition when using context takeover, where adjacent messages have
data listeners bound at the same time and end up duplicating output
* Use `DeflateRaw.flush()` correctly on v0.10 so that optimal compression is
achieved
### 0.1.0 / 2014-12-13
* Initial release
+2 -2
View File
@@ -41,12 +41,12 @@ exts.add(deflate);
The set of available options can be split into two sets: those that control the
session's compressor for outgoing messages and do not need to be communicated to
the peer, and those that are negoatiated as part of the protocol. The settings
the peer, and those that are negotiated as part of the protocol. The settings
only affecting the compressor are described fully in the [zlib
documentation](http://nodejs.org/api/zlib.html#zlib_options):
* `level`: sets the compression level, can be an integer from `0` to `9`, or one
of the contants `zlib.Z_NO_COMPRESSION`, `zlib.Z_BEST_SPEED`,
of the constants `zlib.Z_NO_COMPRESSION`, `zlib.Z_BEST_SPEED`,
`zlib.Z_BEST_COMPRESSION`, or `zlib.Z_DEFAULT_COMPRESSION`
* `memLevel`: sets how much memory the compressor allocates, can be an integer
from `1` to `9`, or one of the constants `zlib.Z_MIN_MEMLEVEL`,
+7
View File
@@ -29,6 +29,13 @@ var common = {
return _default;
},
validateOptions: function(options, validKeys) {
for (var key in options) {
if (validKeys.indexOf(key) < 0)
throw new Error('Unrecognized option: ' + key);
}
},
validParams: function(params) {
var keys = Object.keys(params), i = keys.length;
while (i--) {
+13 -1
View File
@@ -1,10 +1,22 @@
'use strict';
var ClientSession = require('./client_session'),
ServerSession = require('./server_session');
ServerSession = require('./server_session'),
common = require('./common');
var VALID_OPTIONS = [
'level',
'memLevel',
'strategy',
'noContextTakeover',
'maxWindowBits',
'requestNoContextTakeover',
'requestMaxWindowBits'
];
var PermessageDeflate = {
configure: function(options) {
common.validateOptions(options, VALID_OPTIONS);
var opts = this._options || {};
for (var key in opts) options[key] = opts[key];
return Object.create(this, {_options: {value: options}});
+86 -41
View File
@@ -3,6 +3,8 @@
var zlib = require('zlib'),
common = require('./common');
var VERSION = process.version.match(/\d+/g).map(function(n) { return parseInt(n, 10) });
var Session = function(options) {
this._level = common.fetch(options, 'level', zlib.Z_DEFAULT_LEVEL);
this._memLevel = common.fetch(options, 'memLevel', zlib.Z_DEFAULT_MEMLEVEL);
@@ -12,34 +14,106 @@ var Session = function(options) {
this._acceptMaxWindowBits = common.fetch(options, 'maxWindowBits', undefined);
this._requestNoContextTakeover = common.fetch(options, 'requestNoContextTakeover', false);
this._requestMaxWindowBits = common.fetch(options, 'requestMaxWindowBits', undefined);
this._queueIn = [];
this._queueOut = [];
};
Session.prototype.processIncomingMessage = function(message, callback) {
if (!message.rsv1) return callback(null, message);
if (this._lockIn) return this._queueIn.push([message, callback]);
var inflate = this._getInflate(),
chunks = [message.data, new Buffer([0x00, 0x00, 0xff, 0xff])],
chunks = [],
length = 0,
self = this;
this._processMessage(inflate, chunks, function(error, data) {
if (this._inflate) this._lockIn = true;
var return_ = function(error, message) {
return_ = function() {};
inflate.removeListener('data', onData);
inflate.removeListener('error', onError);
if (!self._inflate && inflate.close) inflate.close();
if (error) return callback(error, null);
message.data = data;
callback(null, message);
self._lockIn = false;
var next = self._queueIn.shift();
if (next) self.processIncomingMessage.apply(self, next);
callback(error, message);
};
var onData = function(data) {
chunks.push(data);
length += data.length;
};
var onError = function(error) {
return_(error, null);
};
inflate.on('data', onData);
inflate.on('error', onError);
inflate.write(message.data);
inflate.write(new Buffer([0x00, 0x00, 0xff, 0xff]));
inflate.flush(function() {
message.data = common.concat(chunks, length);
return_(null, message);
});
};
Session.prototype.processOutgoingMessage = function(message, callback) {
if (this._lockOut) return this._queueOut.push([message, callback]);
var deflate = this._getDeflate(),
chunks = [],
length = 0,
self = this;
this._processMessage(deflate, [message.data], function(error, data) {
if (this._deflate) this._lockOut = true;
var return_ = function(error, message) {
return_ = function() {};
deflate.removeListener('data', onData);
deflate.removeListener('error', onError);
if (!self._deflate && deflate.close) deflate.close();
if (error) return callback(error, null);
message.rsv1 = true;
message.data = data.slice(0, data.length - 4);
callback(null, message);
});
self._lockOut = false;
var next = self._queueOut.shift();
if (next) self.processOutgoingMessage.apply(self, next);
callback(error, message);
};
var onData = function(data) {
var tail = data.slice(Math.max(data.length - 4, 0), data.length),
isEnd = (tail[0] === 0x00 && tail[1] === 0x00 && tail[2] === 0xff && tail[3] === 0xff);
if (isEnd) data = data.slice(0, data.length - 4);
chunks.push(data);
length += data.length;
if (isEnd) {
message.rsv1 = true;
message.data = common.concat(chunks, length);
return_(null, message);
}
};
var onError = function(error) {
return_(error, null);
};
deflate.on('data', onData);
deflate.on('error', onError);
deflate.write(message.data);
if (VERSION[0] === 0 && VERSION[1] < 10) deflate.flush();
};
Session.prototype.close = function() {
@@ -61,6 +135,7 @@ Session.prototype._getDeflate = function() {
if (this._deflate) return this._deflate;
var deflate = zlib.createDeflateRaw({
flush: zlib.Z_SYNC_FLUSH,
windowBits: this._ownWindowBits,
level: this._level,
memLevel: this._memLevel,
@@ -70,34 +145,4 @@ Session.prototype._getDeflate = function() {
return deflate;
};
Session.prototype._processMessage = function(codec, data, callback) {
var chunks = [],
length = 0;
var return_ = function(error, result) {
return_ = function() {};
codec.removeListener('data', onData);
codec.removeListener('error', onError);
codec = null;
callback(error, result);
};
var onData = function(chunk) {
chunks.push(chunk);
length += chunk.length;
};
var onError = function(error) {
return_(error, null);
};
codec.on('data', onData);
codec.on('error', onError);
data.forEach(function(c) { codec.write(c) });
codec.flush(function() {
return_(null, common.concat(chunks, length));
});
};
module.exports = Session;
+1 -1
View File
@@ -5,7 +5,7 @@
, "keywords" : ["websocket", "compression", "deflate"]
, "license" : "MIT"
, "version" : "0.1.0"
, "version" : "0.1.2"
, "engines" : {"node": ">=0.6.0"}
, "main" : "./lib/permessage_deflate"
, "devDependencies" : {"jstest": ""}
+11 -10
View File
@@ -9,6 +9,7 @@ test.describe("ClientSession", function() { with(this) {
this.deflate = zlibMock()
this.inflate = zlibMock()
this.flush = zlib.Z_SYNC_FLUSH
this.level = zlib.Z_DEFAULT_LEVEL
this.memLevel = zlib.Z_DEFAULT_MEMLEVEL
this.strategy = zlib.Z_DEFAULT_STRATEGY
@@ -24,7 +25,7 @@ test.describe("ClientSession", function() { with(this) {
this.stub(stream, "removeListener")
this.stub(stream, "write")
this.stub(stream, "flush").yields([])
this.stub(stream, "flush", function(cb) { if(cb) cb() });
this.stub(stream, "close").raises(new Error("unexpected close()"))
return stream
@@ -70,7 +71,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -101,7 +102,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses no context takeover and 15 window bits to deflate outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(deflate, "close").exactly(2)
processOutgoingMessage()
processOutgoingMessage()
@@ -140,7 +141,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 8 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -169,7 +170,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(deflate, "close").exactly(2)
processOutgoingMessage()
processOutgoingMessage()
@@ -191,7 +192,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 9 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 9, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 9, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -214,7 +215,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 8 window bits for deflating outgoing messages", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -309,7 +310,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the level of the deflate stream", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
processOutgoingMessage()
}})
}})
@@ -319,7 +320,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the memLevel of the deflate stream", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
processOutgoingMessage()
}})
}})
@@ -329,7 +330,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the strategy of the deflate stream", function() { with(this) {
activate()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
processOutgoingMessage()
}})
}})
+12 -11
View File
@@ -9,6 +9,7 @@ test.describe("ClientSession", function() { with(this) {
this.deflate = zlibMock()
this.inflate = zlibMock()
this.flush = zlib.Z_SYNC_FLUSH
this.level = zlib.Z_DEFAULT_LEVEL
this.memLevel = zlib.Z_DEFAULT_MEMLEVEL
this.strategy = zlib.Z_DEFAULT_STRATEGY
@@ -24,7 +25,7 @@ test.describe("ClientSession", function() { with(this) {
this.stub(stream, "removeListener")
this.stub(stream, "write")
this.stub(stream, "flush").yields([])
this.stub(stream, "flush", function(cb) { if(cb) cb() });
this.stub(stream, "close").raises(new Error("unexpected close()"))
return stream
@@ -61,7 +62,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -76,7 +77,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(deflate, "close").exactly(2)
processOutgoingMessage()
processOutgoingMessage()
@@ -108,7 +109,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 13 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 13, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 13, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -171,7 +172,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
expect(deflate, "close").exactly(2)
processOutgoingMessage()
processOutgoingMessage()
@@ -189,7 +190,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 12 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -204,7 +205,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 12 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -219,7 +220,7 @@ test.describe("ClientSession", function() { with(this) {
it("uses context takeover and 11 window bits for deflating outgoing messages", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 11, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 11, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
processOutgoingMessage()
processOutgoingMessage()
}})
@@ -311,7 +312,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the level of the deflate stream", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
processOutgoingMessage()
}})
}})
@@ -321,7 +322,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the memLevel of the deflate stream", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
processOutgoingMessage()
}})
}})
@@ -331,7 +332,7 @@ test.describe("ClientSession", function() { with(this) {
it("sets the strategy of the deflate stream", function() { with(this) {
response()
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
processOutgoingMessage()
}})
}})