Skip to content

Commit

Permalink
Fixes and cleanups for the multinode setting. (ray-project#143)
Browse files Browse the repository at this point in the history
* Add function for driver to get address info from Redis.

* Use Redis address instead of Redis port.

* Configure Redis to run in unprotected mode.

* Add method for starting Ray processes on non-head node.

* Pass in correct node ip address to start_plasma_manager.

* Script for starting Ray processes.

* Handle the case where an object already exists in the store. Maybe this should also compare the object hashes.

* Have driver get info from Redis when start_ray_local=False.

* Fix.

* Script for killing ray processes.

* Catch some errors when the main_loop in a worker throws an exception.

* Allow redirecting stdout and stderr to /dev/null.

* Wrap start_ray.py in a shell script.

* More helpful error messages.

* Fixes.

* Wait for redis server to start up before configuring it.

* Allow seeding of deterministic object ID generation.

* Small change.
  • Loading branch information
robertnishihara authored and pcmoritz committed Dec 22, 2016
1 parent c9c1b3e commit 6cd02d7
Show file tree
Hide file tree
Showing 14 changed files with 467 additions and 126 deletions.
167 changes: 139 additions & 28 deletions lib/python/ray/services.py

Large diffs are not rendered by default.

160 changes: 130 additions & 30 deletions lib/python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,16 @@ def put_object(self, objectid, value):
# Serialize and put the object in the object store.
schema, size, serialized = numbuf_serialize(value)
size = size + 4096 * 4 + 8 # The last 8 bytes are for the metadata offset. This is temporary.
buff = self.plasma_client.create(objectid.id(), size, bytearray(schema))
try:
buff = self.plasma_client.create(objectid.id(), size, bytearray(schema))
except RuntimeError as e:
if e.args != ("an object with this ID could not be created",):
raise
# The object already exists in the object store, so there is no need to
# add it again. TODO(rkn): We need to compare the hashes and make sure
# that the objects are in fact the same.
print("This object already exists in the object store.")
return
data = np.frombuffer(buff.buffer, dtype="byte")[8:]
metadata_offset = numbuf.write_to_buffer(serialized, memoryview(data))
np.frombuffer(buff.buffer, dtype="int64", count=1)[0] = metadata_offset
Expand Down Expand Up @@ -617,52 +626,125 @@ def objectid_custom_deserializer(serialized_obj):
register_class(RayGetError)
register_class(RayGetArgumentError)

def init(start_ray_local=False, num_workers=None, num_local_schedulers=1, driver_mode=SCRIPT_MODE):
def get_address_info_from_redis_helper(redis_address, node_ip_address):
redis_host, redis_port = redis_address.split(":")
# For this command to work, some other client (on the same machine as Redis)
# must have run "CONFIG SET protected-mode no".
redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port))
# The client table prefix must be kept in sync with the file
# "src/common/redis_module/ray_redis_module.c" where it is defined.
REDIS_CLIENT_TABLE_PREFIX = "CL:"
client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX))
# Filter to clients on the same node and do some basic checking.
plasma_managers = []
local_schedulers = []
for key in client_keys:
info = redis_client.hgetall(key)
assert b"ray_client_id" in info
assert b"node_ip_address" in info
assert b"client_type" in info
if info[b"node_ip_address"].decode("ascii") == node_ip_address:
if info[b"client_type"].decode("ascii") == "plasma_manager":
plasma_managers.append(info)
elif info[b"client_type"].decode("ascii") == "photon":
local_schedulers.append(info)
# Make sure that we got at one plasma manager and local scheduler.
assert len(plasma_managers) == 1
assert len(local_schedulers) == 1
client_info = {"node_ip_address": node_ip_address,
"redis_address": redis_address,
"store_socket_name": plasma_managers[0][b"store_socket_name"].decode("ascii"),
"manager_socket_name": plasma_managers[0][b"manager_socket_name"].decode("ascii"),
"local_scheduler_socket_name": local_schedulers[0][b"local_scheduler_socket_name"].decode("ascii")}
return client_info

