Skip to content

Commit

Permalink
Update stress tests. (ray-project#3614)
Browse files Browse the repository at this point in the history
Starts clusters for testing and has a fallback to kill the cluster if the command fails.

The results are then printed at the end of test.
  • Loading branch information
robertnishihara authored and richardliaw committed Jan 14, 2019
1 parent a5d1f03 commit 27c20a4
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 36 deletions.
36 changes: 26 additions & 10 deletions test/stress_tests/run_stress_tests.sh
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
#!/usr/bin/env bash

# Cause the script to exit if a single command fails.
set -e

# Show explicitly which commands are currently running.
set -x

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
RESULT_FILE=$ROOT_DIR/results-$(date '+%Y-%m-%d_%H-%M-%S').log
echo "Logging to" $RESULT_FILE
touch $RESULT_FILE

run_test(){
local test_name=$1

local CLUSTER="stress_testing_config.yaml"
echo "Try running $test_name."
{
ray up -y $CLUSTER --cluster-name "$test_name" &&
sleep 1 &&
ray submit $CLUSTER --cluster-name "$test_name" "$test_name.py"
} || echo "FAIL: $test_name" >> $RESULT_FILE

# Start a large cluster using the autoscaler.
ray up -y $ROOT_DIR/stress_testing_config.yaml
# Tear down cluster.
if [ "$DEBUG_MODE" = "" ]; then
ray down -y $CLUSTER --cluster-name "$test_name"
else
echo "Not tearing down cluster" $CLUSTER
fi
}

# Run a bunch of stress tests.
ray submit $ROOT_DIR/stress_testing_config.yaml test_many_tasks_and_transfers.py
ray submit $ROOT_DIR/stress_testing_config.yaml test_dead_actors.py
pushd "$ROOT_DIR"
run_test test_many_tasks_and_transfers
run_test test_dead_actors
popd

# Tear down the cluster.
ray down -y $ROOT_DIR/stress_testing_config.yaml
cat $RESULT_FILE
3 changes: 2 additions & 1 deletion test/stress_tests/test_dead_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def kill(self):
parents = [
Parent.remote(num_children, death_probability) for _ in range(num_parents)
]

for i in range(100):
ray.get([parent.ping.remote(10) for parent in parents])

Expand All @@ -69,4 +70,4 @@ def kill(self):
parents[parent_index].kill.remote()
parents[parent_index] = Parent.remote(num_children, death_probability)

logger.info("Finished trial", i)
logger.info("Finished trial %s", i)
56 changes: 31 additions & 25 deletions test/stress_tests/test_many_tasks_and_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
while True:
if len(ray.global_state.client_table()) >= num_remote_nodes + 1:
break
logger.info("Nodes have all joined. There are {} resources."
.format(ray.global_state.cluster_resources()))
logger.info("Nodes have all joined. There are %s resources.",
ray.global_state.cluster_resources())


# Require 1 GPU to force the tasks to be on remote machines.
Expand All @@ -40,45 +40,51 @@ def method(self, size, *xs):
return np.ones(size, dtype=np.uint8)


# Launch a bunch of tasks.
# Launch a bunch of tasks. (approximately 200 seconds)
start_time = time.time()
logger.info("Submitting many tasks.")
for i in range(10):
logger.info("Iteration {}".format(i))
logger.info("Iteration %s", i)
ray.get([f.remote(0) for _ in range(100000)])
logger.info("Finished after {} seconds.".format(time.time() - start_time))
logger.info("Finished after %s seconds.", time.time() - start_time)

# Launch a bunch of tasks, each with a bunch of dependencies.
# Launch a bunch of tasks, each with a bunch of dependencies. TODO(rkn): This
# test starts to fail if we increase the number of tasks in the inner loop from
# 500 to 1000. (approximately 615 seconds)
start_time = time.time()
logger.info("Submitting tasks with many dependencies.")
x_ids = []
for i in range(5):
logger.info("Iteration {}".format(i))
x_ids = [f.remote(0, *x_ids) for _ in range(10000)]
ray.get(x_ids)
logger.info("Finished after {} seconds.".format(time.time() - start_time))
for _ in range(5):
for i in range(20):
logger.info("Iteration %s. Cumulative time %s seconds", i,
time.time() - start_time)
x_ids = [f.remote(0, *x_ids) for _ in range(500)]
ray.get(x_ids)
logger.info("Finished after %s seconds.", time.time() - start_time)

# Create a bunch of actors.
start_time = time.time()
logger.info("Creating {} actors.".format(num_remote_cpus))
logger.info("Creating %s actors.", num_remote_cpus)
actors = [Actor.remote() for _ in range(num_remote_cpus)]
logger.info("Finished after {} seconds.".format(time.time() - start_time))
logger.info("Finished after %s seconds.", time.time() - start_time)

# Submit a bunch of small tasks to each actor.
# Submit a bunch of small tasks to each actor. (approximately 1070 seconds)
start_time = time.time()
logger.info("Submitting many small actor tasks.")
x_ids = []
for _ in range(100000):
x_ids = [a.method.remote(0) for a in actors]
ray.get(x_ids)
logger.info("Finished after {} seconds.".format(time.time() - start_time))

# Submit a bunch of actor tasks with all-to-all communication.
start_time = time.time()
logger.info("Submitting actor tasks with all-to-all communication.")
x_ids = []
for _ in range(50):
for size_exponent in [0, 1, 2, 3, 4, 5, 6]:
x_ids = [a.method.remote(10**size_exponent, *x_ids) for a in actors]
ray.get(x_ids)
logger.info("Finished after {} seconds.".format(time.time() - start_time))
logger.info("Finished after %s seconds.", time.time() - start_time)

# TODO(rkn): The test below is commented out because it currently does not
# pass.
# # Submit a bunch of actor tasks with all-to-all communication.
# start_time = time.time()
# logger.info("Submitting actor tasks with all-to-all communication.")
# x_ids = []
# for _ in range(50):
# for size_exponent in [0, 1, 2, 3, 4, 5, 6]:
# x_ids = [a.method.remote(10**size_exponent, *x_ids) for a in actors]
# ray.get(x_ids)
# logger.info("Finished after %s seconds.", time.time() - start_time)

0 comments on commit 27c20a4

Please sign in to comment.