Simplify put test and move it to failure tests. (ray-project#788)
robertnishihara authored and pcmoritz committed Aug 1, 2017
1 parent c394a65 commit 37dafa4
Showing 4 changed files with 115 additions and 95 deletions.
16 changes: 12 additions & 4 deletions python/ray/services.py
@@ -565,8 +565,8 @@ def start_local_scheduler(redis_address,
def start_objstore(node_ip_address, redis_address,
object_manager_port=None, store_stdout_file=None,
store_stderr_file=None, manager_stdout_file=None,
manager_stderr_file=None, cleanup=True,
objstore_memory=None):
manager_stderr_file=None, objstore_memory=None,
cleanup=True):
"""This method starts an object store process.
Args:
@@ -585,11 +585,11 @@ def start_objstore(node_ip_address, redis_address,
manager_stderr_file: A file handle opened for writing to redirect
stderr to. If no redirection should happen, then this should be
None.
objstore_memory: The amount of memory (in bytes) to start the object
store with.
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then this process will be killed by services.cleanup() when the
Python process that imported services exits.
objstore_memory: The amount of memory (in bytes) to start the object
store with.
Return:
A tuple of the Plasma store socket name, the Plasma manager socket
@@ -734,6 +734,7 @@ def start_ray_processes(address_info=None,
redis_port=None,
num_workers=None,
num_local_schedulers=1,
object_store_memory=None,
num_redis_shards=1,
worker_path=None,
cleanup=True,
@@ -762,6 +763,8 @@
stores until there are num_local_schedulers existing instances of
each, including ones already registered with the given
address_info.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
worker_path (str): The path of the source code that will be run by the
@@ -895,6 +898,7 @@
store_stderr_file=plasma_store_stderr_file,
manager_stdout_file=plasma_manager_stdout_file,
manager_stderr_file=plasma_manager_stderr_file,
objstore_memory=object_store_memory,
cleanup=cleanup)
object_store_addresses.append(object_store_address)
time.sleep(0.1)
@@ -1026,6 +1030,7 @@ def start_ray_head(address_info=None,
redis_port=None,
num_workers=0,
num_local_schedulers=1,
object_store_memory=None,
worker_path=None,
cleanup=True,
redirect_output=False,
@@ -1051,6 +1056,8 @@
stores until there are at least num_local_schedulers existing
instances of each, including ones already registered with the given
address_info.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
worker_path (str): The path of the source code that will be run by the
worker.
cleanup (bool): If cleanup is true, then the processes started here
@@ -1077,6 +1084,7 @@
redis_port=redis_port,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
object_store_memory=object_store_memory,
worker_path=worker_path,
cleanup=cleanup,
redirect_output=redirect_output,
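
A minimal sketch, not part of this commit, of how the memory cap added above can be passed straight to the object-store starter. It uses only the start_objstore keywords shown in this diff; the node IP and Redis address are placeholders and assume a Redis server is already running at that address.

    from ray import services

    # Start one object store capped at 500 MB (the value is in bytes) and let
    # services.cleanup() kill the process when the driver exits.
    object_store_address = services.start_objstore(
        node_ip_address="127.0.0.1",
        redis_address="127.0.0.1:6379",
        objstore_memory=5 * 10 ** 8,
        cleanup=True)
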
7 changes: 7 additions & 0 deletions python/ray/worker.py
@@ -814,6 +814,7 @@ def _init(address_info=None,
object_id_seed=None,
num_workers=None,
num_local_schedulers=None,
object_store_memory=None,
driver_mode=SCRIPT_MODE,
redirect_output=False,
start_workers_from_local_scheduler=True,
@@ -845,6 +846,8 @@
provided if start_ray_local is True.
num_local_schedulers (int): The number of local schedulers to start.
This is only provided if start_ray_local is True.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
driver_mode (bool): The mode in which to start the driver. This should
be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
redirect_output (bool): True if stdout and stderr for all the processes
@@ -909,6 +912,7 @@
node_ip_address=node_ip_address,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
object_store_memory=object_store_memory,
redirect_output=redirect_output,
start_workers_from_local_scheduler=(
start_workers_from_local_scheduler),
@@ -931,6 +935,9 @@
if num_redis_shards is not None:
raise Exception("When connecting to an existing cluster, "
"num_redis_shards must not be provided.")
if object_store_memory is not None:
raise Exception("When connecting to an existing cluster, "
"object_store_memory must not be provided.")
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address(redis_address)
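
A short sketch, not part of this commit, of the two paths the worker.py change distinguishes. The keyword names are taken from the _init signature above and from the new tests below; the Redis address is a placeholder.

    import ray

    # Local mode: the new keyword is forwarded to
    # services.start_ray_processes and from there down to start_objstore.
    ray.worker._init(start_ray_local=True, num_workers=1,
                     object_store_memory=10 ** 8)  # 100 MB store

    # Existing cluster: the store's size was fixed when the cluster was
    # started, so the new check raises an Exception if the argument is given.
    # ray.worker._init(redis_address="127.0.0.1:6379",
    #                  object_store_memory=10 ** 8)  # raises Exception
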
96 changes: 96 additions & 0 deletions test/failure_test.py
@@ -2,6 +2,7 @@
from __future__ import division
from __future__ import print_function

import numpy as np
import os
import ray
import sys
@@ -277,5 +278,100 @@ def f():
        ray.worker.cleanup()


class PutErrorTest(unittest.TestCase):

    def testPutError1(self):
        store_size = 10 ** 6
        ray.worker._init(start_ray_local=True, driver_mode=ray.SILENT_MODE,
                         object_store_memory=store_size)

        num_objects = 3
        object_size = 4 * 10 ** 5

        # Define a task with a single dependency, a numpy array, that returns
        # another array.
        @ray.remote
        def single_dependency(i, arg):
            arg = np.copy(arg)
            arg[0] = i
            return arg

        @ray.remote
        def put_arg_task():
            # Launch num_objects instances of the remote task, each dependent
            # on the one before it. The result of the first task should get
            # evicted.
            args = []
            arg = single_dependency.remote(0, np.zeros(object_size,
                                                       dtype=np.uint8))
            for i in range(num_objects):
                arg = single_dependency.remote(i, arg)
                args.append(arg)

            # Get the last value to force all tasks to finish.
            value = ray.get(args[-1])
            assert value[0] == i

            # Get the first value (which should have been evicted) to force
            # reconstruction. Currently, since we're not able to reconstruct
            # `ray.put` objects that were evicted and whose originating tasks
            # are still running, this call should hang and push an error to
            # the driver.
            ray.get(args[0])

        put_arg_task.remote()

        # Make sure we receive the correct error message.
        wait_for_errors(b"put_reconstruction", 1)

        ray.worker.cleanup()

    def testPutError2(self):
        # This is the same as the previous test, but it calls ray.put
        # directly.
        store_size = 10 ** 6
        ray.worker._init(start_ray_local=True, driver_mode=ray.SILENT_MODE,
                         object_store_memory=store_size)

        num_objects = 3
        object_size = 4 * 10 ** 5

        # Define a task with a single dependency, a numpy array, that returns
        # another array.
        @ray.remote
        def single_dependency(i, arg):
            arg = np.copy(arg)
            arg[0] = i
            return arg

        @ray.remote
        def put_task():
            # Launch num_objects instances of the remote task, each dependent
            # on the one before it. The result of the first task should get
            # evicted.
            args = []
            arg = ray.put(np.zeros(object_size, dtype=np.uint8))
            for i in range(num_objects):
                arg = single_dependency.remote(i, arg)
                args.append(arg)

            # Get the last value to force all tasks to finish.
            value = ray.get(args[-1])
            assert value[0] == i

            # Get the first value (which should have been evicted) to force
            # reconstruction. Currently, since we're not able to reconstruct
            # `ray.put` objects that were evicted and whose originating tasks
            # are still running, this call should hang and push an error to
            # the driver.
            ray.get(args[0])

        put_task.remote()

        # Make sure we receive the correct error message.
        wait_for_errors(b"put_reconstruction", 1)

        ray.worker.cleanup()


if __name__ == "__main__":
    unittest.main(verbosity=2)
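
Both new tests call wait_for_errors, a helper defined earlier in failure_test.py that does not appear in this diff. Below is a rough sketch of what such a helper might look like, assuming driver errors are exposed through ray.error_info() and tagged with the b"type" key used in stress_tests.py below; the real helper may differ. The test sizes are chosen so that eviction is unavoidable: the initial 4 * 10 ** 5-byte array and the three task results of the same size cannot all fit in the 10 ** 6-byte store at once.

    import time

    import ray

    def wait_for_errors(error_type, num_errors, timeout=10):
        # Poll the driver's error table until at least num_errors errors of
        # the requested type have been pushed, or give up after timeout
        # seconds.
        start_time = time.time()
        while time.time() - start_time < timeout:
            errors = [error for error in ray.error_info()
                      if error[b"type"] == error_type]
            if len(errors) >= num_errors:
                return errors
            time.sleep(0.1)
        raise Exception("Timed out waiting for {} error(s) of type {}."
                        .format(num_errors, error_type))
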
91 changes: 0 additions & 91 deletions test/stress_tests.py
@@ -425,97 +425,6 @@ def error_check(errors):
self.assertTrue(all(error[b"data"] == b"__main__.foo"
for error in errors))

def testPutErrors(self):
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
num_objects = 1000
size = self.plasma_store_memory * 2 // (num_objects * 8)

# Define a task with a single dependency, a numpy array, that returns
# another array.
@ray.remote
def single_dependency(i, arg):
arg = np.copy(arg)
arg[0] = i
return arg

# Define a root task that calls `ray.put` to put an argument in the
# object store.
@ray.remote
def put_arg_task(size):
# Launch num_objects instances of the remote task, each dependent
# on the one before it. The first instance of the task takes a
# numpy array as an argument, which is put into the object store.
args = []
arg = single_dependency.remote(0, np.zeros(size))
for i in range(num_objects):
arg = single_dependency.remote(i, arg)
args.append(arg)

# Get each value to force each task to finish. After some number of
# gets, old values should be evicted.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
# Get each value again to force reconstruction. Currently, since
# we're not able to reconstruct `ray.put` objects that were evicted
# and whose originating tasks are still running, this for-loop
# should hang on its first iteration and push an error to the
# driver.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)

# Define a root task that calls `ray.put` directly.
@ray.remote
def put_task(size):
# Launch num_objects instances of the remote task, each dependent
# on the one before it. The first instance of the task takes an
# object ID returned by ray.put.
args = []
arg = ray.put(np.zeros(size))
for i in range(num_objects):
arg = single_dependency.remote(i, arg)
args.append(arg)

# Get each value to force each task to finish. After some number of
# gets, old values should be evicted.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
# Get each value again to force reconstruction. Currently, since
# we're not able to reconstruct `ray.put` objects that were evicted
# and whose originating tasks are still running, this for-loop
# should hang on its first iteration and push an error to the
# driver.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)

put_arg_task.remote(size)

def error_check(errors):
return len(errors) > 1
errors = self.wait_for_errors(error_check)
# Make sure all the errors have the correct type.
self.assertTrue(all(error[b"type"] == b"put_reconstruction"
for error in errors))
self.assertTrue(all(error[b"data"] == b"__main__.put_arg_task"
for error in errors))

put_task.remote(size)

def error_check(errors):
return any(error[b"data"] == b"__main__.put_task"
for error in errors)
errors = self.wait_for_errors(error_check)
# Make sure all the errors have the correct type.
self.assertTrue(all(error[b"type"] == b"put_reconstruction"
for error in errors))
self.assertTrue(any(error[b"data"] == b"__main__.put_task"
for error in errors))

def testDriverPutErrors(self):
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'