From 7fc36b59e66c1cbb305e176bc389bc7cf4b22d8e Mon Sep 17 00:00:00 2001 From: David Byrd Date: Thu, 24 Sep 2020 18:36:37 -0400 Subject: [PATCH] PPFL template --- .../crypto/PPFL_TemplateClientAgent.py | 311 ++++++++++++++++++ config/ppfl_template.py | 287 ++++++++++++++++ 2 files changed, 598 insertions(+) create mode 100755 agent/examples/crypto/PPFL_TemplateClientAgent.py create mode 100755 config/ppfl_template.py diff --git a/agent/examples/crypto/PPFL_TemplateClientAgent.py b/agent/examples/crypto/PPFL_TemplateClientAgent.py new file mode 100755 index 000000000..eda1f7eaa --- /dev/null +++ b/agent/examples/crypto/PPFL_TemplateClientAgent.py @@ -0,0 +1,311 @@ +from agent.Agent import Agent +from agent.examples.crypto.PPFL_ServiceAgent import PPFL_ServiceAgent +from message.Message import Message +from util.util import log_print + +from util.crypto.logReg import getWeights, reportStats +import util.crypto.diffieHellman as dh + +import numpy as np +from os.path import exists +import pandas as pd +import random + + +# The PPFL_TemplateClientAgent class inherits from the base Agent class. It has the +# structure of a secure federated learning protocol with secure multiparty communication, +# but without any particular learning or noise methods. That is, this is a template in +# which the client parties simply pass around arbitrary data. Sections that would need +# to be completed are clearly marked. + +class PPFL_TemplateClientAgent(Agent): + + def __init__(self, id, name, type, peer_list=None, iterations=4, multiplier=10000, secret_scale = 100000, + X_train = None, y_train = None, X_test = None, y_test = None, split_size = None, + num_clients = None, num_subgraphs = None, random_state=None): + + # Base class init. + super().__init__(id, name, type, random_state) + + + # Store the client's peer list (subgraph, neighborhood) with which it should communicate. + self.peer_list = peer_list + + # Initialize a tracking attribute for the initial peer exchange and record the subgraph size. + self.peer_exchange_complete = False + self.num_peers = len(self.peer_list) + + # Record the total number of clients participating in the protocol and the number of subgraphs. + # Neither of these are part of the protocol, or necessary for real-world implementation, but do + # allow for convenient logging of progress and results in simulation. + self.num_clients = num_clients + self.num_subgraphs = num_subgraphs + + # Record the number of protocol (federated learning) iterations the clients will perform. + self.no_of_iterations = iterations + + # Record the multiplier that will be used to protect against floating point accuracy loss and + # the scale of the client shared secrets. + self.multiplier = multiplier + self.secret_scale = secret_scale + + # Record the training and testing splits for the data set to be learned. + self.X_train = X_train + self.y_train = y_train + + self.X_test = X_test + self.y_test = y_test + + # Record the number of features in the data set. + self.no_of_weights = X_train.shape[1] + + # Initialize an attribute to remember the shared weights returned from the server. + self.prevWeight = np.zeros(self.no_of_weights) + + # Each client receives only a portion of the training data each protocol iteration. + self.split_size = split_size + + # Initialize a dictionary to remember which peers we have heard from during peer exchange. + self.peers_received = {} + + # Initialize a dictionary to accumulate this client's timing information by task. + self.elapsed_time = { 'DH_OFFLINE' : pd.Timedelta(0), 'DH_ONLINE' : pd.Timedelta(0), + 'TRAINING' : pd.Timedelta(0), 'ENCRYPTION' : pd.Timedelta(0) } + + + # Pre-generate this client's local training data for each iteration (for the sake of simulation speed). + self.trainX = [] + self.trainY = [] + + # This is a faster PRNG than the default, for times when we must select a large quantity of randomness. + self.prng = np.random.Generator(np.random.SFC64()) + + ### Data randomly selected from total training set each iteration, simulating online behavior. + for i in range(iterations): + slice = self.prng.choice(range(self.X_train.shape[0]), size = split_size, replace = False) + + # Pull together the current local training set. + self.trainX.append(self.X_train[slice].copy()) + self.trainY.append(self.y_train[slice].copy()) + + + # Create dictionaries to hold the public and secure keys for this client, and the public keys shared + # by its peers. + self.pubkeys = {} + self.seckeys = {} + self.peer_public_keys = {} + + # Create dictionaries to hold the shared key for each peer each iteration and the seed for the + # following iteration. + self.r = {} + self.R = {} + + + ### ADD DIFFERENTIAL PRIVACY CONSTANTS AND CONFIGURATION HERE, IF NEEDED. + # + # + + + # Iteration counter. + self.current_iteration = 0 + + + + + ### Simulation lifecycle messages. + + def kernelStarting(self, startTime): + + # Initialize custom state properties into which we will later accumulate results. + # To avoid redundancy, we allow only the first client to handle initialization. + if self.id == 1: + self.kernel.custom_state['dh_offline'] = pd.Timedelta(0) + self.kernel.custom_state['dh_online'] = pd.Timedelta(0) + self.kernel.custom_state['training'] = pd.Timedelta(0) + self.kernel.custom_state['encryption'] = pd.Timedelta(0) + + # Find the PPFL service agent, so messages can be directed there. + self.serviceAgentID = self.kernel.findAgentByType(PPFL_ServiceAgent) + + # Request a wake-up call as in the base Agent. Noise is kept small because + # the overall protocol duration is so short right now. (up to one microsecond) + super().kernelStarting(startTime + pd.Timedelta(self.random_state.randint(low = 0, high = 1000), unit='ns')) + + + def kernelStopping(self): + + # Accumulate into the Kernel's "custom state" this client's elapsed times per category. + # Note that times which should be reported in the mean per iteration are already so computed. + # These will be output to the config (experiment) file at the end of the simulation. + + self.kernel.custom_state['dh_offline'] += self.elapsed_time['DH_OFFLINE'] + self.kernel.custom_state['dh_online'] += (self.elapsed_time['DH_ONLINE'] / self.no_of_iterations) + self.kernel.custom_state['training'] += (self.elapsed_time['TRAINING'] / self.no_of_iterations) + self.kernel.custom_state['encryption'] += (self.elapsed_time['ENCRYPTION'] / self.no_of_iterations) + + super().kernelStopping() + + + ### Simulation participation messages. + + def wakeup (self, currentTime): + super().wakeup(currentTime) + + # Record start of wakeup for real-time computation delay.. + dt_wake_start = pd.Timestamp('now') + + # Check if the clients are still performing the one-time peer exchange. + if not self.peer_exchange_complete: + + # Generate DH keys. + self.pubkeys, self.seckeys = dh.dict_keygeneration( self.peer_list ) + + # Record elapsed wallclock for Diffie Hellman offline. + dt_wake_end = pd.Timestamp('now') + self.elapsed_time['DH_OFFLINE'] += dt_wake_end - dt_wake_start + + # Set computation delay to elapsed wallclock time. + self.setComputationDelay(int((dt_wake_end - dt_wake_start).to_timedelta64())) + + # Send generated values to peers. + for idx, peer in enumerate(self.peer_list): + # We assume a star network configuration where all messages between peers must be forwarded + # through the server. + self.sendMessage(self.serviceAgentID, Message({ "msg" : "FWD_MSG", "msgToForward" : "PEER_EXCHANGE", + "sender": self.id, "recipient": peer, "pubkey" : self.pubkeys[peer] })) + + else: + + # We are waking up to start a new iteration of the protocol. + # (Peer exchange is done before all this.) + + if (self.current_iteration == 0): + # During iteration 0 (only) we complete the key exchange and prepare the + # common key list, because at this point we know we have received keys + # from all peers. + + # R is the common key dictionary (by peer agent id). + dh.dict_keyexchange(self.peer_list, self.id, self.pubkeys, self.seckeys, self.peer_public_keys) + + + # CREATE AND CACHE LOCAL DIFFERENTIAL PRIVACY NOISE HERE, IF NEEDED. + # + # + + + # Diffie Hellman is done in every iteration. + for peer_id, common_key in self.R.items(): + + random.seed(common_key) + rand = random.getrandbits(512) + + rand_b_raw = format(rand, '0512b') + rand_b_rawr = rand_b_raw[:256] + rand_b_rawR = rand_b_raw[256:] + + + # Negate offsets below this agent's id. This ensures each offset will be + # added once and subtracted once. + r = int(rand_b_rawr,2) % (2**32) + + # Update dictionary of shared secrets for this iteration. + self.r[peer_id] = r if peer_id < self.id else -r + + # Store the shared seeds for the next iteration. + self.R[peer_id] = int(rand_b_rawR,2) + + + # Record elapsed wallclock for Diffie Hellman online. + dt_online_complete = pd.Timestamp('now') + + # For convenience of things indexed by iteration... + i = self.current_iteration + + + ### ADD LOCAL LEARNING METHOD HERE, IF NEEDED. + # + # + + weight = np.random.normal (loc = self.prevWeight, scale = self.prevWeight / 10, size = self.prevWeight.shape) + + + # Record elapsed wallclock for training model. + dt_training_complete = pd.Timestamp('now') + + + ### ADD NOISE TO THE WEIGHTS HERE, IF NEEDED. + # + # + + n = np.array(weight) * self.multiplier + weights_to_send = n + sum(self.r.values()) + + + # Record elapsed wallclock for encryption. + dt_encryption_complete = pd.Timestamp('now') + + # Set computation delay to elapsed wallclock time. + self.setComputationDelay(int((dt_encryption_complete - dt_wake_start).to_timedelta64())) + + # Send the message to the server. + self.sendMessage(self.serviceAgentID, Message({ "msg" : "CLIENT_WEIGHTS", "sender": self.id, + "weights" : weights_to_send })) + + self.current_iteration += 1 + + # Store elapsed times by category. + self.elapsed_time['DH_ONLINE'] += dt_online_complete - dt_wake_start + self.elapsed_time['TRAINING'] += dt_training_complete - dt_online_complete + self.elapsed_time['ENCRYPTION'] += dt_encryption_complete - dt_training_complete + + + + def receiveMessage (self, currentTime, msg): + super().receiveMessage(currentTime, msg) + + if msg.body['msg'] == "PEER_EXCHANGE": + + # Record start of message processing. + dt_rcv_start = pd.Timestamp('now') + + # Ensure we don't somehow record the same peer twice. These all come from the + # service provider, relayed from other clients, but are "fixed up" to appear + # as if they come straight from the relevant peer. + if msg.body['sender'] not in self.peers_received: + + # Record the content of the message and that we received it. + self.peers_received[msg.body['sender']] = True + self.peer_public_keys[msg.body['sender']] = msg.body['pubkey'] + + # Record end of message processing. + dt_rcv_end = pd.Timestamp('now') + + # Store elapsed times by category. + self.elapsed_time['DH_OFFLINE'] += dt_rcv_end - dt_rcv_start + + # Set computation delay to elapsed wallclock time. + self.setComputationDelay(int((dt_rcv_end - dt_rcv_start).to_timedelta64())) + + # If this is the last peer from whom we expect to hear, move on with the protocol. + if len(self.peers_received) == self.num_peers: + self.peer_exchange_complete = True + self.setWakeup(currentTime + pd.Timedelta('1ns')) + + elif msg.body['msg'] == "SHARED_WEIGHTS": + # Reset computation delay. + self.setComputationDelay(0) + + # Extract the shared weights from the message. + self.prevWeight = msg.body['weights'] + + # Remove the multiplier that was helping guard against floating point error. + self.prevWeight /= self.multiplier + + log_print ("Client weights received for iteration {} by {}: {}", self.current_iteration, self.id, self.prevWeight) + + if self.id == 1: print (f"Protocol iteration {self.current_iteration} complete.") + + # Start a new iteration if we are not at the end of the protocol. + if self.current_iteration < self.no_of_iterations: + self.setWakeup(currentTime + pd.Timedelta('1ns')) + diff --git a/config/ppfl_template.py b/config/ppfl_template.py new file mode 100755 index 000000000..f1287efd1 --- /dev/null +++ b/config/ppfl_template.py @@ -0,0 +1,287 @@ +# Our custom modules. +from Kernel import Kernel +from agent.examples.crypto.PPFL_TemplateClientAgent import PPFL_TemplateClientAgent as PPFL_ClientAgent +from agent.examples.crypto.PPFL_ServiceAgent import PPFL_ServiceAgent +from model.LatencyModel import LatencyModel +from util import util +from util.crypto import logReg + +# Standard modules. +from datetime import timedelta +from math import floor +import numpy as np +from os.path import exists +import pandas as pd +from sklearn.model_selection import train_test_split +from sys import exit +from time import time + + +# Some config files require additional command line parameters to easily +# control agent or simulation hyperparameters during coarse parallelization. +import argparse + +parser = argparse.ArgumentParser(description='Detailed options for PPFL config.') +parser.add_argument('-a', '--clear_learning', action='store_true', + help='Learning in the clear (vs SMP protocol)') +parser.add_argument('-c', '--config', required=True, + help='Name of config file to execute') +parser.add_argument('-e', '--epsilon', type=float, default=1.0, + help='Privacy loss epsilon') +parser.add_argument('-g', '--num_subgraphs', type=int, default=1, + help='Number of connected subgraphs into which to place client agents') +parser.add_argument('-i', '--num_iterations', type=int, default=5, + help='Number of iterations for the secure multiparty protocol)') +parser.add_argument('-k', '--skip_log', action='store_true', + help='Skip writing agent logs to disk') +parser.add_argument('-l', '--log_dir', default=None, + help='Log directory name (default: unix timestamp at program start)') +parser.add_argument('-m', '--max_logreg_iterations', type=int, default=50, + help='Number of iterations for client local LogReg'), +parser.add_argument('-n', '--num_clients', type=int, default=5, + help='Number of clients for the secure multiparty protocol)') +parser.add_argument('-o', '--collusion', action='store_true', + help='Compute collusion analysis (big and slow!)') +parser.add_argument('-p', '--split_size', type=int, default=20, + help='Local training size per client per iteration') +parser.add_argument('-r', '--learning_rate', type=float, default=10.0, + help='Local learning rate for training on client data') +parser.add_argument('-s', '--seed', type=int, default=None, + help='numpy.random.seed() for simulation') +parser.add_argument('-v', '--verbose', action='store_true', + help='Maximum verbosity!') +parser.add_argument('--config_help', action='store_true', + help='Print argument options for this config file') + +args, remaining_args = parser.parse_known_args() + +if args.config_help: + parser.print_help() + exit() + +# Historical date to simulate. Required even if not relevant. +historical_date = pd.to_datetime('2014-01-28') + +# Requested log directory. +log_dir = args.log_dir +skip_log = args.skip_log + +# Random seed specification on the command line. Default: None (by clock). +# If none, we select one via a specific random method and pass it to seed() +# so we can record it for future use. (You cannot reasonably obtain the +# automatically generated seed when seed() is called without a parameter.) + +# Note that this seed is used to (1) make any random decisions within this +# config file itself and (2) to generate random number seeds for the +# (separate) Random objects given to each agent. This ensure that when +# the agent population is appended, prior agents will continue to behave +# in the same manner save for influences by the new agents. (i.e. all prior +# agents still have their own separate PRNG sequence, and it is the same as +# before) + +seed = args.seed +if not seed: seed = int(pd.Timestamp.now().timestamp() * 1000000) % (2**32 - 1) +np.random.seed(seed) + +# Config parameter that causes util.util.print to suppress most output. +util.silent_mode = not args.verbose + +num_clients = args.num_clients +num_iterations = args.num_iterations +split_size = args.split_size +max_logreg_iterations = args.max_logreg_iterations +epsilon = args.epsilon +learning_rate = args.learning_rate +clear_learning = args.clear_learning +collusion = args.collusion + +### How many client agents will there be? 1000 in 125 subgraphs of 8 fits ln(n), for example +num_subgraphs = args.num_subgraphs + +print ("Silent mode: {}".format(util.silent_mode)) +print ("Configuration seed: {}\n".format(seed)) + + + +# Since the simulator often pulls historical data, we use a real-world +# nanosecond timestamp (pandas.Timestamp) for our discrete time "steps", +# which are considered to be nanoseconds. For other (or abstract) time +# units, one can either configure the Timestamp interval, or simply +# interpret the nanoseconds as something else. + +# What is the earliest available time for an agent to act during the +# simulation? +midnight = historical_date +kernelStartTime = midnight + +# When should the Kernel shut down? +kernelStopTime = midnight + pd.to_timedelta('17:00:00') + +# This will configure the kernel with a default computation delay +# (time penalty) for each agent's wakeup and recvMsg. An agent +# can change this at any time for itself. (nanoseconds) + +defaultComputationDelay = 1000000000 * 5 # five seconds + +# IMPORTANT NOTE CONCERNING AGENT IDS: the id passed to each agent must: +# 1. be unique +# 2. equal its index in the agents list +# This is to avoid having to call an extra getAgentListIndexByID() +# in the kernel every single time an agent must be referenced. + + +### Configure the Kernel. +kernel = Kernel("Base Kernel", random_state = np.random.RandomState(seed=np.random.randint(low=0,high=2**32))) + +### Obtain random state for whatever latency model will be used. +latency_rstate = np.random.RandomState(seed=np.random.randint(low=0,high=2**32)) + +### Configure the agents. When conducting "agent of change" experiments, the +### new agents should be added at the END only. +agent_count = 0 +agents = [] +agent_types = [] + +### What accuracy multiplier will be used? +accy_multiplier = 100000 + +### What will be the scale of the shared secret? +secret_scale = 1000000 + +### For now, non-integer sizes are NOT handled. Please choose an even divisor. +subgraph_size = int(floor(num_clients / num_subgraphs)) + +logReg.number_of_parties = num_clients + + +### LOAD DATA HERE +# +# The data should be loaded only once (for speed). Data should usually be +# shuffled, split into training and test data, and passed to the client +# parties. +# +# X_data should be a numpy array with column-wise features and row-wise +# examples. y_data should contain the same number of rows (examples) +# and a single column representing the label. +# +# Usually this will be passed through a function to shuffle and split +# the data into the structures expected by the PPFL clients. For example: +# X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.25, random_state = shuffle_seed) +# + +# For the template, we generate some random numbers that could serve as example data. +# Note that this data is unlikely to be useful for learning. It just fills the need +# for data of a specific format. +example_features = 10 +example_rows = 10000 +X_data = np.random.uniform(size=(example_rows,example_features)) +y_data = np.random.uniform(size=(example_rows,1)) + +# Randomly shuffle and split the data for training and testing. +X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.25) + +# +# +### END OF LOAD DATA SECTION + + +### Configure a service agent. + +agents.extend([ PPFL_ServiceAgent(0, "PPFL Service Agent 0", "PPFL_ServiceAgent", + random_state = np.random.RandomState(seed=np.random.randint(low=0,high=2**32)), + msg_fwd_delay=0, iterations = num_iterations, num_clients = num_clients) ]) +agent_types.extend(["PPFL_ServiceAgent"]) +agent_count += 1 + + +### Configure a population of cooperating learning client agents. +a, b = agent_count, agent_count + num_clients + +client_init_start = time() + +# Iterate over all client IDs. +for i in range (a, b): + + # Determine subgraph. + subgraph = int(floor((i - a) / subgraph_size)) + + #print ("Neighborhood for agent {} is {}".format(i, subgraph)) + + # Determine agents in subgraph. + subgraph_start = a + (subgraph * subgraph_size) + subgraph_end = a + ((subgraph + 1) * subgraph_size) + + neighbors = range(subgraph_start, subgraph_end) + + #print ("Peers for {} are {}".format(i, [x for x in neighbors if x != i])) + + # Peer list is all agents in subgraph except self. + agents.append(PPFL_ClientAgent(i, "PPFL Client Agent {}".format(i), "PPFL_ClientAgent", + peer_list = [ x for x in neighbors if x != i ], iterations = num_iterations, + num_clients = num_clients, num_subgraphs = num_subgraphs, + multiplier = accy_multiplier, X_train = X_train, y_train = y_train, X_test = X_test, y_test = y_test, + split_size = split_size, secret_scale = secret_scale, + random_state = np.random.RandomState(seed=np.random.randint(low=0,high=2**32)))) + +agent_types.extend([ "PPFL_ClientAgent" for i in range(a,b) ]) +agent_count += num_clients + +client_init_end = time() +init_seconds = client_init_end - client_init_start +td_init = timedelta(seconds = init_seconds) +print (f"Client init took {td_init}") + + +### Configure a latency model for the agents. + +# Get a new-style cubic LatencyModel from the networking literature. +pairwise = (len(agent_types),len(agent_types)) + +model_args = { 'connected' : True, + + # All in NYC. Only matters for evaluating "real world" protocol duration, + # not for accuracy, collusion, or reconstruction. + 'min_latency' : np.random.uniform(low = 21000, high = 100000, size = pairwise), + 'jitter' : 0.3, + 'jitter_clip' : 0.05, + 'jitter_unit' : 5, + } + +latency_model = LatencyModel ( latency_model = 'cubic', random_state = latency_rstate, kwargs = model_args ) + + +# Start the kernel running. +results = kernel.runner(agents = agents, startTime = kernelStartTime, stopTime = kernelStopTime, + agentLatencyModel = latency_model, + defaultComputationDelay = defaultComputationDelay, + skip_log = skip_log, log_dir = log_dir) + + +# Print parameter summary and elapsed times by category for this experimental trial. +print () +print (f"Protocol Iterations: {num_iterations}, Clients: {num_clients}, Split Size: {split_size}, " \ + f"Local Iterations {max_logreg_iterations}, Learning Rate: {learning_rate}.") +print (f"Learning in the clear? {clear_learning}, Privacy Epsilon: {epsilon}.") +print () +print ("Service Agent mean time per iteration...") +print (f" Storing models: {results['srv_store_model'] / num_iterations}") +print (f" Combining models: {results['srv_combine_model'] / num_iterations}") +print () +print ("Client Agent mean time per iteration (except DH Offline)...") +print (f" DH Offline: {results['dh_offline'] / num_clients}") +print (f" DH Online: {results['dh_online'] / num_clients}") +print (f" Training: {results['training'] / num_clients}") +print (f" Encryption: {results['encryption'] / num_clients}") +print () +print (f"Slowest agent simulated time: {results['kernel_slowest_agent_finish_time']}") + + +# Write out the timing log to disk. +if not exists("results/timing_log.csv"): + with open('results/timing_log.csv', 'a') as results_file: + results_file.write(f"Clients,Peers,Subgraphs,Iterations,Train Rows,Learning Rate,In The Clear?,Local Iterations,Epsilon,DH Offline,DH Online,Training,Encryption,Store Model,Combine Model,Last Agent Finish,Time to Simulate\n") + + with open('results/timing_log.csv', 'a') as results_file: + results_file.write(f"{num_clients},{subgraph_size-1},{num_subgraphs},{num_iterations},{split_size},{learning_rate},{clear_learning},{max_logreg_iterations},{epsilon},{results['dh_offline'] / num_clients},{results['dh_online'] / num_clients},{results['training'] / num_clients},{results['encryption'] / num_clients},{results['srv_store_model']},{results['srv_combine_model']},{results['kernel_event_queue_elapsed_wallclock']},{results['kernel_slowest_agent_finish_time']}\n") + +