Skip to content

Commit

Permalink
stream cmd: write, flush added
Browse files Browse the repository at this point in the history
  • Loading branch information
niteshsinha committed Aug 8, 2012
1 parent 58804e6 commit 37783ae
Showing 1 changed file with 142 additions and 49 deletions.
191 changes: 142 additions & 49 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
/*global Buffer require exports console setTimeout */

var net = require("net"),
fs = require('fs'),
util = require("./lib/util"),
Queue = require("./lib/queue"),
to_array = require("./lib/to_array"),
events = require("events"),
parsers = [], commands,
connection_id = 0,
default_port = 6379,
default_host = "127.0.0.1";
default_host = "127.0.0.1",
write_str = "",
log_stream = "";

// can set this to true to enable for all connections
exports.debug_mode = false;

// write input/output to file
exports.log_to_file = false;

// hiredis might not be installed
try {
require("./lib/parser/hiredis");
Expand Down Expand Up @@ -90,10 +96,12 @@ function RedisClient(stream, options) {
self.connection_gone("end");
});

this.stream.on("drain", function () {
self.should_buffer = false;
self.emit("drain");
});
this.stream.on("drain", function () {
self.should_buffer = false;
self.emit("drain");
});

log_stream = fs.createWriteStream(__dirname + '/file.log', { flags: 'a+' })

events.EventEmitter.call(this);
}
Expand Down Expand Up @@ -469,19 +477,34 @@ RedisClient.prototype.connection_gone = function (why) {
};

