Skip to content

Commit

Permalink
Reorganize to support batching in preprocessing:
Browse files Browse the repository at this point in the history
- remove __preprocessing from daemon.js . This will become the basis for several other functions
- preprocessing() call now only adds task to linked list, the daemon handles expanding and executing tasks
  • Loading branch information
p-flock committed Jan 14, 2020
1 parent 808f593 commit 22e4171
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 136 deletions.
39 changes: 23 additions & 16 deletions lib/client/preprocessing/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module.exports = function (jiffClient) {
* @return {promise} a promise that is resolved when preprocessing is completed, null if this is called by a party that is neither a compute nor receiver party
*/
jiffClient.preprocessing = function (dependent_op, count, batch, protocols, threshold, receivers_list, compute_list, Zp, id_list, params) {
var start = jiffClient.preprocessingTasks.length === 0;
var start = jiffClient.preprocessingTasks.head == null;

// defaults!
if (receivers_list == null) {
Expand Down Expand Up @@ -101,10 +101,9 @@ module.exports = function (jiffClient) {
if (threshold == null) {
threshold = receivers_list.length;
}
if (id_list == null) {
id_list = [];
if (protocols == null) {
protocols = {};
}
protocols = Object.assign({}, jiffClient.default_preprocessing_protocols, protocols);

// actual preprocessing
if (count == null || count <= 0) {
Expand All @@ -119,9 +118,19 @@ module.exports = function (jiffClient) {
batch = batch == null ? count : batch;

// Create preprocessing tasks
for (var i = 0; i < count; i += batch) {
jiffClient.preprocessingTasks.push([dependent_op, Math.min(batch, count - i), protocols, threshold, receivers_list, compute_list, Zp, id_list, params]);
}
var task = {
'dependent_op' : dependent_op,
'count' : count,
'threshold' : threshold,
'receivers_list' : receivers_list,
'compute_list' : compute_list,
'Zp' : Zp,
'id_list' : id_list,
'id' : null,
'params' : params,
'protocols' : protocols
};
jiffClient.preprocessingTasks.add(task);

// Start daemon if not running!
if (start) {
Expand All @@ -130,17 +139,15 @@ module.exports = function (jiffClient) {
};

/**
* Indicates that all preprocessing tasks have been specified by the user (via `jiffClient.preprocessing()`)
* Calls the given callback when all preprocessing tasks have finished! This is meant to be called only once
* @method onFinishPreprocessing
* @method finishPreprocessing
* @memberof module:jiff-client~JIFFClient
* @instance
* @param callback {!Function} - the callback to execute when preprocessing is finished.
*/
jiffClient.onFinishPreprocessing = function (callback) {
if (jiffClient.preprocessingTasks.length === 0) {
jiffClient.counters.reset();
callback(jiffClient);
} else {
jiffClient.preprocessingCallback = callback;
}
jiffClient.finishPreprocessing = function (callback) {
jiffClient.preprocessingCallback = callback;
jiffClient.preprocessingDaemon();
};
};
};
176 changes: 56 additions & 120 deletions lib/client/preprocessing/daemon.js
Original file line number Diff line number Diff line change
@@ -1,114 +1,42 @@
module.exports = function (jiffClient) {
/**
* Generate values used for jiff operations in advance of the general computation
* @method __preprocessing
* @memberof module:jiff-client~JIFFClient
* @instance
* @param {string} dependent_op - name of the operation that will later use the pre_processed values
* @param {Number} count - number of times the protocol should be performed, number of values that will be generated
* @param {Object} [protocols=defaults] - a mapping from base preprocessing elements (triplets, bit arrays) to functions that can pre-process them
* the function must implement the same interface as the JIFF provided protocols (e.g. jiffClient.protocols.generate_beaver_bgw).
* missing mappings indicate that JIFF must use the default protocols.
* @param {Number} [threshold=receivers_list.length] - the threshold of the preprocessed shares.
* @param {Array} [receivers_list=all_parties] - the parties that will receive the preprocsssed shares.
* @param {Array} [compute_list=all_parties] - the parties that will compute the preprocsssed shares.
* @param {Array} [Zp=jiffClient.Zp] - the Zp of the preprocessed shares.
* @param {Array} [id_list=auto_gen()] - array of ids to be used sequentially to identify the pre_processed values. Optional.
* @param {Object} params - any additional protocol-specific parameters.
* @return {promise} a promise that is resolved when preprocessing is completed.
*/
jiffClient.__preprocessing = function (dependent_op, count, protocols, threshold, receivers_list, compute_list, Zp, id_list, params) {
var find_closest_namespace = function (op, starting_namespace) {
var namespace_index = jiffClient.extensions.indexOf(starting_namespace);
while (namespace_index >= 0) {
var namespace = jiffClient.extensions[namespace_index];
if (jiffClient.preprocessing_function_map[namespace] != null && jiffClient.preprocessing_function_map[namespace][op] != null) {
return namespace;
}
namespace_index--;
}

return null;
};

// read only copy of params
var _params = params;
var currentBatchLoad = 0;

// Recursively follow jiffClient.preprocessing_function_map
// to figure out the sub-components/nested primitives of the given operation
// and pre-process those with the right op_ids.
var promises = [];
for (var i = 0; i < count; i++) {
params = Object.assign({}, _params);
if (params.op_id != null) {
params.op_id = params.op_id + i;
var getFirstTask = function (task) {
if (task.count > 1) {
var remainingTasks = Object.assign({}, task);
remainingTasks.count--;
task.count = 1;
if (task.id_list != null) {
task.id = [remainingTasks.id_list.shift()];
task.id_list = null;
}
jiffClient.preprocessingTasks.pushHead(remainingTasks);
}
if (task.id_list != null) {
task.id = task.id_list[0];
task.id_list = null;
}
return task;
}

var id = id_list[i];
if (id == null) {
// Two kinds of operations: one that relies on different sets of senders and receivers, and one that has a set of holders
if (dependent_op === 'open' || dependent_op === 'bits.open') { // TODO: make this part of the description in table
var open_parties = params['open_parties'] != null ? params['open_parties'] : receivers_list;
id = jiffClient.counters.gen_op_id2(dependent_op, open_parties, receivers_list);
} else {
id = jiffClient.counters.gen_op_id(dependent_op, receivers_list);
}
}

var namespace = find_closest_namespace(dependent_op, params['namespace']);
if (namespace == null) {
var protocol = protocols[dependent_op];
params.output_op_id = id;
var result = protocol(threshold, receivers_list, compute_list, Zp, params, protocols);
promises.push(result.promise);
if (receivers_list.indexOf(jiffClient.id) > -1) {
jiffClient.store_preprocessing(id, result.share);
}
} else {
var preprocessing_dependencies = jiffClient.preprocessing_function_map[namespace][dependent_op];
if (typeof(preprocessing_dependencies) === 'function') {
preprocessing_dependencies = preprocessing_dependencies(dependent_op, count, protocols, threshold, receivers_list, compute_list, Zp, id_list, params);
}
for (var k = 0; k < preprocessing_dependencies.length; k++) {
var dependency = preprocessing_dependencies[k];
var next_op = dependency['op'];

// copy both the originally given extra_params and the extra params of the dependency and merge them
// together, dependency params overwrite duplicate keys.
// If params are ever needed in non-leaf operations, this must be changed to accommodate
var extra_params = Object.assign({}, params, dependency['params']);
extra_params['namespace'] = dependency['namespace'] != null ? dependency['namespace'] : 'base';
if (dependency.handler != null) {
extra_params = dependency.handler(threshold, receivers_list, compute_list, Zp, id, extra_params);
}
if (extra_params.ignore === true) {
continue;
}

// compose ids similar to how the actual operation is implemented
var next_id_list = [];
var next_count = dependency['count'];

if (next_count == null) {
next_count = 1;
next_id_list[0] = id + dependency['op_id'];
} else {
next_count = next_count(threshold, receivers_list, compute_list, Zp, id, extra_params);
for (var j = 0; j < next_count; j++) {
next_id_list.push(id + dependency['op_id'] + j);
}
}

if (extra_params['op_id'] != null) {
extra_params['op_id'] = extra_params['op_id'] + dependency['op_id'];
}
var checkIfDone = function () {
if (jiffClient.preprocessingCallback != null && currentBatchLoad === 0) {
jiffClient.counters.reset();

promises.push(jiffClient.__preprocessing(next_op, next_count, protocols, threshold, receivers_list, compute_list, Zp, next_id_list, extra_params));
}
}
var callback = jiffClient.preprocessingCallback;
jiffClient.preprocessingCallback = null;
callback(jiffClient);
}
};

return Promise.all(promises);
var buildID = function (task) {
// Two kinds of operations: one that relies on different sets of senders and receivers, and one that has a set of holders
if (task.dependent_op === 'open' || task.dependent_op === 'bits.open') { // TODO: make this part of the description in table
var open_parties = task.params['open_parties'] != null ? task.params['open_parties'] : task.receivers_list;
task.id = jiffClient.counters.gen_op_id2(task.dependent_op, open_parties, task.receivers_list);
} else {
task.id = jiffClient.counters.gen_op_id(task.dependent_op, task.receivers_list);
}
};

/**
Expand All @@ -118,24 +46,32 @@ module.exports = function (jiffClient) {
* @instance
*/
jiffClient.preprocessingDaemon = function () {
if (jiffClient.preprocessingTasks.length === 0) {
if (jiffClient.preprocessingCallback != null) {
jiffClient.counters.reset();
if (currentBatchLoad >= jiffClient.preprocessingBatchSize) {
return;
}

var callback = jiffClient.preprocessingCallback;
jiffClient.preprocessingCallback = null;
callback(jiffClient);
}
var task = jiffClient.preprocessingTasks.popHead();

if (task == null) {
checkIfDone();
return;
}

// execute a single preprocessing task!
(function () {
var promise = jiffClient.__preprocessing.apply(jiffClient, arguments);
promise.then(function () {
jiffClient.preprocessingTasks.shift();
jiffClient.preprocessingDaemon();
});
}).apply(jiffClient, jiffClient.preprocessingTasks[0]);
task = getFirstTask(task);
if (task.id == null) {
buildID(task);
}

// check if task is executable or no
if (taskIsExecutable(task)) {
// execute single task
executeTask(task).then(jiffClient.preprocessingDaemon);
} else {
//expand single task
expandTask(task); // this co-recursively calls preprocessingDaemon()
}


// expand task to base level
};
};
};

0 comments on commit 22e4171

Please sign in to comment.