Skip to content

Commit

Permalink
Start chain replicated GCS with Ray (ray-project#1538)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz authored and robertnishihara committed Mar 7, 2018
1 parent 6dbf4f6 commit a9acfab
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 46 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ script:
- python test/recursion_test.py
- python test/monitor_test.py
- python test/cython_test.py
- python test/credis_test.py

# ray dataframe tests
- python -m pytest python/ray/dataframe/test/test_dataframe.py
Expand Down
100 changes: 89 additions & 11 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@
RUN_PLASMA_MANAGER_PROFILER = False
RUN_PLASMA_STORE_PROFILER = False

# Location of the redis server and module. Both paths are resolved relative to
# the directory containing this file. They are the default `executable` and
# `module` arguments of start_redis_instance, which asserts that both files
# exist before launching the server.
REDIS_EXECUTABLE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/common/thirdparty/redis/src/redis-server")
REDIS_MODULE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/common/redis_module/libray_redis_module.so")

# Location of the credis server and modules. start_credis launches three
# instances of CREDIS_EXECUTABLE: the master loads CREDIS_MASTER_MODULE and the
# two chain members (head and tail) load CREDIS_MEMBER_MODULE.
CREDIS_EXECUTABLE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/credis/redis/src/redis-server")
CREDIS_MASTER_MODULE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/credis/build/src/libmaster.so")
CREDIS_MEMBER_MODULE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/credis/build/src/libmember.so")


# ObjectStoreAddress tuples contain all information necessary to connect to an
# object store. The fields are:
# - name: The socket name for the object store
Expand Down Expand Up @@ -367,6 +387,61 @@ def check_version_info(redis_client):
print(error_message)


def start_credis(node_ip_address,
                 port=None,
                 redirect_output=False,
                 cleanup=True):
    """Start the credis global state store.

    Credis is a chain replicated reliable redis store. It consists of one
    master process that acts as a controller and a number of chain members
    (currently two, the head and the tail).

    Args:
        node_ip_address: The IP address of the current node. It is passed to
            every started redis instance, used as the host when connecting
            to the master, and used as the address under which the chain
            members are registered with the master.
        port (int): If provided, the credis master will be started on this
            port. The chain members never use this port; their ports are
            left for start_redis_instance to choose.
        redirect_output (bool): True if output should be redirected to a file
            and false otherwise.
        cleanup (bool): True if using Ray in local mode. If cleanup is true,
            then all Redis processes started by this method will be killed by
            services.cleanup() when the Python process that imported services
            exits.

    Returns:
        The address (ip_address:port) of the credis master process.
    """
    # (log component name, module to load, requested port). Only the master
    # may receive the caller's port: handing the same explicit port to all
    # three servers would make the head and the tail collide with the master.
    instances = [
        ("credis_master", CREDIS_MASTER_MODULE, port),
        ("credis_head", CREDIS_MEMBER_MODULE, None),
        ("credis_tail", CREDIS_MEMBER_MODULE, None),
    ]

    ports = []
    for component, module, requested_port in instances:
        stdout_file, stderr_file = new_log_files(component, redirect_output)
        new_port, _ = start_redis_instance(
            node_ip_address=node_ip_address,
            port=requested_port,
            stdout_file=stdout_file,
            stderr_file=stderr_file,
            cleanup=cleanup,
            module=module,
            executable=CREDIS_EXECUTABLE)
        ports.append(new_port)

    master_port, head_port, tail_port = ports

    # Connect the members to the master. The head is added before the tail.
    master_client = redis.StrictRedis(host=node_ip_address, port=master_port)
    master_client.execute_command("MASTER.ADD", node_ip_address, head_port)
    master_client.execute_command("MASTER.ADD", node_ip_address, tail_port)

    return address(node_ip_address, master_port)


def start_redis(node_ip_address,
port=None,
redis_shard_ports=None,
Expand Down Expand Up @@ -462,7 +537,9 @@ def start_redis_instance(node_ip_address="127.0.0.1",
num_retries=20,
stdout_file=None,
stderr_file=None,
cleanup=True):
cleanup=True,
executable=REDIS_EXECUTABLE,
module=REDIS_MODULE):
"""Start a single Redis server.
Args:
Expand All @@ -480,6 +557,9 @@ def start_redis_instance(node_ip_address="127.0.0.1",
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then this process will be killed by services.cleanup() when the
Python process that imported services exits.
executable (str): Full path to the redis-server executable.
module (str): Full path to the redis module that will be loaded in this
redis server.
Returns:
A tuple of the port used by Redis and a handle to the process that was
Expand All @@ -489,14 +569,8 @@ def start_redis_instance(node_ip_address="127.0.0.1",
Raises:
Exception: An exception is raised if Redis could not be started.
"""
redis_filepath = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"./core/src/common/thirdparty/redis/src/redis-server")
redis_module = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"./core/src/common/redis_module/libray_redis_module.so")
assert os.path.isfile(redis_filepath)
assert os.path.isfile(redis_module)
assert os.path.isfile(executable)
assert os.path.isfile(module)
counter = 0
if port is not None:
# If a port is specified, then try only once to connect.
Expand All @@ -506,10 +580,10 @@ def start_redis_instance(node_ip_address="127.0.0.1",
while counter < num_retries:
if counter > 0:
print("Redis failed to start, retrying now.")
p = subprocess.Popen([redis_filepath,
p = subprocess.Popen([executable,
"--port", str(port),
"--loglevel", "warning",
"--loadmodule", redis_module],
"--loadmodule", module],
stdout=stdout_file, stderr=stderr_file)
time.sleep(0.1)
# Check if Redis successfully started (or at least if it the executable
Expand Down Expand Up @@ -1066,6 +1140,10 @@ def start_ray_processes(address_info=None,
redirect_output=True,
redirect_worker_output=redirect_output, cleanup=cleanup)
address_info["redis_address"] = redis_address
if "RAY_USE_NEW_GCS" in os.environ:
credis_address = start_credis(
node_ip_address, cleanup=cleanup)
address_info["credis_address"] = credis_address
time.sleep(0.1)

# Start monitoring the processes.
Expand Down
12 changes: 12 additions & 0 deletions python/ray/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import json
import os
import redis
import subprocess
import tempfile
import time

import ray
Expand Down Expand Up @@ -130,3 +132,13 @@ def wait_for_pid_to_exit(pid, timeout=20):
return
time.sleep(0.1)
raise Exception("Timed out while waiting for process to exit.")


def run_and_get_output(command):
    """Run a command to completion and return everything it wrote to stdout.

    The command's stdout is sent to a temporary file rather than a pipe.
    NOTE(review): presumably this is so that commands such as "ray start",
    which leave background processes running, cannot keep a pipe open after
    the command itself has exited -- confirm.

    Args:
        command: The command to run, as a list of arguments accepted by
            subprocess.Popen.

    Returns:
        The command's standard output as a single string, exactly as written
        (line terminators are preserved, not duplicated).

    Raises:
        RuntimeError: If the command exits with a non-zero return code.
    """
    with tempfile.NamedTemporaryFile() as tmp:
        p = subprocess.Popen(command, stdout=tmp)
        returncode = p.wait()
        if returncode != 0:
            raise RuntimeError(
                "Command {} exited with non-zero return code {}.".format(
                    command, returncode))
        # Read the file back as a whole. Joining readlines() with "\n" would
        # double every newline, because readlines() keeps the terminators.
        with open(tmp.name, 'r') as f:
            return f.read()
5 changes: 5 additions & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
"ray/autoscaler/aws/example-full.yaml"
]

if "RAY_USE_NEW_GCS" in os.environ and os.environ["RAY_USE_NEW_GCS"] == "on":
ray_files += ["ray/core/src/credis/build/src/libmember.so",
"ray/core/src/credis/build/src/libmaster.so",
"ray/core/src/credis/redis/src/redis-server"]

# The UI files are mandatory if the INCLUDE_UI environment variable equals 1.
# Otherwise, they are optional.
if "INCLUDE_UI" in os.environ and os.environ["INCLUDE_UI"] == "1":
Expand Down
31 changes: 31 additions & 0 deletions test/credis_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import redis
import unittest

import ray


@unittest.skipIf(
    not os.environ.get('RAY_USE_NEW_GCS', False),
    "Tests functionality of the new GCS.")
class CredisTest(unittest.TestCase):
    """Check that ray.init() reports a reachable credis server.

    The whole class is skipped unless the RAY_USE_NEW_GCS environment
    variable is set to a non-empty value.
    """

    def setUp(self):
        # Keep the value returned by ray.init() so the test can look up the
        # credis address in it.
        self.config = ray.init()

    def tearDown(self):
        ray.worker.cleanup()

    def test_credis_started(self):
        """The reported credis address must answer a PING."""
        # Use unittest assertions rather than bare asserts so the checks are
        # not stripped when Python runs with -O, and failures are descriptive.
        self.assertIn("credis_address", self.config)
        host, port = self.config["credis_address"].split(":")
        # The port comes back as text after the split; hand the client an int.
        redis_client = redis.StrictRedis(host=host, port=int(port))
        self.assertIs(redis_client.ping(), True)


if __name__ == "__main__":
unittest.main(verbosity=2)
6 changes: 4 additions & 2 deletions test/monitor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@

import ray

from ray.test.test_utils import run_and_get_output


class MonitorTest(unittest.TestCase):
def _testCleanupOnDriverExit(self, num_redis_shards):
stdout = subprocess.check_output([
stdout = run_and_get_output([
"ray",
"start",
"--head",
"--num-redis-shards",
str(num_redis_shards),
]).decode("ascii")
])
lines = [m.strip() for m in stdout.split("\n")]
init_cmd = [m for m in lines if m.startswith("ray.init")]
self.assertEqual(1, len(init_cmd))
Expand Down
66 changes: 33 additions & 33 deletions test/multi_node_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import tempfile
import time

from ray.test.test_utils import run_and_get_output


def run_string_as_driver(driver_script):
"""Run a driver as a separate process.
Expand All @@ -31,9 +33,7 @@ def run_string_as_driver(driver_script):
class MultiNodeTest(unittest.TestCase):

def setUp(self):
# Start the Ray processes on this machine.
out = subprocess.check_output(
["ray", "start", "--head"]).decode("ascii")
out = run_and_get_output(["ray", "start", "--head"])
# Get the redis address from the output.
redis_substring_prefix = "redis_address=\""
redis_address_location = (out.find(redis_substring_prefix) +
Expand Down Expand Up @@ -203,73 +203,73 @@ def testCallingStartRayHead(self):
# should also test the non-head node code path.

# Test starting Ray with no arguments.
subprocess.check_output(["ray", "start", "--head"]).decode("ascii")
run_and_get_output(["ray", "start", "--head"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with a number of workers specified.
subprocess.check_output(["ray", "start", "--head", "--num-workers",
"20"])
run_and_get_output(["ray", "start", "--head", "--num-workers",
"20"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with a redis port specified.
subprocess.check_output(["ray", "start", "--head",
"--redis-port", "6379"])
run_and_get_output(["ray", "start", "--head",
"--redis-port", "6379"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with redis shard ports specified.
subprocess.check_output(["ray", "start", "--head",
"--redis-shard-ports", "6380,6381,6382"])
run_and_get_output(["ray", "start", "--head",
"--redis-shard-ports", "6380,6381,6382"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with a node IP address specified.
subprocess.check_output(["ray", "start", "--head",
"--node-ip-address", "127.0.0.1"])
run_and_get_output(["ray", "start", "--head",
"--node-ip-address", "127.0.0.1"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with an object manager port specified.
subprocess.check_output(["ray", "start", "--head",
"--object-manager-port", "12345"])
run_and_get_output(["ray", "start", "--head",
"--object-manager-port", "12345"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with the number of CPUs specified.
subprocess.check_output(["ray", "start", "--head",
"--num-cpus", "100"])
run_and_get_output(["ray", "start", "--head",
"--num-cpus", "100"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with the number of GPUs specified.
subprocess.check_output(["ray", "start", "--head",
"--num-gpus", "100"])
run_and_get_output(["ray", "start", "--head",
"--num-gpus", "100"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with the max redis clients specified.
subprocess.check_output(["ray", "start", "--head",
"--redis-max-clients", "100"])
run_and_get_output(["ray", "start", "--head",
"--redis-max-clients", "100"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with all arguments specified.
subprocess.check_output(["ray", "start", "--head",
"--num-workers", "20",
"--redis-port", "6379",
"--redis-shard-ports", "6380,6381,6382",
"--object-manager-port", "12345",
"--num-cpus", "100",
"--num-gpus", "0",
"--redis-max-clients", "100",
"--resources", "{\"Custom\": 1}"])
run_and_get_output(["ray", "start", "--head",
"--num-workers", "20",
"--redis-port", "6379",
"--redis-shard-ports", "6380,6381,6382",
"--object-manager-port", "12345",
"--num-cpus", "100",
"--num-gpus", "0",
"--redis-max-clients", "100",
"--resources", "{\"Custom\": 1}"])
subprocess.Popen(["ray", "stop"]).wait()

# Test starting Ray with invalid arguments.
with self.assertRaises(Exception):
subprocess.check_output(["ray", "start", "--head",
"--redis-address", "127.0.0.1:6379"])
run_and_get_output(["ray", "start", "--head",
"--redis-address", "127.0.0.1:6379"])
subprocess.Popen(["ray", "stop"]).wait()

def testUsingHostnames(self):
# Start the Ray processes on this machine.
subprocess.check_output(
run_and_get_output(
["ray", "start", "--head",
"--node-ip-address=localhost",
"--redis-port=6379"]).decode("ascii")
"--redis-port=6379"])

ray.init(node_ip_address="localhost", redis_address="localhost:6379")

Expand Down
Loading

0 comments on commit a9acfab

Please sign in to comment.