Skip to content

Commit

Permalink
Fix pubsub further
Browse files Browse the repository at this point in the history
Unsubscribing from all channels did not work properly with reconnect
Pub sub did not work properly with the new `string_numbers` option
  • Loading branch information
Ruben Bridgewater committed Mar 31, 2016
1 parent 3fd865b commit 79c1767
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 43 deletions.
77 changes: 37 additions & 40 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ var Parser = require('redis-parser');
var commands = require('redis-commands');
var debug = require('./lib/debug');
var unifyOptions = require('./lib/createClient');
var SUBSCRIBE_COMMANDS = {
subscribe: true,
unsubscribe: true,
psubscribe: true,
punsubscribe: true
};

// Newer Node.js versions > 0.10 return the EventEmitter right away and using .EventEmitter was deprecated
if (typeof EventEmitter !== 'function') {
Expand Down Expand Up @@ -615,59 +621,52 @@ function normal_reply (self, reply) {
}
}

function set_subscribe (self, type, command_obj, subscribe, reply) {
var i = 0;
function set_subscribe (self, type, subscribe, channel) {
// Every channel has to be saved / removed one after the other and the type has to be the same too,
// to make sure partly subscribe / unsubscribe works well together
if (subscribe) {
// The channels have to be saved one after the other and the type has to be the same too,
// to make sure partly subscribe / unsubscribe works well together
for (; i < command_obj.args.length; i++) {
self.subscription_set[type + '_' + command_obj.args[i]] = command_obj.args[i];
}
self.subscription_set[type + '_' + channel] = channel;
} else {
type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent
for (; i < command_obj.args.length; i++) {
delete self.subscription_set[type + '_' + command_obj.args[i]];
}
if (reply[2] === 0) { // No channels left that this client is subscribed to
var running_command;
i = 0;
// This should be a rare case and therefor handling it this way should be good performance wise for the general case
while (running_command = self.command_queue.get(i++)) {
if (
running_command.command === 'subscribe' ||
running_command.command === 'psubscribe' ||
running_command.command === 'unsubscribe' ||
running_command.command === 'punsubscribe'
) {
self.pub_sub_mode = i;
return;
}
}
self.pub_sub_mode = 0;
}
delete self.subscription_set[type + '_' + channel];
}
}

function subscribe_unsubscribe (self, reply, type, subscribe) {
// Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback
// The pub sub commands return each argument in a separate return value and have to be handled that way
var command_obj = self.command_queue.get(0);
var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj && command_obj.buffer_args || reply[1] === null;
var channel = buffer ? reply[1] : reply[1].toString();
var count = reply[2];
var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj.buffer_args;
var channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString();
var count = +reply[2]; // Return the channel counter as number no matter if `string_numbers` is activated or not
debug('Subscribe / unsubscribe command');

// Emit first, then return the callback
if (channel !== null) { // Do not emit something if there was no channel to unsubscribe from
if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from
self.emit(type, channel, count);
set_subscribe(self, type, subscribe, channel);
}
// The pub sub commands return each argument in a separate return value and have to be handled that way
if (command_obj.sub_commands_left <= 1) {
if (count !== 0 && !subscribe && command_obj.args.length === 0) {
command_obj.sub_commands_left = count;
return;
if (count !== 0) {
if (!subscribe && command_obj.args.length === 0) { // Unsubscribe from all channels
command_obj.sub_commands_left = count;
return;
}
} else {
var running_command;
var i = 1;
// This should be a rare case and therefor handling it this way should be good performance wise for the general case
while (running_command = self.command_queue.get(i)) {
if (SUBSCRIBE_COMMANDS[running_command.command]) {
self.command_queue.shift();
self.pub_sub_mode = i;
return;
}
i++;
}
self.pub_sub_mode = 0;
}
self.command_queue.shift();
set_subscribe(self, type, command_obj, subscribe, reply);
if (typeof command_obj.callback === 'function') {
// TODO: The current return value is pretty useless.
// Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too
Expand Down Expand Up @@ -819,12 +818,10 @@ RedisClient.prototype.internal_send_command = function (command, args, callback)
var command_obj = new Command(command, args_copy, callback);
command_obj.buffer_args = buffer_args;

if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') {
if (SUBSCRIBE_COMMANDS[command] && this.pub_sub_mode === 0) {
// If pub sub is already activated, keep it that way, otherwise set the number of commands to resolve until pub sub mode activates
// Deactivation of the pub sub mode happens in the result handler
if (!this.pub_sub_mode) {
this.pub_sub_mode = this.command_queue.length + 1;
}
this.pub_sub_mode = this.command_queue.length + 1;
}
this.command_queue.push(command_obj);

Expand Down
35 changes: 32 additions & 3 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,44 @@ describe('publish/subscribe', function () {
});
});

describe('string_numbers and pub sub', function () {
beforeEach(function (done) {
sub.end(false);
sub = redis.createClient({
string_numbers: true
});
sub.once('connect', function () {
done();
});
});

it('does not fire subscribe events after reconnecting', function (done) {
var i = 0;
sub.on('subscribe', function (chnl, count) {
assert.strictEqual(typeof count, 'number');
assert.strictEqual(++i, count);
});
sub.on('unsubscribe', function (chnl, count) {
assert.strictEqual(typeof count, 'number');
assert.strictEqual(--i, count);
});
sub.subscribe(channel, channel2);
sub.unsubscribe(function (err, res) { // Do not pass a channel here!
assert.strictEqual(sub.pub_sub_mode, 2);
assert.deepEqual(sub.subscription_set, {});
});
sub.set('foo', 'bar', helper.isString('OK'));
sub.subscribe(channel2, done);
});
});

describe('subscribe', function () {
it('fires a subscribe event for each channel subscribed to even after reconnecting', function (done) {
var a = false;
sub.on('subscribe', function (chnl, count) {
if (chnl === channel2) {
assert.equal(2, count);
if (a) {
return done();
}
if (a) return done();
sub.stream.destroy();
}
});
Expand Down

0 comments on commit 79c1767

Please sign in to comment.