Change the order of operations in destroyClient() so that the ID is not unregistered until its subscriptions and message queue have been deleted.
This commit is contained in:
+27
-24
@@ -54,44 +54,47 @@ Engine.prototype = {
|
||||
clearInterval(this._gc);
|
||||
},
|
||||
|
||||
createClient: function(callback, scope) {
|
||||
createClient: function(callback, context) {
|
||||
var clientId = this._server.generateId(), self = this;
|
||||
this._redis.zadd(this._ns + '/clients', 0, clientId, function(error, added) {
|
||||
if (added === 0) return self.createClient(callback, scope);
|
||||
if (added === 0) return self.createClient(callback, context);
|
||||
self._server.debug('Created new client ?', clientId);
|
||||
self.ping(clientId);
|
||||
self._server.trigger('handshake', clientId);
|
||||
callback.call(scope, clientId);
|
||||
callback.call(context, clientId);
|
||||
});
|
||||
},
|
||||
|
||||
destroyClient: function(clientId, callback, scope) {
|
||||
clientExists: function(clientId, callback, context) {
|
||||
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
|
||||
callback.call(context, score !== null);
|
||||
});
|
||||
},
|
||||
|
||||
destroyClient: function(clientId, callback, context) {
|
||||
var self = this;
|
||||
this._redis.zrem(this._ns + '/clients', clientId);
|
||||
this._redis.del(this._ns + '/clients/' + clientId + '/messages');
|
||||
|
||||
this._redis.smembers(this._ns + '/clients/' + clientId + '/channels', function(error, channels) {
|
||||
var n = channels.length, i = 0;
|
||||
if (i === n) return self.afterDestroy(clientId, callback, scope);
|
||||
if (i === n) return self._afterSubscriptionsRemoved(clientId, callback, context);
|
||||
|
||||
channels.forEach(function(channel) {
|
||||
self.unsubscribe(clientId, channel, function() {
|
||||
i += 1;
|
||||
if (i === n) self.afterDestroy(clientId, callback, scope);
|
||||
if (i === n) self._afterSubscriptionsRemoved(clientId, callback, context);
|
||||
});
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
afterDestroy: function(clientId, callback, scope) {
|
||||
this._server.debug('Destroyed client ?', clientId);
|
||||
this._server.trigger('disconnect', clientId);
|
||||
if (callback) callback.call(scope);
|
||||
},
|
||||
|
||||
clientExists: function(clientId, callback, scope) {
|
||||
this._redis.zscore(this._ns + '/clients', clientId, function(error, score) {
|
||||
callback.call(scope, score !== null);
|
||||
_afterSubscriptionsRemoved: function(clientId, callback, context) {
|
||||
var self = this;
|
||||
this._redis.del(this._ns + '/clients/' + clientId + '/messages', function() {
|
||||
self._redis.zrem(self._ns + '/clients', clientId, function() {
|
||||
self._server.debug('Destroyed client ?', clientId);
|
||||
self._server.trigger('disconnect', clientId);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
@@ -105,25 +108,25 @@ Engine.prototype = {
|
||||
this._redis.zadd(this._ns + '/clients', time, clientId);
|
||||
},
|
||||
|
||||
subscribe: function(clientId, channel, callback, scope) {
|
||||
subscribe: function(clientId, channel, callback, context) {
|
||||
var self = this;
|
||||
this._redis.sadd(this._ns + '/clients/' + clientId + '/channels', channel, function(error, added) {
|
||||
if (added === 1) self._server.trigger('subscribe', clientId, channel);
|
||||
});
|
||||
this._redis.sadd(this._ns + '/channels' + channel, clientId, function() {
|
||||
self._server.debug('Subscribed client ? to channel ?', clientId, channel);
|
||||
if (callback) callback.call(scope);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
},
|
||||
|
||||
unsubscribe: function(clientId, channel, callback, scope) {
|
||||
unsubscribe: function(clientId, channel, callback, context) {
|
||||
var self = this;
|
||||
this._redis.srem(this._ns + '/clients/' + clientId + '/channels', channel, function(error, removed) {
|
||||
if (removed === 1) self._server.trigger('unsubscribe', clientId, channel);
|
||||
});
|
||||
this._redis.srem(this._ns + '/channels' + channel, clientId, function() {
|
||||
self._server.debug('Unsubscribed client ? from channel ?', clientId, channel);
|
||||
if (callback) callback.call(scope);
|
||||
if (callback) callback.call(context);
|
||||
});
|
||||
},
|
||||
|
||||
@@ -184,7 +187,7 @@ Engine.prototype = {
|
||||
}, this);
|
||||
},
|
||||
|
||||
_withLock: function(lockName, callback, scope) {
|
||||
_withLock: function(lockName, callback, context) {
|
||||
var lockKey = this._ns + '/locks/' + lockName,
|
||||
currentTime = new Date().getTime(),
|
||||
expiry = currentTime + this.LOCK_TIMEOUT * 1000 + 1,
|
||||
@@ -195,7 +198,7 @@ Engine.prototype = {
|
||||
};
|
||||
|
||||
this._redis.setnx(lockKey, expiry, function(error, set) {
|
||||
if (set === 1) return callback.call(scope, releaseLock);
|
||||
if (set === 1) return callback.call(context, releaseLock);
|
||||
|
||||
self._redis.get(lockKey, function(error, timeout) {
|
||||
if (!timeout) return;
|
||||
@@ -205,7 +208,7 @@ Engine.prototype = {
|
||||
|
||||
self._redis.getset(lockKey, expiry, function(error, oldValue) {
|
||||
if (oldValue !== timeout) return;
|
||||
callback.call(scope, releaseLock);
|
||||
callback.call(context, releaseLock);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user