Added functionality for retrieving variables from control dependencies (ray-project#220)

* Added test for retrieving variables from an optimizer

* Added comments to test

* Addressed comments

* Fixed Travis bug

* Added fix to circular controls

* Added set for explored operations and duplicate prefix stripping

* Removed embedded IPython

* Removed prefix, use separate graph for each network

* Removed redundant imports

* Addressed comments and added separate graph to initializer

* fix typos

* get rid of prefix in documentation
Wapaul1 authored and pcmoritz committed Jan 31, 2017
1 parent 6703f7b commit db72978
Showing 4 changed files with 84 additions and 36 deletions.
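
The heart of this commit: `TensorFlowVariables` now follows control inputs during its BFS, so variables that are reachable only through control dependencies — such as the slot variables an optimizer creates — are discovered when you pass the train op instead of the loss. A minimal sketch of the new behavior (illustrative only, not part of the diff; assumes the TF 1.x-era and `ray.experimental` APIs this commit targets):

```python
import ray
import tensorflow as tf

# A two-variable linear model, similar to the one in the tests below.
x_data = tf.placeholder(tf.float32, shape=[100])
w = tf.Variable(tf.random_uniform([1], -1.0, 1.0), name="w")
b = tf.Variable(tf.zeros([1]), name="b")
loss = tf.reduce_mean(tf.square(w * x_data + b))

# minimize() returns a grouped op that reaches the per-variable update ops
# (and through them the momentum slot variables) only via control
# dependencies, which the old inputs-only BFS never followed.
train_op = tf.train.MomentumOptimizer(0.9, 0.9).minimize(loss)

sess = tf.Session()
sess.run(tf.global_variables_initializer())

variables = ray.experimental.TensorFlowVariables(train_op, sess)
print(len(variables.variables))  # 4: w, b, plus their two momentum slots
```
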
23 changes: 8 additions & 15 deletions doc/using-ray-with-tensorflow.md
@@ -72,21 +72,18 @@ b.assign(np.zeros(1)) # This adds a node to the graph every time you call it.
 ## Complete Example
 
 Putting this all together, we would first create the graph on each worker using
 environment variables. Within the environment variables, we would use the
 `get_weights` and `set_weights` methods of the `TensorFlowVariables` class. We
 would then use those methods to ship the weights (as a dictionary of variable
 names mapping to tensorflow tensors) between the processes without shipping the
 actual TensorFlow graphs, which are much more complex Python objects. Note that
-to avoid namespace collision with already created variables on the workers, we
-use a variable_scope and a prefix in the environment variables and then pass
-true to the prefix in `TensorFlowVariables` so it can properly decode the variable
-names.
+to avoid namespace collision with already created variables on the workers, we
+use a separate graph for each network.
 
 ```python
 import tensorflow as tf
 import numpy as np
 import ray
-import uuid
 
 ray.init(num_workers=5)
 
@@ -95,11 +92,8 @@ NUM_BATCHES = 1
 NUM_ITERS = 201
 
 def net_vars_initializer():
-  # Prefix should be random so that there is no conflict with variable names in
-  # the cluster setting.
-  prefix = str(uuid.uuid1().hex)
-  # Use the tensorflow variable_scope to prefix all of the variables
-  with tf.variable_scope(prefix):
+  # Use a separate graph for each network.
+  with tf.Graph().as_default():
     # Seed TensorFlow to make the script deterministic.
     tf.set_random_seed(0)
     # Define the inputs.
@@ -116,9 +110,8 @@ def net_vars_initializer():
     # Define the weight initializer and session.
     init = tf.global_variables_initializer()
     sess = tf.Session()
-    # Additional code for setting and getting the weights, and use a prefix
-    # so that the variable names can be converted between workers.
-    variables = ray.experimental.TensorFlowVariables(loss, sess, prefix=True)
+    # Additional code for setting and getting the weights
+    variables = ray.experimental.TensorFlowVariables(loss, sess)
   # Return all of the data needed to use the network.
   return variables, sess, train, loss, x_data, y_data, init
 
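In isolation, the pattern the revised documentation recommends — one `tf.Graph` per network, with weights shipped as plain dictionaries — can be sketched as follows (a sketch under the same TF 1.x and `ray.experimental` assumptions, not part of the commit):

```python
import ray
import tensorflow as tf

def make_network():
  # Build each network in its own graph, so identically named variables in
  # different networks can never collide.
  with tf.Graph().as_default():
    w = tf.Variable(tf.zeros([1]), name="w")
    b = tf.Variable(tf.zeros([1]), name="b")
    loss = tf.reduce_mean(tf.square(w + b))
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    return ray.experimental.TensorFlowVariables(loss, sess)

net1 = make_network()
net2 = make_network()
# Weights travel as a dictionary of variable names to numpy arrays, so no
# TensorFlow graph object ever crosses a process boundary.
net2.set_weights(net1.get_weights())
```
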
3 changes: 2 additions & 1 deletion examples/lbfgs/driver.py
@@ -61,7 +61,8 @@ def grad(self, xs, ys):
     return self.sess.run(self.cross_entropy_grads, feed_dict={self.x: xs, self.y_: ys})
 
 def net_initialization():
-  return LinearModel([784,10])
+  with tf.Graph().as_default():
+    return LinearModel([784,10])
 
 # By default, when an environment variable is used by a remote function, the
 # initialization code will be rerun at the end of the remote task to ensure
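The comment above refers to Ray's environment variables; wiring `net_initialization` into one follows the pattern the tests in this commit use. A sketch — the `net_reinitialization` helper here is assumed, mirroring `net_vars_reinitializer` in the test file, and `LinearModel` is the class defined elsewhere in driver.py:

```python
import ray
import tensorflow as tf

def net_initialization():
  # Build the model in its own graph, exactly as in the diff above.
  with tf.Graph().as_default():
    return LinearModel([784, 10])  # LinearModel is defined in driver.py

def net_reinitialization(net):
  # Reuse the already constructed network instead of rebuilding it.
  return net

# Each worker runs net_initialization once and keeps the resulting model.
ray.env.net = ray.EnvironmentVariable(net_initialization, net_reinitialization)
```
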
31 changes: 22 additions & 9 deletions python/ray/experimental/tfutils.py
@@ -28,28 +28,41 @@ class TensorFlowVariables(object):
     assignment_placeholders (List[tf.placeholders]): The nodes that weights get
       passed to.
     assignment_nodes (List[tf.Tensor]): The nodes that assign the weights.
-    prefix (Bool): Boolean for if there is a prefix on the variable names.
   """
-  def __init__(self, loss, sess=None, prefix=False):
+  def __init__(self, loss, sess=None):
     """Creates a TensorFlowVariables instance."""
     import tensorflow as tf
     self.sess = sess
     self.loss = loss
-    self.prefix = prefix
     queue = deque([loss])
     variable_names = []
+    explored_inputs = set([loss])
 
     # We do a BFS on the dependency graph of the input function to find
     # the variables.
     while len(queue) != 0:
-      op = queue.popleft().op
-      queue.extend(op.inputs)
-      if op.node_def.op == "Variable":
-        variable_names.append(op.node_def.name)
+      tf_obj = queue.popleft()
+
+      # The object put into the queue is not necessarily an operation, so we
+      # want the op attribute to get the operation underlying the object.
+      # Only operations contain the inputs that we can explore.
+      if hasattr(tf_obj, "op"):
+        tf_obj = tf_obj.op
+      for input_op in tf_obj.inputs:
+        if input_op not in explored_inputs:
+          queue.append(input_op)
+          explored_inputs.add(input_op)
+      # Tensorflow control inputs can be circular, so we keep track of
+      # explored operations.
+      for control in tf_obj.control_inputs:
+        if control not in explored_inputs:
+          queue.append(control)
+          explored_inputs.add(control)
+      if tf_obj.node_def.op == "Variable":
+        variable_names.append(tf_obj.node_def.name)
     self.variables = OrderedDict()
     for v in [v for v in tf.global_variables() if v.op.node_def.name in variable_names]:
-      name = v.op.node_def.name.split("/", 1 if prefix else 0)[-1]
-      self.variables[name] = v
+      self.variables[v.op.node_def.name] = v
     self.assignment_placeholders = dict()
     self.assignment_nodes = []
 
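The `explored_inputs` set above is what keeps the BFS from looping when control inputs form a cycle. The same idea on a toy node class (plain Python, no TensorFlow required; `Node` is a stand-in for a TF operation):

```python
from collections import deque

class Node(object):
  def __init__(self, name, inputs=(), control_inputs=()):
    self.name = name
    self.inputs = list(inputs)
    self.control_inputs = list(control_inputs)

# Two nodes whose control inputs point at each other: a naive BFS that
# re-enqueues every neighbor would loop forever here.
a = Node("a")
b = Node("b", control_inputs=[a])
a.control_inputs.append(b)

queue = deque([a])
explored = set([a])
visited = []
while queue:
  node = queue.popleft()
  visited.append(node.name)
  for nxt in node.inputs + node.control_inputs:
    if nxt not in explored:  # skip anything already queued or visited
      queue.append(nxt)
      explored.add(nxt)

print(visited)  # ['a', 'b'] -- each node is visited exactly once
```
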
63 changes: 52 additions & 11 deletions test/tensorflow_test.py
@@ -17,32 +17,39 @@ def make_linear_network(w_name=None, b_name=None):
   b = tf.Variable(tf.zeros([1]), name=b_name)
   y = w * x_data + b
   # Return the loss and weight initializer.
-  return tf.reduce_mean(tf.square(y - y_data)), tf.global_variables_initializer()
+  return tf.reduce_mean(tf.square(y - y_data)), tf.global_variables_initializer(), x_data, y_data
 
 def net_vars_initializer():
-  # Random prefix so variable names do not clash if we use nets with
-  # the same name.
-  prefix = str(uuid.uuid1().hex)
-  # Use the tensorflow variable_scope to prefix all of the variables
-  with tf.variable_scope(prefix):
+  # Uses a separate graph for each network.
+  with tf.Graph().as_default():
     # Create the network.
-    loss, init = make_linear_network()
+    loss, init, _, _ = make_linear_network()
     sess = tf.Session()
     # Additional code for setting and getting the weights.
-    variables = ray.experimental.TensorFlowVariables(loss, sess, prefix=True)
+    variables = ray.experimental.TensorFlowVariables(loss, sess)
   # Return all of the data needed to use the network.
   return variables, init, sess
 
 def net_vars_reinitializer(net_vars):
   return net_vars
 
+def train_vars_initializer():
+  # Almost the same as above, but now returns the placeholders and gradient.
+  with tf.Graph().as_default():
+    loss, init, x_data, y_data = make_linear_network()
+    sess = tf.Session()
+    variables = ray.experimental.TensorFlowVariables(loss, sess)
+    grad = tf.gradients(loss, list(variables.variables.values()))
+    return variables, init, sess, grad, [x_data, y_data]
+
 
 class TensorFlowTest(unittest.TestCase):
 
   def testTensorFlowVariables(self):
     ray.init(num_workers=2)
 
     sess = tf.Session()
-    loss, init = make_linear_network()
+    loss, init, _, _ = make_linear_network()
     sess.run(init)
 
     variables = ray.experimental.TensorFlowVariables(loss, sess)
@@ -54,7 +61,7 @@ def testTensorFlowVariables(self):
     variables.set_weights(weights)
     self.assertEqual(weights, variables.get_weights())
 
-    loss2, init2 = make_linear_network("w", "b")
+    loss2, init2, _, _ = make_linear_network("w", "b")
     sess.run(init2)
 
     variables2 = ray.experimental.TensorFlowVariables(loss2, sess)
@@ -148,7 +155,7 @@ def testNetworkDriverWorkerIndependent(self):
 
     # Create a network on the driver locally.
     sess1 = tf.Session()
-    loss1, init1 = make_linear_network()
+    loss1, init1, _, _ = make_linear_network()
     net_vars1 = ray.experimental.TensorFlowVariables(loss1, sess1)
     sess1.run(init1)
 
@@ -170,5 +177,39 @@ def set_and_get_weights(weights):
 
     ray.worker.cleanup()
 
+  def testVariablesControlDependencies(self):
+    ray.init(num_workers=1)
+
+    # Creates a network and appends a momentum optimizer.
+    sess = tf.Session()
+    loss, init, _, _ = make_linear_network()
+    minimizer = tf.train.MomentumOptimizer(0.9, 0.9).minimize(loss)
+    net_vars = ray.experimental.TensorFlowVariables(minimizer, sess)
+    sess.run(init)
+
+    # Tests if all variables are properly retrieved, 2 variables and 2
+    # momentum variables.
+    self.assertEqual(len(net_vars.variables.items()), 4)
+
+    ray.worker.cleanup()
+
+  def testRemoteTrainingStep(self):
+    ray.init(num_workers=1)
+
+    ray.env.net = ray.EnvironmentVariable(train_vars_initializer, net_vars_reinitializer)
+
+    @ray.remote
+    def training_step(weights):
+      variables, _, sess, grad, placeholders = ray.env.net
+      variables.set_weights(weights)
+      return sess.run(grad, feed_dict=dict(zip(placeholders, [[1]*100]*2)))
+
+    variables, init, sess, _, _ = ray.env.net
+
+    sess.run(init)
+    ray.get(training_step.remote(variables.get_weights()))
+
+    ray.worker.cleanup()
+
 if __name__ == "__main__":
   unittest.main(verbosity=2)
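
For reference, the weight-dictionary round trip these tests exercise: with the prefix machinery removed, the keys of `get_weights()` are now the unmodified variable names. A sketch under the same TF 1.x assumptions as above:

```python
import numpy as np
import ray
import tensorflow as tf

with tf.Graph().as_default():
  w = tf.Variable(tf.zeros([1]), name="w")
  b = tf.Variable(tf.zeros([1]), name="b")
  loss = tf.reduce_mean(tf.square(w + b))
  sess = tf.Session()
  variables = ray.experimental.TensorFlowVariables(loss, sess)
  sess.run(tf.global_variables_initializer())

  # Keys are plain variable names -- no prefix stripping anymore.
  weights = variables.get_weights()  # OrderedDict with keys 'w' and 'b'
  weights["w"] = np.ones(1)
  variables.set_weights(weights)
  assert variables.get_weights()["w"] == np.ones(1)
```
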
