From 8e6b7929d6935539014c919841c17ca455fd7bba Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 7 Jul 2016 14:05:25 -0700 Subject: [PATCH] make services.cleanup happen automatically (#224) --- examples/hyperopt/driver.py | 1 - examples/imagenet/driver.py | 2 - examples/lbfgs/driver.py | 2 - lib/python/ray/services.py | 84 ++++++++++++++++++++++++++++++------- scripts/shell.py | 3 -- 5 files changed, 68 insertions(+), 24 deletions(-) diff --git a/examples/hyperopt/driver.py b/examples/hyperopt/driver.py index ebae1ba8ca53..3dd85d797978 100644 --- a/examples/hyperopt/driver.py +++ b/examples/hyperopt/driver.py @@ -36,4 +36,3 @@ print "Best parameters are now {}.".format(params) print "Best parameters over {} samples was {}, with an accuracy of {:.4}%.".format(samples, best_params, 100 * best_accuracy) -services.cleanup() diff --git a/examples/imagenet/driver.py b/examples/imagenet/driver.py index 54c2fab4cb9f..036dfbae594c 100644 --- a/examples/imagenet/driver.py +++ b/examples/imagenet/driver.py @@ -34,5 +34,3 @@ if args.drop_ipython: import IPython IPython.embed() - - services.cleanup() diff --git a/examples/lbfgs/driver.py b/examples/lbfgs/driver.py index ce3a50c44627..37186e87ac0f 100644 --- a/examples/lbfgs/driver.py +++ b/examples/lbfgs/driver.py @@ -50,5 +50,3 @@ def full_grad(theta): result = scipy.optimize.fmin_l_bfgs_b(full_loss, theta_init, maxiter=10, fprime=full_grad, disp=True) end_time = time.time() print "Elapsed time = {}".format(end_time - start_time) - - services.cleanup() diff --git a/lib/python/ray/services.py b/lib/python/ray/services.py index 6845e54878ef..7160f9ffe1d3 100644 --- a/lib/python/ray/services.py +++ b/lib/python/ray/services.py @@ -39,6 +39,17 @@ def new_objstore_port(): return 20000 + objstore_port_counter def cleanup(): + """ + This method is used to shutdown processes that were started with + services.start_ray_local(). It kills all scheduler, object store, and worker + processes that were started by this services module. It disconnects driver + processes but does not kill them. Users should not invoke this manually. It + will automatically run at the end when a Python process that imports + services exits. It is ok to run this twice in a row. Note that we manaully + call services.cleanup() in the tests because we need to start and stop many + clusters in the tests, but in the tests, services is only imported and only + exits once. + """ global drivers for driver in drivers: ray.disconnect(driver) @@ -66,23 +77,54 @@ def cleanup(): print "Termination attempt failed, giving up." all_processes = [] -# atexit.register(cleanup) +atexit.register(cleanup) + +def start_scheduler(scheduler_address, local=True): + """ + This method starts a scheduler process. -def start_scheduler(scheduler_address): + :param scheduler_address: The ip address and port to use for the scheduler. + :param local: True if using Ray in local mode. If local is true, then this + process will be killed by serices.cleanup() when the Python process that + imported services exits. + """ p = subprocess.Popen(["scheduler", scheduler_address, "--log-file-name", config.get_log_file_path("scheduler.log")], env=_services_env) - all_processes.append((p, scheduler_address)) + if local: + all_processes.append((p, scheduler_address)) + +def start_objstore(scheduler_address, objstore_address, local=True): + """ + This method starts an object store process. -def start_objstore(scheduler_address, objstore_address): + :param scheduler_address: The ip address and port of the scheduler to connect to. + :param objstore_address: The ip address and port to use for the object store. + :param local: True if using Ray in local mode. If local is true, then this + process will be killed by serices.cleanup() when the Python process that + imported services exits. + """ p = subprocess.Popen(["objstore", scheduler_address, objstore_address, "--log-file-name", config.get_log_file_path("-".join(["objstore", objstore_address]) + ".log")], env=_services_env) - all_processes.append((p, objstore_address)) + if local: + all_processes.append((p, objstore_address)) -def start_worker(worker_path, scheduler_address, objstore_address, worker_address): +def start_worker(worker_path, scheduler_address, objstore_address, worker_address, local=True): + """ + This method starts a worker process. + + :param worker_path: The path of the source code which the worker process will run. + :param scheduler_address: The ip address and port of the scheduler to connect to. + :param objstore_address: The ip address and port of the object store to connect to. + :param worker_address: The ip address and port to use for the worker. + :param local: True if using Ray in local mode. If local is true, then this + process will be killed by serices.cleanup() when the Python process that + imported services exits. + """ p = subprocess.Popen(["python", worker_path, "--scheduler-address=" + scheduler_address, "--objstore-address=" + objstore_address, "--worker-address=" + worker_address]) - all_processes.append((p, worker_address)) + if local: + all_processes.append((p, worker_address)) def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None): """ @@ -95,10 +137,10 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None :param worker_path: path of the source code that will be run on the worker """ objstore_address = address(node_ip_address, new_objstore_port()) - start_objstore(scheduler_address, objstore_address) + start_objstore(scheduler_address, objstore_address, local=False) time.sleep(0.2) for _ in range(num_workers): - start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) + start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False) time.sleep(0.3) ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), is_driver=True) time.sleep(0.5) @@ -118,12 +160,22 @@ def start_workers(scheduler_address, objstore_address, num_workers, worker_path) """ node_ip_address = objstore_address.split(":")[0] for _ in range(num_workers): - start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) + start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()), local=False) -# driver_mode should equal ray.SCRIPT_MODE if this is being run in a script and -# ray.SHELL_MODE if it is being used interactively in a shell. It can also equal -# ray.PYTHON_MODE to run things in a manner equivalent to serial Python code. def start_ray_local(num_workers=0, worker_path=None, driver_mode=ray.SCRIPT_MODE): + """ + This method starts Ray in local mode (as opposed to cluster mode, which is + handled by cluster.py). + + :param num_workers: The number of workers to start. + :param worker_path: The path of the source code that will be run by the worker + :param driver_mode: The mode for the driver, this only affects the printing of + error messages. This should be ray.SCRIPT_MODE if the driver is being run in + a script. It should be ray.SHELL_MODE if it is being used interactively in + the shell. It should be ray.PYTHON_MODE to run things in a manner eqivalent + to serial Python code. It should be ray.WORKER_MODE to surpress the printing + of error messages. + """ start_services_local(num_objstores=1, num_workers_per_objstore=num_workers, worker_path=worker_path, driver_mode=driver_mode) # This is a helper method which is only used in the tests and should not be @@ -135,17 +187,17 @@ def start_services_local(num_objstores=1, num_workers_per_objstore=0, worker_pat if num_workers_per_objstore > 0 and num_objstores < 1: raise Exception("Attempting to start a cluster with {} workers per object store, but `num_objstores` is {}.".format(num_objstores)) scheduler_address = address(IP_ADDRESS, new_scheduler_port()) - start_scheduler(scheduler_address) + start_scheduler(scheduler_address, local=True) time.sleep(0.1) objstore_addresses = [] # create objstores for i in range(num_objstores): objstore_address = address(IP_ADDRESS, new_objstore_port()) objstore_addresses.append(objstore_address) - start_objstore(scheduler_address, objstore_address) + start_objstore(scheduler_address, objstore_address, local=True) time.sleep(0.2) for _ in range(num_workers_per_objstore): - start_worker(worker_path, scheduler_address, objstore_address, address(IP_ADDRESS, new_worker_port())) + start_worker(worker_path, scheduler_address, objstore_address, address(IP_ADDRESS, new_worker_port()), local=True) time.sleep(0.3) # create drivers if return_drivers: diff --git a/scripts/shell.py b/scripts/shell.py index 2c1bb75c1342..688a8c14affd 100644 --- a/scripts/shell.py +++ b/scripts/shell.py @@ -34,6 +34,3 @@ import IPython IPython.embed() - - if not args.attach: - services.cleanup()