def get_address_info_from_redis(redis_address, node_ip_address, num_retries=10):
counter = 0
while True:
try:
return get_address_info_from_redis_helper(redis_address, node_ip_address)
except Exception as e:
if counter == num_retries:
raise
# Some of the information may not be in Redis yet, so wait a little bit.
print("Some processes that the driver needs to connect to have not registered with Redis, so retrying.")
time.sleep(1)
counter += 1

def init(node_ip_address="127.0.0.1", redis_address=None, start_ray_local=False, object_id_seed=None, num_workers=None, num_local_schedulers=None, driver_mode=SCRIPT_MODE):
"""Either connect to an existing Ray cluster or start one and connect to it.
This method handles two cases. Either a Ray cluster already exists and we
just attach this driver to it, or we start all of the processes associated
with a Ray cluster and attach to the newly started cluster.
Args:
start_ray_local (Optional[bool]): If True then this will start a scheduler
an object store, and some workers. If False, this will attach to an
existing Ray cluster.
num_workers (Optional[int]): The number of workers to start if
node_ip_address (str): The IP address of the node that we are on.
redis_address (str): The address of the Redis server to connect to. This
should only be provided if start_ray_local is False.
start_ray_local (bool): If True then this will start Redis, a global
scheduler, a local scheduler, a plasma store, a plasma manager, and some
workers. It will also kill these processes when Python exits. If False,
this will attach to an existing Ray cluster.
object_id_seed (int): Used to seed the deterministic generation of object
IDs. The same value can be used across multiple runs of the same job in
order to generate the object IDs in a consistent manner. However, the same
ID should not be used for different jobs.
num_workers (int): The number of workers to start. This is only provided if
start_ray_local is True.
num_local_schedulers (Optional[int]): The number of local schedulers to
start if start_ray_local is True.
driver_mode (Optional[bool]): The mode in which to start the driver. This
should be one of SCRIPT_MODE, PYTHON_MODE, and SILENT_MODE.
num_local_schedulers (int): The number of local schedulers to start. This is
only provided if start_ray_local is True.
driver_mode (bool): The mode in which to start the driver. This should be
one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
Returns:
The address of the Redis server.
Address information about the started processes.
Raises:
Exception: An exception is raised if an inappropriate combination of
arguments is passed in.
"""
check_main_thread()
if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]:
raise Exception("Driver_mode must be in [ray.SCRIPT_MODE, ray.PYTHON_MODE, ray.SILENT_MODE].")
if driver_mode == PYTHON_MODE:
# If starting Ray in PYTHON_MODE, don't start any other processes.
address_info = {}
info = {}
elif start_ray_local:
# In this case, we launch a scheduler, a new object store, and some workers,
# and we connect to them.
if driver_mode not in [SCRIPT_MODE, PYTHON_MODE, SILENT_MODE]:
raise Exception("If start_ray_local=True, then driver_mode must be in [ray.SCRIPT_MODE, ray.PYTHON_MODE, ray.SILENT_MODE].")
if redis_address is not None:
raise Exception("If start_ray_local=True, then redis_address cannot be provided because ray.init will start a new Redis server.")
# Use the address 127.0.0.1 in local mode.
node_ip_address = "127.0.0.1" if node_ip_address is None else node_ip_address
# Use 1 worker if num_workers is not provided.
num_workers = 1 if num_workers is None else num_workers
# Use 1 local scheduler if num_local_schedulers is not provided.
num_local_schedulers = 1 if num_local_schedulers is None else num_local_schedulers
# Start the scheduler, object store, and some workers. These will be killed
# by the call to cleanup(), which happens when the Python script exits.
address_info = services.start_ray_local(num_workers=num_workers, num_local_schedulers=num_local_schedulers)
address_info = services.start_ray_local(node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers)
info = {"node_ip_address": node_ip_address,
"redis_address": address_info["redis_address"],
"store_socket_name": address_info["object_store_names"][0],
"manager_socket_name": address_info["object_store_manager_names"][0],
"local_scheduler_socket_name": address_info["local_scheduler_names"][0]}
else:
raise Exception("This mode is currently not enabled.")
if redis_address is None:
raise Exception("If start_ray_local=False, then redis_address must be provided.")
if node_ip_address is None:
raise Exception("If start_ray_local=False, then node_ip_address must be provided.")
if num_workers is not None:
raise Exception("If start_ray_local=False, then num_workers must not be provided.")
if num_local_schedulers is not None:
raise Exception("If start_ray_local=False, then num_local_schedulers must not be provided.")
# Get the address info of the processes to connect to from Redis.
info = get_address_info_from_redis(redis_address, node_ip_address)
# Connect this driver to Redis, the object store, and the local scheduler. The
# corresponing call to disconnect will happen in the call to cleanup() when
# the Python script exits.
connect(address_info, driver_mode, worker=global_worker)
return address_info
connect(info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker)
return info

