Skip to content

Commit

Permalink
Merge pull request pencilblue#61 from pencilblue/story/PB60
Browse files Browse the repository at this point in the history
Fixes pencilblue#60. Provides a distributed command interface for 1 way communication between all nodes in the cluster
  • Loading branch information
brianhyder committed Jul 11, 2014
2 parents 11b495d + 5310625 commit 80e6512
Show file tree
Hide file tree
Showing 5 changed files with 455 additions and 8 deletions.
4 changes: 4 additions & 0 deletions include/requirements.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ pb.MongoRegistrationProvider = require(path.join(DOCUMENT_ROOT, '/include/system
pb.RedisRegistrationProvider = require(path.join(DOCUMENT_ROOT, '/include/system/registry/redis_registration_provider.js'));
pb.ServerRegistration = require(DOCUMENT_ROOT+'/include/system/server_registration.js');

//command service
pb.RedisCommandBroker = require(path.join(DOCUMENT_ROOT, '/include/system/command/redis_command_broker.js'));
pb.CommandService = require(path.join(DOCUMENT_ROOT, '/include/system/command/command_service.js'));

//Email settings and functions
pb.EmailService = require(DOCUMENT_ROOT+'/include/email').EmailService;
pb.email = new pb.EmailService();
Expand Down
264 changes: 264 additions & 0 deletions include/system/command/command_service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/*
Copyright (C) 2014 PencilBlue, LLC
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

/**
* Provides a mechanism to send commands to all members of the cluster or a
* specific member.
* @class CommandService
* @constructor
*/
function CommandService(){}

//statics
/**
* @private
* @property BROKER
* @type {CommandBroker}
*/
var BROKER = null;

/**
* @private
* @property BROKER
* @type {CommandBroker}
*/
var REGISTRANTS = {};

/**
* @private
* @property COMMAND_CHANNEL
* @type {String}
*/
var COMMAND_CHANNEL = 'pencilblue-command-channel';

/**
* Initializes the service and the broker implementation. The broker is
* determined by the configuration value of "command.broker". This value can
* be "redis" for the out of the box implementation for Redis or an absolute
* path to another implementation.
* @static
* @method init
* @param {Function} cb A callback that takes two parameters: cb(Error, TRUE/FALSE)
*/
CommandService.init = function(cb) {
pb.log.debug('CommandService: Initializing...');

//figure out which broker to use
var BrokerPrototype = null;
if (pb.config.command.broker === 'redis') {
BrokerPrototype = pb.RedisCommandBroker;
}
else {
try {
BrokerPrototype = require(pb.config.command.broker);
}
catch(e){
pb.log.error('CommandService: Failed to load CommandBroker implementation at [%s]. %s', pb.config.command.broker, e.stack);
}
}

//ensure a broker was found
if (!BrokerPrototype) {
cb(new Error('A valid CommandBroker must be provided in order to initialize the CommandService'));
}

//instantiate the command broker
BROKER = new BrokerPrototype();
BROKER.init(function(err, result) {
if (util.isError(err)) {
cb(err);
return;
}

BROKER.subscribe(COMMAND_CHANNEL, CommandService.onCommandReceived, cb);
});
};

/**
* Shuts down the command service and the broker if initialized
* @static
* @method shutdown
* @param {Function} cb A callback that takes two parameters: cb(Error, TRUE/FALSE)
*/
CommandService.shutdown = function(cb) {
pb.log.debug('CommandService: Shutting down...');

REGISTRANTS = {};
if (BROKER) {
BROKER.shutdown(cb);
}
else {
cb(null, true);
}
};

/**
* Registers a handler for incoming commands of the specified type.
* @static
* @method registerForType
* @param {String} type The name/type of the command to handle
* @param {Function} handler A function that takes two parameters:
* handler(channel, command). where channel is a string and command is an
* object.
* @returns {Boolean} TRUE if the the handler was registered, FALSE if not.
*/
CommandService.registerForType = function(type, handler) {
if (!pb.validation.validateNonEmptyStr(type, true) || !pb.utils.isFunction(handler)) {
return false;
}

//ensure there is a holder for the type
if (!REGISTRANTS[type]) {
REGISTRANTS[type] = [];
}

REGISTRANTS[type].push(handler);
return true;
};

/**
* Unregisters a handler for the specified type.
* @static
* @method unregisterForType
* @param {String} type The name/type of the command that the handler is
* registered for
* @param {Function} handler The handler function to unregister
* @returns {Boolean} TRUE if the handler was unregistered, FALSE if not.
*/
CommandService.unregisterForType = function(type, handler) {
if (!pb.validation.validateNonEmptyStr(type, true) || !pb.utils.isFunction(handler)) {
return false;
}

if (!util.isArray(REGISTRANTS[type])) {
return false;
}

for (var i = 0; i < REGISTRANTS[type].length; i++) {
if (handler === REGISTRANTS[type][i]) {
REGISTRANTS[type].splice(i, 1);
return true;
}
}
return false;
};

/**
* Responsible for delegating out the received command to the registered
* handlers. The command parameter must be an object, must have a type
* property that is a string, and must have a registered handler for the
* specified type.
* @static
* @method notifyOfCommand
* @param {Object} command The command to delegate
*/
CommandService.notifyOfCommand = function(command) {
if (!pb.utils.isObject(command)) {
return;
}

var type = command.type;
if (!pb.validation.validateNonEmptyStr(type, true)) {
return;
}

if (!util.isArray(REGISTRANTS[type])) {
pb.log.warn('CommandService: Command of type [%s] was received but there are no registered handlers.', type);
return;
}

//emit command to each handler
var emitFunction = function(type, i, command){
return function() {
REGISTRANTS[type][i](command);
};
};
for (var i = 0; i < REGISTRANTS[type].length; i++) {
process.nextTick(emitFunction(type, i, command));
}
};

/**
* Sends a command to the cluster
* @static
* @method sendCommand
* @param {String} type The command name/type
* @param {Object} options The options that will be serialized and sent to the other processes in the cluster
*/
CommandService.sendCommand = function(type, options, cb) {
if (!pb.validation.validateNonEmptyStr(type, true)) {
cb(new Error("The command type is required"));
return;
}
else if (pb.utils.isFunction(options)) {
cb = options;
options = {};
}
else if (!pb.validation.validateObject(options, false)) {
cb(new Error('When provided the options parameter must be an object'));
return;
}

//ensure a callback is provided
cb = cb || pb.utils.cb;

//ensure an options object
if (!options) {
options = {};
}

//set who its from
options.from = pb.ServerRegistration.generateKey();
options.type = type;
options.date = new Date();

//ensure each command is sent with an ID. Allow the ID to already be set
//in the event that we are sending a response.
if (!options.id) {
options.id = pb.utils.uniqueId();
}

//instruct the broker to broadcast the command
BROKER.publish(COMMAND_CHANNEL, options, cb);
};

/**
* The global handler for incoming commands. It registers itself with the
* broker and then when messages are received it verifies that the message is
* meant for this member of the cluster (or all members) then proceeds to
* handoff to the function that will delegate out to the handlers.
* @static
* @method onCommandReceived
* @param {String} channel The channel to listen for incoming commands
* @param {Object} command The command to verify and delegate
*/
CommandService.onCommandReceived = function(channel, command) {
var uid = pb.ServerRegistration.generateKey();
if (command.to === uid || command.to === null || command.to === undefined) {
CommandService.notifyOfCommand(command);
}
else {
//skip because it isn't addressed to us
pb.log.silly('CommandService: The command was not addressed to me [%s] but to [%s]. Skipping.', uid, command.to);
}
};

//register for events
pb.system.registerShutdownHook('CommandService', CommandService.shutdown);

//exports
module.exports = CommandService;
Loading

0 comments on commit 80e6512

Please sign in to comment.