From db7297865f34928cef6197104cb8effe38364ab6 Mon Sep 17 00:00:00 2001
From: Wapaul1
Date: Mon, 30 Jan 2017 19:17:42 -0800
Subject: [PATCH] Added functionality for retrieving variables from control
 dependencies (#220)

* Added test for retrieving variables from an optimizer

* Added comments to test

* Addressed comments

* Fixed Travis bug

* Added fix to circular controls

* Added set for explored operations and duplicate prefix stripping

* Removed embedded IPython

* Removed prefix, use separate graph for each network

* Removed redundant imports

* Addressed comments and added separate graph to initializer

* fix typos

* get rid of prefix in documentation
---
 doc/using-ray-with-tensorflow.md   | 23 ++++-------
 examples/lbfgs/driver.py           |  3 +-
 python/ray/experimental/tfutils.py | 31 ++++++++++-----
 test/tensorflow_test.py            | 63 ++++++++++++++++++++++++------
 4 files changed, 84 insertions(+), 36 deletions(-)

diff --git a/doc/using-ray-with-tensorflow.md b/doc/using-ray-with-tensorflow.md
index 4ee3769197870..d917509a4eaee 100644
--- a/doc/using-ray-with-tensorflow.md
+++ b/doc/using-ray-with-tensorflow.md
@@ -72,21 +72,18 @@ b.assign(np.zeros(1))  # This adds a node to the graph every time you call it.
 ## Complete Example
 
 Putting this all together, we would first create the graph on each worker using
-environment variables. Within the environment variables, we would use the 
+environment variables. Within the environment variables, we would use the
 `get_weights` and `set_weights` methods of the `TensorFlowVariables` class. We
-would then use those methods to ship the weights (as a dictionary of variable 
+would then use those methods to ship the weights (as a dictionary of variable
 names mapping to tensorflow tensors) between the processes without shipping the
 actual TensorFlow graphs, which are much more complex Python objects. Note that
-to avoid namespace collision with already created variables on the workers, we
-use a variable_scope and a prefix in the environment variables and then pass
-true to the prefix in `TensorFlowVariables` so it can properly decode the variable
-names.
+to avoid namespace collision with already created variables on the workers, we
+use a separate graph for each network.
 
 ```python
 import tensorflow as tf
 import numpy as np
 import ray
-import uuid
 
 ray.init(num_workers=5)
 
@@ -95,11 +92,8 @@ NUM_BATCHES = 1
 NUM_ITERS = 201
 
 def net_vars_initializer():
-  # Prefix should be random so that there is no conflict with variable names in
-  # the cluster setting.
-  prefix = str(uuid.uuid1().hex)
-  # Use the tensorflow variable_scope to prefix all of the variables
-  with tf.variable_scope(prefix):
+  # Use a separate graph for each network.
+  with tf.Graph().as_default():
     # Seed TensorFlow to make the script deterministic.
     tf.set_random_seed(0)
     # Define the inputs.
@@ -116,9 +110,8 @@ def net_vars_initializer():
     # Define the weight initializer and session.
     init = tf.global_variables_initializer()
     sess = tf.Session()
-    # Additional code for setting and getting the weights, and use a prefix
-    # so that the variable names can be converted between workers.
-    variables = ray.experimental.TensorFlowVariables(loss, sess, prefix=True)
+    # Additional code for setting and getting the weights.
+    variables = ray.experimental.TensorFlowVariables(loss, sess)
     # Return all of the data needed to use the network.
     return variables, sess, train, loss, x_data, y_data, init
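For illustration (not part of the patch): the pattern the revised doc describes, one graph
per network with weights shipped as a dictionary mapping variable names to NumPy arrays, in
a minimal self-contained sketch. The `make_net` helper and its one-variable network are
hypothetical; `ray.experimental.TensorFlowVariables` is the class this patch modifies
(TF 1.x-era API).

```python
import tensorflow as tf
import ray

def make_net():
  # Each network lives in its own graph, so variable names cannot
  # collide across networks and no prefix bookkeeping is needed.
  with tf.Graph().as_default():
    w = tf.Variable(tf.zeros([1]), name="w")
    loss = tf.reduce_mean(tf.square(w - 1.0))
    sess = tf.Session()
    variables = ray.experimental.TensorFlowVariables(loss, sess)
    sess.run(tf.global_variables_initializer())
    return variables

net1 = make_net()
net2 = make_net()
# Weights travel as {variable_name: numpy array}, never as graph objects.
net2.set_weights(net1.get_weights())
```

Because each call builds its own graph, both networks can use identical variable names,
which is exactly why the prefix machinery below could be deleted.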
diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py
index b7610865dd8dc..4427e6f35c3ac 100644
--- a/examples/lbfgs/driver.py
+++ b/examples/lbfgs/driver.py
@@ -61,7 +61,8 @@ def grad(self, xs, ys):
     return self.sess.run(self.cross_entropy_grads, feed_dict={self.x: xs, self.y_: ys})
 
 def net_initialization():
-  return LinearModel([784,10])
+  with tf.Graph().as_default():
+    return LinearModel([784,10])
 
 # By default, when an environment variable is used by a remote function, the
 # initialization code will be rerun at the end of the remote task to ensure
diff --git a/python/ray/experimental/tfutils.py b/python/ray/experimental/tfutils.py
index eb7da888add53..258d60bbc9400 100644
--- a/python/ray/experimental/tfutils.py
+++ b/python/ray/experimental/tfutils.py
@@ -28,28 +28,41 @@ class TensorFlowVariables(object):
     assignment_placeholders (List[tf.placeholders]): The nodes that weights get
       passed to.
     assignment_nodes (List[tf.Tensor]): The nodes that assign the weights.
-    prefix (Bool): Boolean for if there is a prefix on the variable names.
   """
-  def __init__(self, loss, sess=None, prefix=False):
+  def __init__(self, loss, sess=None):
     """Creates a TensorFlowVariables instance."""
     import tensorflow as tf
     self.sess = sess
     self.loss = loss
-    self.prefix = prefix
     queue = deque([loss])
     variable_names = []
+    explored_inputs = set([loss])
     # We do a BFS on the dependency graph of the input function to find
     # the variables.
     while len(queue) != 0:
-      op = queue.popleft().op
-      queue.extend(op.inputs)
-      if op.node_def.op == "Variable":
-        variable_names.append(op.node_def.name)
+      tf_obj = queue.popleft()
+
+      # The object put into the queue is not necessarily an operation, so we
+      # want the op attribute to get the operation underlying the object.
+      # Only operations contain the inputs that we can explore.
+      if hasattr(tf_obj, "op"):
+        tf_obj = tf_obj.op
+      for input_op in tf_obj.inputs:
+        if input_op not in explored_inputs:
+          queue.append(input_op)
+          explored_inputs.add(input_op)
+      # TensorFlow control inputs can be circular, so we keep track of
+      # explored operations.
+      for control in tf_obj.control_inputs:
+        if control not in explored_inputs:
+          queue.append(control)
+          explored_inputs.add(control)
+      if tf_obj.node_def.op == "Variable":
+        variable_names.append(tf_obj.node_def.name)
     self.variables = OrderedDict()
     for v in [v for v in tf.global_variables() if v.op.node_def.name in variable_names]:
-      name = v.op.node_def.name.split("/", 1 if prefix else 0)[-1]
-      self.variables[name] = v
+      self.variables[v.op.node_def.name] = v
     self.assignment_placeholders = dict()
     self.assignment_nodes = []
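The core of the tfutils.py change above is a breadth-first search that marks objects as
explored when they are enqueued, not when they are dequeued; that invariant is what keeps
the traversal finite now that control inputs, which may form cycles, are followed. A
stripped-down sketch of the same pattern, with a hypothetical `Node` class standing in for
a TensorFlow operation:

```python
from collections import deque

class Node(object):
  # Hypothetical stand-in for a TF operation: data inputs plus
  # control inputs, which may form cycles.
  def __init__(self, op_type, inputs=(), control_inputs=()):
    self.op_type = op_type
    self.inputs = list(inputs)
    self.control_inputs = list(control_inputs)

def find_variables(root):
  queue = deque([root])
  explored = set([root])
  found = []
  while len(queue) != 0:
    node = queue.popleft()
    # Mark neighbors as explored the moment they are enqueued, so a
    # circular control edge can never re-enqueue a node and loop forever.
    for neighbor in node.inputs + node.control_inputs:
      if neighbor not in explored:
        explored.add(neighbor)
        queue.append(neighbor)
    if node.op_type == "Variable":
      found.append(node)
  return found

# A cycle through control inputs, like optimizer update ops can create.
v = Node("Variable")
update = Node("AssignAdd", inputs=[v])
train = Node("NoOp", control_inputs=[update])
update.control_inputs.append(train)  # circular control dependency
assert find_variables(train) == [v]
```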
diff --git a/test/tensorflow_test.py b/test/tensorflow_test.py
index 62130ac1ddc8e..a4dae42e21d44 100644
--- a/test/tensorflow_test.py
+++ b/test/tensorflow_test.py
@@ -17,32 +17,39 @@ def make_linear_network(w_name=None, b_name=None):
   b = tf.Variable(tf.zeros([1]), name=b_name)
   y = w * x_data + b
   # Return the loss and weight initializer.
-  return tf.reduce_mean(tf.square(y - y_data)), tf.global_variables_initializer()
+  return tf.reduce_mean(tf.square(y - y_data)), tf.global_variables_initializer(), x_data, y_data
 
 def net_vars_initializer():
-  # Random prefix so variable names do not clash if we use nets with
-  # the same name.
-  prefix = str(uuid.uuid1().hex)
-  # Use the tensorflow variable_scope to prefix all of the variables
-  with tf.variable_scope(prefix):
+  # Use a separate graph for each network.
+  with tf.Graph().as_default():
     # Create the network.
-    loss, init = make_linear_network()
+    loss, init, _, _ = make_linear_network()
     sess = tf.Session()
     # Additional code for setting and getting the weights.
-    variables = ray.experimental.TensorFlowVariables(loss, sess, prefix=True)
+    variables = ray.experimental.TensorFlowVariables(loss, sess)
   # Return all of the data needed to use the network.
   return variables, init, sess
 
 def net_vars_reinitializer(net_vars):
   return net_vars
 
+def train_vars_initializer():
+  # Like net_vars_initializer above, but also returns the placeholders and
+  # the gradient.
+  with tf.Graph().as_default():
+    loss, init, x_data, y_data = make_linear_network()
+    sess = tf.Session()
+    variables = ray.experimental.TensorFlowVariables(loss, sess)
+    grad = tf.gradients(loss, list(variables.variables.values()))
+    return variables, init, sess, grad, [x_data, y_data]
+
+
 class TensorFlowTest(unittest.TestCase):
 
   def testTensorFlowVariables(self):
     ray.init(num_workers=2)
 
     sess = tf.Session()
-    loss, init = make_linear_network()
+    loss, init, _, _ = make_linear_network()
     sess.run(init)
     variables = ray.experimental.TensorFlowVariables(loss, sess)
@@ -54,7 +61,7 @@ def testTensorFlowVariables(self):
     variables.set_weights(weights)
     self.assertEqual(weights, variables.get_weights())
 
-    loss2, init2 = make_linear_network("w", "b")
+    loss2, init2, _, _ = make_linear_network("w", "b")
     sess.run(init2)
     variables2 = ray.experimental.TensorFlowVariables(loss2, sess)
@@ -148,7 +155,7 @@ def testNetworkDriverWorkerIndependent(self):
     # Create a network on the driver locally.
     sess1 = tf.Session()
-    loss1, init1 = make_linear_network()
+    loss1, init1, _, _ = make_linear_network()
     net_vars1 = ray.experimental.TensorFlowVariables(loss1, sess1)
     sess1.run(init1)
@@ -170,5 +177,39 @@ def set_and_get_weights(weights):
 
     ray.worker.cleanup()
 
+  def testVariablesControlDependencies(self):
+    ray.init(num_workers=1)
+
+    # Create a network and append a momentum optimizer.
+    sess = tf.Session()
+    loss, init, _, _ = make_linear_network()
+    minimizer = tf.train.MomentumOptimizer(0.9, 0.9).minimize(loss)
+    net_vars = ray.experimental.TensorFlowVariables(minimizer, sess)
+    sess.run(init)
+
+    # Check that all four variables are retrieved: the two network variables
+    # and their two momentum slot variables.
+    self.assertEqual(len(net_vars.variables.items()), 4)
+
+    ray.worker.cleanup()
+
+  def testRemoteTrainingStep(self):
+    ray.init(num_workers=1)
+
+    ray.env.net = ray.EnvironmentVariable(train_vars_initializer, net_vars_reinitializer)
+
+    @ray.remote
+    def training_step(weights):
+      variables, _, sess, grad, placeholders = ray.env.net
+      variables.set_weights(weights)
+      return sess.run(grad, feed_dict=dict(zip(placeholders, [[1]*100]*2)))
+
+    variables, init, sess, _, _ = ray.env.net
+
+    sess.run(init)
+    ray.get(training_step.remote(variables.get_weights()))
+
+    ray.worker.cleanup()
+
 if __name__ == "__main__":
   unittest.main(verbosity=2)
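Finally, a hedged usage sketch of the behavior `testVariablesControlDependencies` above
verifies, runnable outside the test harness (TF 1.x API; the one-variable network and the
expected `x/Momentum` slot name are illustrative assumptions, not part of the patch):

```python
import tensorflow as tf
import ray

with tf.Graph().as_default():
  x = tf.Variable(tf.ones([1]), name="x")
  loss = tf.reduce_mean(tf.square(x))
  # The train op reaches the momentum slot variables only through
  # control dependencies, which the new traversal follows.
  train_op = tf.train.MomentumOptimizer(0.9, 0.9).minimize(loss)
  sess = tf.Session()
  variables = ray.experimental.TensorFlowVariables(train_op, sess)
  sess.run(tf.global_variables_initializer())
  # Expect both "x" and its momentum slot, e.g. "x/Momentum".
  print(sorted(variables.get_weights().keys()))
```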