RedisClient.prototype.on_data = function (data) {
if (exports.debug_mode) {
console.log("net read " + this.host + ":" + this.port + " id " + this.connection_id + ": " + data.toString());
}

try {
this.reply_parser.execute(data);
} catch (err) {
// This is an unexpected parser problem, an exception that came from the parser code itself.
// Parser should emit "error" events if it notices things are out of whack.
// Callbacks that throw exceptions will land in return_reply(), below.
// TODO - it might be nice to have a different "error" event for different types of errors
this.emit("error", err);
var self = this;
if (exports.debug_mode) {
console.log("net read " + this.host + ":" + this.port + " id " + this.connection_id + ": " + data.toString());
}
if (exports.log_to_file) {
if (data.toString().length > 40)
write_str += "= " + (data.toString().replace(/(\r\n|\n|\r)/gm, ' ').substr(0, 30) + "..[trimmed]") + " \r\n";
else
write_str += "= " + data.toString().replace(/(\r\n|\n|\r)/gm, ' '); + " \r\n";

if (!log_stream.write(write_str)) {
self.stream.pause();
log_stream.on("drain", function cb() {
self.stream.resume();
log_stream.removeListener("drain", cb);
});
}
write_str = "";
}
try {
this.reply_parser.execute(data);
} catch (err) {
// This is an unexpected parser problem, an exception that came from the parser code itself.
// Parser should emit "error" events if it notices things are out of whack.
// Callbacks that throw exceptions will land in return_reply(), below.
// TODO - it might be nice to have a different "error" event for different types of errors
this.emit("error", err);
}
};

RedisClient.prototype.return_error = function (err) {
Expand Down Expand Up @@ -693,30 +716,33 @@ RedisClient.prototype.send_command = function (command, args, callback) {

command_obj = new Command(command, args, false, buffer_args, callback);

if ((!this.ready && !this.send_anyway) || !stream.writable) {
if (exports.debug_mode) {
if (!stream.writable) {
console.log("send command: stream is not writeable.");
}
}

if (this.enable_offline_queue) {
if (exports.debug_mode) {
console.log("Queueing " + command + " for next server connection.");
}
this.offline_queue.push(command_obj);
this.should_buffer = true;
} else {
var not_writeable_error = new Error('send_command: stream not writeable. enable_offline_queue is false');
if (command_obj.callback) {
command_obj.callback(not_writeable_error);
} else {
throw not_writeable_error;
}
}
if ((!this.ready && !this.send_anyway) || !stream.writable) {
if (exports.debug_mode) {
if (!stream.writable) {
console.log("send command: stream is not writeable.");
}

return false;
console.log("Queueing " + command + " for next server connection.");
}
this.offline_queue.push(command_obj);
this.should_buffer = true;
return false;
}
//stream operations
if (command === "flush") {
stream.write("*0\r\n\r\n");
this.command_queue.push(command_obj);
this.commands_sent += 1;
return true;
} else if (command === "write") {
var str = String(args[0]);
if (!(str.charAt(0) === '*'))
str = "*0" + str;
stream.write(str);
this.command_queue.push(command_obj);
this.commands_sent += 1;
return true;
} else {

if (command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe") {
this.pub_sub_command(command_obj);
Expand All @@ -737,18 +763,31 @@ RedisClient.prototype.send_command = function (command, args, callback) {

command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n";

if (! buffer_args) { // Build up a string and send entire command in one write
for (i = 0, il = args.length, arg; i < il; i += 1) {
arg = args[i];
if (typeof arg !== "string") {
arg = String(arg);
}
command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n";
if (exports.log_to_file) {
write_str = "\r\n*" + elem_count + " $" + command.length + " " + command + " ";
}

if (!buffer_args) { // Build up a string and send entire command in one write
for (i = 0, il = args.length, arg; i < il; i += 1) {
arg = args[i];
if (typeof arg !== "string") {
arg = String(arg);
}
if (exports.debug_mode) {
console.log("send " + this.host + ":" + this.port + " id " + this.connection_id + ": " + command_str);
command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n";

if (exports.log_to_file) {
if (Buffer.byteLength(arg) > 40) {
write_str += "$" + Buffer.byteLength(arg) + " " + (arg.substr(0, 30) + "..[trimmed]") + " ";
} else {
write_str += "$" + Buffer.byteLength(arg) + " " + arg + " ";
}
}
buffered_writes += !stream.write(command_str);

}
if (exports.debug_mode) {
console.log("send " + this.host + ":" + this.port + " id " + this.connection_id + ": " + command_str);
}
buffered_writes += !stream.write(command_str);
} else {
if (exports.debug_mode) {
console.log("send command (" + command_str + ") has Buffer arguments");
Expand Down Expand Up @@ -786,10 +825,21 @@ RedisClient.prototype.send_command = function (command, args, callback) {
if (exports.debug_mode) {
console.log("send_command buffered_writes: " + buffered_writes, " should_buffer: " + this.should_buffer);
}
if (exports.log_to_file && buffer_args) {
write_str += "send_command buffered_writes: " + buffered_writes, " should_buffer: " + this.should_buffer;
}

if (buffered_writes || this.command_queue.getLength() >= this.command_queue_high_water) {
this.should_buffer = true;
}
if (exports.log_to_file) {
log_stream.write(write_str);
write_str = "";
}
return !this.should_buffer;

}

};

RedisClient.prototype.pub_sub_command = function (command_obj) {
Expand Down Expand Up @@ -866,6 +916,9 @@ commands = set_union(["get", "set", "setnx", "setex", "append", "strlen", "del",
"persist", "slaveof", "debug", "config", "subscribe", "unsubscribe", "psubscribe", "punsubscribe", "publish", "watch", "unwatch", "cluster",
"restore", "migrate", "dump", "object", "client", "eval", "evalsha"], require("./lib/commands"));

// for maipulating the stream
stream_commands = ["read", "write", "flush", "close"];

commands.forEach(function (command) {
RedisClient.prototype[command] = function (args, callback) {
if (Array.isArray(args) && typeof callback === "function") {
Expand All @@ -883,6 +936,46 @@ commands.forEach(function (command) {
Multi.prototype[command.toUpperCase()] = Multi.prototype[command];
});

stream_commands.forEach(function (command) {
RedisClient.prototype[command] = function (args, callback) {
if (typeof callback === "function") {
return this.send_command(command, to_array(arguments), callback);
} else {
return this.send_command(command, to_array(arguments));
}
};
RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command];

Multi.prototype[command] = function () {
this.queue.push([command].concat(to_array(arguments)));
return this;
};
Multi.prototype[command.toUpperCase()] = Multi.prototype[command];
});

RedisClient.prototype.flush = function (callback) {
var self = this;
this.send_command('flush', [], function (err, res) {
if (typeof (callback) === 'function') {
callback(err, res);
}
});

};
RedisClient.prototype.FLUSH = RedisClient.prototype.flush;

RedisClient.prototype.write = function (args, callback) {
var self = this;
this.send_command('write', to_array(arguments), function (err, res) {
if (typeof (callback) === 'function') {
callback(err, res);
}
});

};
RedisClient.prototype.WRITE = RedisClient.prototype.write;


// store db in this.select_db to restore it on reconnect
RedisClient.prototype.select = function (db, callback) {
var self = this;
Expand Down

0 comments on commit 37783ae

Please sign in to comment.