Skip to content

Commit

Permalink
make services.cleanup happen automatically (ray-project#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertnishihara authored and pcmoritz committed Jul 7, 2016
1 parent 63f7548 commit 8e6b792
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 24 deletions.
1 change: 0 additions & 1 deletion examples/hyperopt/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,3 @@
print "Best parameters are now {}.".format(params)

print "Best parameters over {} samples was {}, with an accuracy of {:.4}%.".format(samples, best_params, 100 * best_accuracy)
services.cleanup()
2 changes: 0 additions & 2 deletions examples/imagenet/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,3 @@
if args.drop_ipython:
import IPython
IPython.embed()

services.cleanup()
2 changes: 0 additions & 2 deletions examples/lbfgs/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,3 @@ def full_grad(theta):
result = scipy.optimize.fmin_l_bfgs_b(full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True)
end_time = time.time()
print "Elapsed time = {}".format(end_time - start_time)

services.cleanup()
84 changes: 68 additions & 16 deletions lib/python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ def new_objstore_port():
return 20000 + objstore_port_counter

def cleanup():
"""
This method is used to shutdown processes that were started with
services.start_ray_local(). It kills all scheduler, object store, and worker
processes that were started by this services module. It disconnects driver
processes but does not kill them. Users should not invoke this manually. It
will automatically run at the end when a Python process that imports
services exits. It is ok to run this twice in a row. Note that we manaully
call services.cleanup() in the tests because we need to start and stop many
clusters in the tests, but in the tests, services is only imported and only
exits once.
"""
global drivers
for driver in drivers:
ray.disconnect(driver)
Expand Down Expand Up @@ -66,23 +77,54 @@ def cleanup():
print "Termination attempt failed, giving up."
all_processes = []

# atexit.register(cleanup)
atexit.register(cleanup)

def start_scheduler(scheduler_address, local=True):
"""
This method starts a scheduler process.
def start_scheduler(scheduler_address):
:param scheduler_address: The ip address and port to use for the scheduler.
:param local: True if using Ray in local mode. If local is true, then this
process will be killed by serices.cleanup() when the Python process that
imported services exits.
"""
p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler.log")], env=_services_env)
all_processes.append((p, scheduler_address))
if local:
all_processes.append((p, scheduler_address))

def start_objstore(scheduler_address, objstore_address, local=True):
"""
This method starts an object store process.
def start_objstore(scheduler_address, objstore_address):
:param scheduler_address: The ip address and port of the scheduler to connect to.
:param objstore_address: The ip address and port to use for the object store.
:param local: True if using Ray in local mode. If local is true, then this
process will be killed by serices.cleanup() when the Python process that
imported services exits.
"""
p = subprocess.Popen(["objstore", scheduler_address, objstore_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", objstore_address]) + ".log")], env=_services_env)
all_processes.append((p, objstore_address))
if local:
all_processes.append((p, objstore_address))

def start_worker(worker_path, scheduler_address, objstore_address, worker_address):
def start_worker(worker_path, scheduler_address, objstore_address, worker_address, local=True):
"""
This method starts a worker process.
:param worker_path: The path of the source code which the worker process will run.
:param scheduler_address: The ip address and port of the scheduler to connect to.
:param objstore_address: The ip address and port of the object store to connect to.
:param worker_address: The ip address and port to use for the worker.
:param local: True if using Ray in local mode. If local is true, then this
process will be killed by serices.cleanup() when the Python process that
imported services exits.
"""
p = subprocess.Popen(["python",
worker_path,
"--scheduler-address=" + scheduler_address,
"--objstore-address=" + objstore_address,
"--worker-address=" + worker_address])
all_processes.append((p, worker_address))
if local:
all_processes.append((p, worker_address))

def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None):
"""
Expand All @@ -95,10 +137,10 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
:param worker_path: path of the source code that will be run on the worker
"""
objstore_address = address(node_ip_address, new_objstore_port())
start_objstore(scheduler_address, objstore_address)
start_objstore(scheduler_address, objstore_address, local=False)
time.sleep(0.2)
for _ in range(num_workers):
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False)
time.sleep(0.3)
ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), is_driver=True)
time.sleep(0.5)
Expand All @@ -118,12 +160,22 @@ def start_workers(scheduler_address, objstore_address, num_workers, worker_path)
"""
node_ip_address = objstore_address.split(":")[0]
for _ in range(num_workers):
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False)

# driver_mode should equal ray.SCRIPT_MODE if this is being run in a script and
# ray.SHELL_MODE if it is being used interactively in a shell. It can also equal
# ray.PYTHON_MODE to run things in a manner equivalent to serial Python code.
def start_ray_local(num_workers=0, worker_path=None, driver_mode=ray.SCRIPT_MODE):
"""
This method starts Ray in local mode (as opposed to cluster mode, which is
handled by cluster.py).
:param num_workers: The number of workers to start.
:param worker_path: The path of the source code that will be run by the worker
:param driver_mode: The mode for the driver, this only affects the printing of
error messages. This should be ray.SCRIPT_MODE if the driver is being run in
a script. It should be ray.SHELL_MODE if it is being used interactively in
the shell. It should be ray.PYTHON_MODE to run things in a manner eqivalent
to serial Python code. It should be ray.WORKER_MODE to surpress the printing
of error messages.
"""
start_services_local(num_objstores=1, num_workers_per_objstore=num_workers, worker_path=worker_path, driver_mode=driver_mode)

# This is a helper method which is only used in the tests and should not be
Expand All @@ -135,17 +187,17 @@ def start_services_local(num_objstores=1, num_workers_per_objstore=0, worker_pat
if num_workers_per_objstore > 0 and num_objstores < 1:
raise Exception("Attempting to start a cluster with {} workers per object store, but `num_objstores` is {}.".format(num_objstores))
scheduler_address = address(IP_ADDRESS, new_scheduler_port())
start_scheduler(scheduler_address)
start_scheduler(scheduler_address, local=True)
time.sleep(0.1)
objstore_addresses = []
# create objstores
for i in range(num_objstores):
objstore_address = address(IP_ADDRESS, new_objstore_port())
objstore_addresses.append(objstore_address)
start_objstore(scheduler_address, objstore_address)
start_objstore(scheduler_address, objstore_address, local=True)
time.sleep(0.2)
for _ in range(num_workers_per_objstore):
start_worker(worker_path, scheduler_address, objstore_address, address(IP_ADDRESS, new_worker_port()))
start_worker(worker_path, scheduler_address, objstore_address, address(IP_ADDRESS, new_worker_port()), local=True)
time.sleep(0.3)
# create drivers
if return_drivers:
Expand Down
3 changes: 0 additions & 3 deletions scripts/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,3 @@

import IPython
IPython.embed()

if not args.attach:
services.cleanup()

0 comments on commit 8e6b792

Please sign in to comment.