Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e7619d9091 | |||
| cc2978bdfb | |||
| 983c0d30c3 | |||
| 72955f0b1e |
@@ -0,0 +1,10 @@
|
||||
### 0.1.1 / 2014-12-15
|
||||
|
||||
* Fix race condition when using context takeover, where adjacent messages have
|
||||
data listeners bound at the same time and end up duplicating output
|
||||
* Use `DeflateRaw.flush()` correctly on v0.10 so that optimal compression is
|
||||
achieved
|
||||
|
||||
### 0.1.0 / 2014-12-13
|
||||
|
||||
* Initial release
|
||||
@@ -41,12 +41,12 @@ exts.add(deflate);
|
||||
|
||||
The set of available options can be split into two sets: those that control the
|
||||
session's compressor for outgoing messages and do not need to be communicated to
|
||||
the peer, and those that are negoatiated as part of the protocol. The settings
|
||||
the peer, and those that are negotiated as part of the protocol. The settings
|
||||
only affecting the compressor are described fully in the [zlib
|
||||
documentation](http://nodejs.org/api/zlib.html#zlib_options):
|
||||
|
||||
* `level`: sets the compression level, can be an integer from `0` to `9`, or one
|
||||
of the contants `zlib.Z_NO_COMPRESSION`, `zlib.Z_BEST_SPEED`,
|
||||
of the constants `zlib.Z_NO_COMPRESSION`, `zlib.Z_BEST_SPEED`,
|
||||
`zlib.Z_BEST_COMPRESSION`, or `zlib.Z_DEFAULT_COMPRESSION`
|
||||
* `memLevel`: sets how much memory the compressor allocates, can be an integer
|
||||
from `1` to `9`, or one of the constants `zlib.Z_MIN_MEMLEVEL`,
|
||||
|
||||
+86
-41
@@ -3,6 +3,8 @@
|
||||
var zlib = require('zlib'),
|
||||
common = require('./common');
|
||||
|
||||
var VERSION = process.version.match(/\d+/g).map(function(n) { return parseInt(n, 10) });
|
||||
|
||||
var Session = function(options) {
|
||||
this._level = common.fetch(options, 'level', zlib.Z_DEFAULT_LEVEL);
|
||||
this._memLevel = common.fetch(options, 'memLevel', zlib.Z_DEFAULT_MEMLEVEL);
|
||||
@@ -12,34 +14,106 @@ var Session = function(options) {
|
||||
this._acceptMaxWindowBits = common.fetch(options, 'maxWindowBits', undefined);
|
||||
this._requestNoContextTakeover = common.fetch(options, 'requestNoContextTakeover', false);
|
||||
this._requestMaxWindowBits = common.fetch(options, 'requestMaxWindowBits', undefined);
|
||||
|
||||
this._queueIn = [];
|
||||
this._queueOut = [];
|
||||
};
|
||||
|
||||
Session.prototype.processIncomingMessage = function(message, callback) {
|
||||
if (!message.rsv1) return callback(null, message);
|
||||
if (this._lockIn) return this._queueIn.push([message, callback]);
|
||||
|
||||
var inflate = this._getInflate(),
|
||||
chunks = [message.data, new Buffer([0x00, 0x00, 0xff, 0xff])],
|
||||
chunks = [],
|
||||
length = 0,
|
||||
self = this;
|
||||
|
||||
this._processMessage(inflate, chunks, function(error, data) {
|
||||
if (this._inflate) this._lockIn = true;
|
||||
|
||||
var return_ = function(error, message) {
|
||||
return_ = function() {};
|
||||
|
||||
inflate.removeListener('data', onData);
|
||||
inflate.removeListener('error', onError);
|
||||
if (!self._inflate && inflate.close) inflate.close();
|
||||
if (error) return callback(error, null);
|
||||
message.data = data;
|
||||
callback(null, message);
|
||||
|
||||
self._lockIn = false;
|
||||
var next = self._queueIn.shift();
|
||||
if (next) self.processIncomingMessage.apply(self, next);
|
||||
|
||||
callback(error, message);
|
||||
};
|
||||
|
||||
var onData = function(data) {
|
||||
chunks.push(data);
|
||||
length += data.length;
|
||||
};
|
||||
|
||||
var onError = function(error) {
|
||||
return_(error, null);
|
||||
};
|
||||
|
||||
inflate.on('data', onData);
|
||||
inflate.on('error', onError);
|
||||
|
||||
inflate.write(message.data);
|
||||
inflate.write(new Buffer([0x00, 0x00, 0xff, 0xff]));
|
||||
|
||||
inflate.flush(function() {
|
||||
message.data = common.concat(chunks, length);
|
||||
return_(null, message);
|
||||
});
|
||||
};
|
||||
|
||||
Session.prototype.processOutgoingMessage = function(message, callback) {
|
||||
if (this._lockOut) return this._queueOut.push([message, callback]);
|
||||
|
||||
var deflate = this._getDeflate(),
|
||||
chunks = [],
|
||||
length = 0,
|
||||
self = this;
|
||||
|
||||
this._processMessage(deflate, [message.data], function(error, data) {
|
||||
if (this._deflate) this._lockOut = true;
|
||||
|
||||
var return_ = function(error, message) {
|
||||
return_ = function() {};
|
||||
|
||||
deflate.removeListener('data', onData);
|
||||
deflate.removeListener('error', onError);
|
||||
if (!self._deflate && deflate.close) deflate.close();
|
||||
if (error) return callback(error, null);
|
||||
message.rsv1 = true;
|
||||
message.data = data.slice(0, data.length - 4);
|
||||
callback(null, message);
|
||||
});
|
||||
|
||||
self._lockOut = false;
|
||||
var next = self._queueOut.shift();
|
||||
if (next) self.processOutgoingMessage.apply(self, next);
|
||||
|
||||
callback(error, message);
|
||||
};
|
||||
|
||||
var onData = function(data) {
|
||||
var tail = data.slice(Math.max(data.length - 4, 0), data.length),
|
||||
isEnd = (tail[0] === 0x00 && tail[1] === 0x00 && tail[2] === 0xff && tail[3] === 0xff);
|
||||
|
||||
if (isEnd) data = data.slice(0, data.length - 4);
|
||||
|
||||
chunks.push(data);
|
||||
length += data.length;
|
||||
|
||||
if (isEnd) {
|
||||
message.rsv1 = true;
|
||||
message.data = common.concat(chunks, length);
|
||||
return_(null, message);
|
||||
}
|
||||
};
|
||||
|
||||
var onError = function(error) {
|
||||
return_(error, null);
|
||||
};
|
||||
|
||||
deflate.on('data', onData);
|
||||
deflate.on('error', onError);
|
||||
|
||||
deflate.write(message.data);
|
||||
if (VERSION[0] === 0 && VERSION[1] < 10) deflate.flush();
|
||||
};
|
||||
|
||||
Session.prototype.close = function() {
|
||||
@@ -61,6 +135,7 @@ Session.prototype._getDeflate = function() {
|
||||
if (this._deflate) return this._deflate;
|
||||
|
||||
var deflate = zlib.createDeflateRaw({
|
||||
flush: zlib.Z_SYNC_FLUSH,
|
||||
windowBits: this._ownWindowBits,
|
||||
level: this._level,
|
||||
memLevel: this._memLevel,
|
||||
@@ -70,34 +145,4 @@ Session.prototype._getDeflate = function() {
|
||||
return deflate;
|
||||
};
|
||||
|
||||
Session.prototype._processMessage = function(codec, data, callback) {
|
||||
var chunks = [],
|
||||
length = 0;
|
||||
|
||||
var return_ = function(error, result) {
|
||||
return_ = function() {};
|
||||
codec.removeListener('data', onData);
|
||||
codec.removeListener('error', onError);
|
||||
codec = null;
|
||||
callback(error, result);
|
||||
};
|
||||
|
||||
var onData = function(chunk) {
|
||||
chunks.push(chunk);
|
||||
length += chunk.length;
|
||||
};
|
||||
|
||||
var onError = function(error) {
|
||||
return_(error, null);
|
||||
};
|
||||
|
||||
codec.on('data', onData);
|
||||
codec.on('error', onError);
|
||||
data.forEach(function(c) { codec.write(c) });
|
||||
|
||||
codec.flush(function() {
|
||||
return_(null, common.concat(chunks, length));
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = Session;
|
||||
|
||||
+1
-1
@@ -5,7 +5,7 @@
|
||||
, "keywords" : ["websocket", "compression", "deflate"]
|
||||
, "license" : "MIT"
|
||||
|
||||
, "version" : "0.1.0"
|
||||
, "version" : "0.1.1"
|
||||
, "engines" : {"node": ">=0.6.0"}
|
||||
, "main" : "./lib/permessage_deflate"
|
||||
, "devDependencies" : {"jstest": ""}
|
||||
|
||||
+11
-10
@@ -9,6 +9,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
this.deflate = zlibMock()
|
||||
this.inflate = zlibMock()
|
||||
this.flush = zlib.Z_SYNC_FLUSH
|
||||
this.level = zlib.Z_DEFAULT_LEVEL
|
||||
this.memLevel = zlib.Z_DEFAULT_MEMLEVEL
|
||||
this.strategy = zlib.Z_DEFAULT_STRATEGY
|
||||
@@ -24,7 +25,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
this.stub(stream, "removeListener")
|
||||
|
||||
this.stub(stream, "write")
|
||||
this.stub(stream, "flush").yields([])
|
||||
this.stub(stream, "flush", function(cb) { if(cb) cb() });
|
||||
this.stub(stream, "close").raises(new Error("unexpected close()"))
|
||||
|
||||
return stream
|
||||
@@ -70,7 +71,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -101,7 +102,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses no context takeover and 15 window bits to deflate outgoing messages", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
|
||||
expect(deflate, "close").exactly(2)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
@@ -140,7 +141,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 8 window bits for deflating outgoing messages", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -169,7 +170,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
|
||||
expect(deflate, "close").exactly(2)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
@@ -191,7 +192,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 9 window bits for deflating outgoing messages", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 9, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 9, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -214,7 +215,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 8 window bits for deflating outgoing messages", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 8, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -309,7 +310,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("sets the level of the deflate stream", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
}})
|
||||
@@ -319,7 +320,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("sets the memLevel of the deflate stream", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
}})
|
||||
@@ -329,7 +330,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("sets the strategy of the deflate stream", function() { with(this) {
|
||||
activate()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
}})
|
||||
|
||||
+12
-11
@@ -9,6 +9,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
this.deflate = zlibMock()
|
||||
this.inflate = zlibMock()
|
||||
this.flush = zlib.Z_SYNC_FLUSH
|
||||
this.level = zlib.Z_DEFAULT_LEVEL
|
||||
this.memLevel = zlib.Z_DEFAULT_MEMLEVEL
|
||||
this.strategy = zlib.Z_DEFAULT_STRATEGY
|
||||
@@ -24,7 +25,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
this.stub(stream, "removeListener")
|
||||
|
||||
this.stub(stream, "write")
|
||||
this.stub(stream, "flush").yields([])
|
||||
this.stub(stream, "flush", function(cb) { if(cb) cb() });
|
||||
this.stub(stream, "close").raises(new Error("unexpected close()"))
|
||||
|
||||
return stream
|
||||
@@ -61,7 +62,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -76,7 +77,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
|
||||
expect(deflate, "close").exactly(2)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
@@ -108,7 +109,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 13 window bits for deflating outgoing messages", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 13, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 13, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -171,7 +172,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses no context takeover and 15 window bits for deflating outgoing messages", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: level, memLevel: memLevel, strategy: strategy}).exactly(2).returning(deflate)
|
||||
expect(deflate, "close").exactly(2)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
@@ -189,7 +190,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 12 window bits for deflating outgoing messages", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -204,7 +205,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 12 window bits for deflating outgoing messages", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 12, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -219,7 +220,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("uses context takeover and 11 window bits for deflating outgoing messages", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 11, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 11, level: level, memLevel: memLevel, strategy: strategy}).exactly(1).returning(deflate)
|
||||
processOutgoingMessage()
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
@@ -311,7 +312,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("sets the level of the deflate stream", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_BEST_SPEED, memLevel: memLevel, strategy: strategy}).returns(deflate)
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
}})
|
||||
@@ -321,7 +322,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("sets the memLevel of the deflate stream", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: 5, strategy: strategy}).returns(deflate)
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
}})
|
||||
@@ -331,7 +332,7 @@ test.describe("ClientSession", function() { with(this) {
|
||||
|
||||
it("sets the strategy of the deflate stream", function() { with(this) {
|
||||
response()
|
||||
expect(zlib, "createDeflateRaw").given({windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
|
||||
expect(zlib, "createDeflateRaw").given({flush: flush, windowBits: 15, level: zlib.Z_DEFAULT_LEVEL, memLevel: memLevel, strategy: zlib.Z_FILTERED}).returns(deflate)
|
||||
processOutgoingMessage()
|
||||
}})
|
||||
}})
|
||||
|
||||
Reference in New Issue
Block a user