def cleanup(worker=global_worker):
"""Disconnect the driver, and terminate any processes started in init.
Expand Down Expand Up @@ -828,17 +910,23 @@ def import_thread(worker):
worker.redis_client.hincrby(worker_info_key, "export_counter", 1)
worker.worker_import_counter += 1

def connect(address_info, mode=WORKER_MODE, worker=global_worker):
"""Connect this worker to the scheduler and an object store.
def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker):
"""Connect this worker to the local scheduler, to Plasma, and to Redis.
Args:
address_info (dict): This contains the entries node_ip_address,
redis_address, object_store_name, object_store_manager_name, and
local_scheduler_name.
info (dict): A dictionary with address of the Redis server and the sockets
of the plasma store, plasma manager, and local scheduler.
mode: The mode of the worker. One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE,
and SILENT_MODE.
"""
check_main_thread()
# Do some basic checking to make sure we didn't call ray.init twice.
error_message = "Perhaps you called ray.init twice by accident?"
assert not worker.connected, error_message
assert worker.cached_functions_to_run is not None, error_message
assert worker.cached_remote_functions is not None, error_message
assert reusables._cached_reusables is not None, error_message
# Initialize some fields.
worker.worker_id = random_string()
worker.connected = True
worker.set_mode(mode)
Expand All @@ -847,24 +935,36 @@ def connect(address_info, mode=WORKER_MODE, worker=global_worker):
if mode == PYTHON_MODE:
return
# Create a Redis client.
worker.redis_client = redis.StrictRedis(host=address_info["node_ip_address"], port=address_info["redis_port"])
worker.redis_client.config_set("notify-keyspace-events", "AKE")
redis_host, redis_port = info["redis_address"].split(":")
worker.redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port))
worker.lock = threading.Lock()
# Create an object store client.
worker.plasma_client = plasma.PlasmaClient(address_info["object_store_names"][0], address_info["object_store_manager_names"][0])
worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"])
# Create the local scheduler client.
worker.photon_client = photon.PhotonClient(address_info["local_scheduler_names"][0])
worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"])
# Register the worker with Redis.
if mode in [SCRIPT_MODE, SILENT_MODE]:
worker.redis_client.rpush("Drivers", worker.worker_id)
elif mode == WORKER_MODE:
worker.redis_client.rpush("Workers", worker.worker_id)
else:
raise Exception("This code should be unreachable.")
# If this is a driver, set the current task ID to a specific fixed value and
# set the task index to 0.
# If this is a driver, set the current task ID and set the task index to 0.
if mode in [SCRIPT_MODE, SILENT_MODE]:
worker.current_task_id = photon.ObjectID("".join(chr(i) for i in range(20)))
# If the user provided an object_id_seed, then set the current task ID
# deterministically based on that seed (without altering the state of the
# user's random number generator). Otherwise, set the current task ID
# randomly to avoid object ID collisions.
numpy_state = np.random.get_state()
if object_id_seed is not None:
np.random.seed(object_id_seed)
else:
# Try to use true randomness.
np.random.seed(None)
worker.current_task_id = photon.ObjectID(np.random.bytes(20))
# Reset the state of the numpy random number generator.
np.random.set_state(numpy_state)
# Set other fields needed for computing task IDs.
worker.task_index = 0
worker.put_index = 0
# If this is a worker, then start a thread to import exports from the driver.
Expand Down
53 changes: 43 additions & 10 deletions lib/python/ray/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,59 @@
from __future__ import division
from __future__ import print_function

import sys
import argparse
import numpy as np
import redis
import traceback

import ray

parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.")
parser.add_argument("--node-ip-address", required=True, type=str, help="the ip address of the worker's node")
parser.add_argument("--redis-port", required=True, type=int, help="the port to use for Redis")
parser.add_argument("--redis-address", required=True, type=str, help="the address to use for Redis")
parser.add_argument("--object-store-name", required=True, type=str, help="the object store's name")
parser.add_argument("--object-store-manager-name", required=True, type=str, help="the object store manager's name")
parser.add_argument("--local-scheduler-name", required=True, type=str, help="the local scheduler's name")

def random_string():
return np.random.bytes(20)

if __name__ == "__main__":
args = parser.parse_args()
address_info = {"node_ip_address": args.node_ip_address,
"redis_port": args.redis_port,
"object_store_names": [args.object_store_name],
"object_store_manager_names": [args.object_store_manager_name],
"local_scheduler_names": [args.local_scheduler_name]}
ray.worker.connect(address_info, ray.WORKER_MODE)

ray.worker.main_loop()
info = {"redis_address": args.redis_address,
"store_socket_name": args.object_store_name,
"manager_socket_name": args.object_store_manager_name,
"local_scheduler_socket_name": args.local_scheduler_name}
ray.worker.connect(info, ray.WORKER_MODE)

error_explanation = """
This error is unexpected and should not have happened. Somehow a worker crashed
in an unanticipated way causing the main_loop to throw an exception, which is
being caught in "lib/python/ray/workers/default_worker.py".
"""

while True:
try:
# This call to main_loop should never return if things are working. Most
# exceptions that are thrown (e.g., inside the execution of a task) should
# be caught and handled inside of the call to main_loop. If an exception
# is thrown here, then that means that there is some error that we didn't
# anticipate.
ray.worker.main_loop()
except Exception as e:
traceback_str = traceback.format_exc() + error_explanation
error_key = "WorkerError:{}".format(random_string())
redis_host, redis_port = args.redis_address.split(":")
# For this command to work, some other client (on the same machine as
# Redis) must have run "CONFIG SET protected-mode no".
redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port))
redis_client.hmset(error_key, {"message": traceback_str,
"note": "This error is unexpected and should not have happened."})
redis_client.rpush("ErrorKeys", error_key)
# TODO(rkn): Note that if the worker was in the middle of executing a
# task, the any worker or driver that is blocking in a get call and
# waiting for the output of that task will hang. We need to address this.

# After putting the error message in Redis, this worker will attempt to
# reenter the main loop. TODO(rkn): We should probably reset it's state and
# call connect again.
40 changes: 40 additions & 0 deletions scripts/start_ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse

import ray.services as services

parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.")
parser.add_argument("--node-ip-address", required=True, type=str, help="the ip address of the worker's node")
parser.add_argument("--redis-address", required=False, type=str, help="the address to use for Redis")
parser.add_argument("--num-workers", default=10, required=False, type=int, help="the number of workers to start on this node")
parser.add_argument("--head", action="store_true", help="provide this argument for the head node")

if __name__ == "__main__":
args = parser.parse_args()

# Note that we redirect stdout and stderr to /dev/null because otherwise
# attempts to print may cause exceptions if a process is started inside of an
# SSH connection and the SSH connection dies. TODO(rkn): This is a temporary
# fix. We should actually redirect stdout and stderr to Redis in some way.

if args.head:
# Start Ray on the head node.
if args.redis_address is not None:
raise Exception("If --head is passed in, a Redis server will be started, so a Redis address should not be provided.")
address_info = services.start_ray_local(node_ip_address=args.node_ip_address,
num_workers=args.num_workers,
cleanup=False,
redirect_output=True)
else:
# Start Ray on a non-head node.
if args.redis_address is None:
raise Exception("If --head is not passed in, --redis-address must be provided.")
address_info = services.start_ray_node(node_ip_address=args.node_ip_address,
redis_address=args.redis_address,
num_workers=args.num_workers,
cleanup=False,
redirect_output=True)
print(address_info)
5 changes: 5 additions & 0 deletions scripts/start_ray.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/env bash

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)

python "$ROOT_DIR/start_ray.py" "$@"
3 changes: 3 additions & 0 deletions scripts/stop_ray.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

killall redis-server global_scheduler plasma_store plasma_manager photon_scheduler
Loading

0 comments on commit 6cd02d7

Please sign in to comment.