[autoscaler/tune] Optional YAML Fields + Fix Pretty Printing for Tune
richardliaw authored Mar 5, 2018
1 parent 061e435 commit 162d063
Showing 12 changed files with 199 additions and 70 deletions.
18 changes: 11 additions & 7 deletions doc/source/autoscaling.rst
@@ -9,7 +9,7 @@ Quick start
First, install boto (``pip install boto3``) and configure your AWS credentials in ``~/.aws/credentials``,
as described in `the boto docs <http://boto3.readthedocs.io/en/latest/guide/configuration.html>`__.

Then you're ready to go. The provided `ray/python/ray/autoscaler/aws/example.yaml <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/aws/example.yaml>`__ cluster config file will create a small cluster with a m4.large head node (on-demand), and two m4.large `spot workers <https://aws.amazon.com/ec2/spot/>`__, configured to autoscale up to four m4.large workers.
Then you're ready to go. The provided `ray/python/ray/autoscaler/aws/example-full.yaml <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/aws/example-full.yaml>`__ cluster config file will create a small cluster with an m5.large head node (on-demand) configured to autoscale up to two m5.large `spot workers <https://aws.amazon.com/ec2/spot/>`__.

Try it out by running these commands from your personal computer. Once the cluster is started, you can then
SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(redis_address=ray.services.get_node_ip_address() + ":6379")``.
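For illustration only (not part of this diff), that last step as a minimal Python script run on the head node:

.. code-block:: python

    import ray
    import ray.services

    # Attach to the running cluster started by the autoscaler.
    ray.init(redis_address=ray.services.get_node_ip_address() + ":6379")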
@@ -18,14 +18,14 @@ SSH into the head node, ``source activate tensorflow_p36``, and then run Ray pro
# Create or update the cluster. When the command finishes, it will print
# out the command that can be used to SSH into the cluster head node.
$ ray create_or_update ray/python/ray/autoscaler/aws/example.yaml
$ ray create_or_update ray/python/ray/autoscaler/aws/example-full.yaml
# Reconfigure autoscaling behavior without interrupting running jobs
$ ray create_or_update ray/python/ray/autoscaler/aws/example.yaml \
$ ray create_or_update ray/python/ray/autoscaler/aws/example-full.yaml \
--max-workers=N --no-restart
# Teardown the cluster
$ ray teardown ray/python/ray/autoscaler/aws/example.yaml
$ ray teardown ray/python/ray/autoscaler/aws/example-full.yaml
To connect to applications running on the cluster (e.g. Jupyter notebook) from a web browser, you can forward the port to your local machine using SSH:

@@ -66,10 +66,14 @@ The Ray autoscaler also reports per-node status in the form of instance tags. In
Customizing cluster setup
-------------------------

You are encouraged to copy the example YAML file and modify it to your needs. This may include adding additional setup commands to install libraries or sync local data files. After you have customized the nodes, it is also a good idea to create a new machine image (AMI) and use that in the config file. This reduces worker setup time, improving the efficiency of auto-scaling.
You are encouraged to copy the example YAML file and modify it to your needs. This may include adding additional setup commands to install libraries or sync local data files.

.. note:: After you have customized the nodes, it is also a good idea to create a new machine image (AMI) and use that in the config file. This reduces worker setup time, improving the efficiency of auto-scaling.

The setup commands you use should ideally be *idempotent*, that is, they can be run more than once. This allows Ray to update nodes after they have been created. You can usually make commands idempotent with small modifications, e.g. ``git clone foo`` can be rewritten as ``test -e foo || git clone foo``, which first checks whether the repo has already been cloned.

Most of the example YAML file is optional. Here is a `reference minimal YAML file <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/aws/example-minimal.yaml>`__, and you can find the defaults for `optional fields in this YAML file <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/aws/example-full.yaml>`__.

Syncing git branches
--------------------

@@ -94,7 +98,7 @@ This tells ``ray create_or_update`` to sync the current git branch SHA from your
Common cluster configurations
-----------------------------

The ``example.yaml`` configuration is enough to get started with Ray, but for more compute intensive workloads you will want to change the instance types to e.g. use GPU or larger compute instance by editing the yaml file. Here are a few common configurations:
The ``example-full.yaml`` configuration is enough to get started with Ray, but for more compute-intensive workloads you will want to change the instance types, e.g. to use GPU or larger compute instances, by editing the YAML file. Here are a few common configurations:

**GPU single node**: use Ray on a single large GPU instance.

@@ -105,7 +109,7 @@ The ``example.yaml`` configuration is enough to get started with Ray, but for mo
InstanceType: p2.8xlarge
**Docker**: Specify docker image. This executes all commands on all nodes in the docker container,
and opens all the necessary ports to support the Ray cluster.
and opens all the necessary ports to support the Ray cluster. This currently does not have GPU support.

.. code-block:: yaml
109 changes: 73 additions & 36 deletions python/ray/autoscaler/autoscaler.py
@@ -18,77 +18,85 @@
from ray.ray_constants import AUTOSCALER_MAX_NUM_FAILURES, \
AUTOSCALER_MAX_CONCURRENT_LAUNCHES, AUTOSCALER_UPDATE_INTERVAL_S, \
AUTOSCALER_HEARTBEAT_TIMEOUT_S
from ray.autoscaler.node_provider import get_node_provider
from ray.autoscaler.node_provider import get_node_provider, \
get_default_config
from ray.autoscaler.updater import NodeUpdaterProcess
from ray.autoscaler.docker import dockerize_if_needed
from ray.autoscaler.tags import TAG_RAY_LAUNCH_CONFIG, \
TAG_RAY_RUNTIME_CONFIG, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_TYPE, TAG_NAME
import ray.services as services

REQUIRED, OPTIONAL = True, False

# For (a, b), if a is a dictionary object, then
# no extra fields can be introduced.
CLUSTER_CONFIG_SCHEMA = {
# An unique identifier for the head node and workers of this cluster.
"cluster_name": str,
"cluster_name": (str, REQUIRED),

# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
"min_workers": int,
"min_workers": (int, OPTIONAL),

# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers.
"max_workers": int,
"max_workers": (int, REQUIRED),

# The autoscaler will scale up the cluster to this target fraction of
# resources usage. For example, if a cluster of 8 nodes is 100% busy
# and target_utilization was 0.8, it would resize the cluster to 10.
"target_utilization_fraction": float,
"target_utilization_fraction": (float, OPTIONAL),

# If a node is idle for this many minutes, it will be removed.
"idle_timeout_minutes": int,
"idle_timeout_minutes": (int, OPTIONAL),

# Cloud-provider specific configuration.
"provider": {
"type": str, # e.g. aws
"region": str, # e.g. us-east-1
"availability_zone": str, # e.g. us-east-1a
},
"provider": ({
"type": (str, REQUIRED), # e.g. aws
"region": (str, REQUIRED), # e.g. us-east-1
"availability_zone": (str, REQUIRED), # e.g. us-east-1a
}, REQUIRED),

# How Ray will authenticate with newly launched nodes.
"auth": dict,
"auth": ({
"ssh_user": (str, REQUIRED), # e.g. ubuntu
"ssh_private_key": (str, OPTIONAL),
}, REQUIRED),

# Docker configuration. If this is specified, all setup and start commands
# will be executed in the container.
"docker": {
"image": str, # e.g. tensorflow/tensorflow:1.5.0-py3
"container_name": str
},
"docker": ({
"image": (str, OPTIONAL), # e.g. tensorflow/tensorflow:1.5.0-py3
"container_name": (str, OPTIONAL), # e.g., ray_docker
}, OPTIONAL),

# Provider-specific config for the head node, e.g. instance type.
"head_node": dict,
"head_node": (dict, OPTIONAL),

# Provider-specific config for worker nodes. e.g. instance type.
"worker_nodes": dict,
"worker_nodes": (dict, OPTIONAL),

# Map of remote paths to local paths, e.g. {"/tmp/data": "/my/local/data"}
"file_mounts": dict,
"file_mounts": (dict, OPTIONAL),

# List of common shell commands to run to initialize nodes.
"setup_commands": list,
"setup_commands": (list, OPTIONAL),

# Commands that will be run on the head node after common setup.
"head_setup_commands": list,
"head_setup_commands": (list, OPTIONAL),

# Commands that will be run on worker nodes after common setup.
"worker_setup_commands": list,
"worker_setup_commands": (list, OPTIONAL),

# Command to start ray on the head node. You shouldn't need to modify this.
"head_start_ray_commands": list,
"head_start_ray_commands": (list, OPTIONAL),

# Command to start ray on worker nodes. You shouldn't need to modify this.
"worker_start_ray_commands": list,
"worker_start_ray_commands": (list, OPTIONAL),

# Whether to avoid restarting the cluster during updates. This field is
# controlled by the ray --no-restart flag and cannot be set by the user.
"no_restart": None,
"no_restart": (None, OPTIONAL),
}
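For illustration (not part of this diff), a config dict that provides only the REQUIRED entries of this schema; it mirrors the new example-minimal.yaml added below, and every OPTIONAL field is left to the provider defaults:

minimal_config_example = {
    "cluster_name": "minimal",
    "max_workers": 1,
    "provider": {
        "type": "aws",
        "region": "us-west-2",
        "availability_zone": "us-west-2a",
    },
    "auth": {
        "ssh_user": "ubuntu",
    },
}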


@@ -474,28 +482,57 @@ def typename(v):
return type(v).__name__


def validate_config(config, schema=CLUSTER_CONFIG_SCHEMA):
def check_required(config, schema):
# Check required schema entries
if type(config) is not dict:
raise ValueError("Config is not a dictionary")
for k, v in schema.items():

for k, (v, kreq) in schema.items():
if v is None:
continue # None means we don't validate the field
if k not in config:
if kreq is REQUIRED:
if k not in config:
type_str = typename(v)
raise ValueError(
"Missing required config key `{}` of type {}".format(
k, type_str))
if not isinstance(v, type):
check_required(config[k], v)


def check_extraneous(config, schema):
"""Make sure all items of config are in schema"""
if type(config) is not dict:
raise ValueError("Config is not a dictionary")
for k in config:
if k not in schema:
raise ValueError(
"Missing required config key `{}` of type {}".format(
k, typename(v)))
"Unexpected config key `{}` not in {}".format(
k, list(schema.keys())))
v, kreq = schema[k]
if isinstance(v, type):
if not isinstance(config[k], v):
raise ValueError(
"Config key `{}` has wrong type {}, expected {}".format(
k, type(config[k]).__name__, v.__name__))
else:
validate_config(config[k], schema[k])
for k in config.keys():
if k not in schema:
raise ValueError(
"Unexpected config key `{}` not in {}".format(
k, schema.keys()))
check_extraneous(config[k], v)


def validate_config(config, schema=CLUSTER_CONFIG_SCHEMA):
"""Required Dicts indicate that no extra fields can be introduced."""
if type(config) is not dict:
raise ValueError("Config is not a dictionary")

check_required(config, schema)
check_extraneous(config, schema)
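For illustration (not part of this diff), how the new two-pass validation behaves on a config with a misspelled key; the dict below is an assumption that mirrors example-minimal.yaml:

config = {
    "cluster_name": "minimal",
    "max_workers": 1,
    "provider": {"type": "aws", "region": "us-west-2",
                 "availability_zone": "us-west-2a"},
    "auth": {"ssh_user": "ubuntu"},
    "max_wrokers": 2,  # typo: not a key of CLUSTER_CONFIG_SCHEMA
}
# check_required passes (all REQUIRED keys are present), but check_extraneous
# raises ValueError: Unexpected config key `max_wrokers` not in [...]
validate_config(config)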


def fillout_defaults(config):
defaults = get_default_config(config["provider"])
defaults.update(config)
dockerize_if_needed(defaults)
return defaults
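A sketch (not part of this diff) of the intended call sequence, matching the updated commands.py further down; the config file path is hypothetical:

import yaml

with open("my-cluster.yaml") as f:
    user_config = yaml.load(f)
validate_config(user_config)                 # raises ValueError on bad configs
user_config = fillout_defaults(user_config)
# fillout_defaults does a shallow, top-level merge: keys present in the user
# config override the provider defaults (example-full.yaml for AWS), and
# dockerize_if_needed then adjusts the commands when a docker image is given.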


def with_head_node_ip(cmds):
python/ray/autoscaler/aws/example-full.yaml
@@ -3,7 +3,7 @@ cluster_name: default

# The minimum number of workers nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 1
min_workers: 0

# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers.
@@ -85,7 +85,10 @@ setup_commands:
# Note: if you're developing Ray, you probably want to create an AMI that
# has your Ray repo pre-cloned. Then, you can replace the pip installs
# below with a git checkout <your_sha> (and possibly a recompile).
- source activate tensorflow_p36 && most_recent() { echo pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/$(aws s3 ls s3://ray-wheels --recursive | grep $1 | sort -r | head -n 1 | awk '{print $4}'); } && $( most_recent "cp36-cp36m-manylinux1" ) || $( most_recent "cp35-cp35m-manylinux1" )
- echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc
# - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.3.1-cp27-cp27mu-manylinux1_x86_64.whl
# - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.3.1-cp35-cp35m-manylinux1_x86_64.whl
- pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.3.1-cp36-cp36m-manylinux1_x86_64.whl
# Consider uncommenting these if you also want to run apt-get commands during setup
# - sudo pkill -9 apt-get || true
# - sudo pkill -9 dpkg || true
17 changes: 17 additions & 0 deletions python/ray/autoscaler/aws/example-minimal.yaml
@@ -0,0 +1,17 @@
# An unique identifier for the head node and workers of this cluster.
cluster_name: minimal

# The maximum number of workers nodes to launch in addition to the head
# node. This takes precedence over min_workers. min_workers default to 0.
max_workers: 1

# Cloud-provider specific configuration.
provider:
type: aws
region: us-west-2
availability_zone: us-west-2a

# How Ray will authenticate with newly launched nodes.
auth:
ssh_user: ubuntu

7 changes: 3 additions & 4 deletions python/ray/autoscaler/commands.py
@@ -16,8 +16,7 @@
from pipes import quote

from ray.autoscaler.autoscaler import validate_config, hash_runtime_conf, \
hash_launch_conf
from ray.autoscaler.docker import dockerize_if_needed
hash_launch_conf, fillout_defaults
from ray.autoscaler.node_provider import get_node_provider, NODE_PROVIDERS
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_LAUNCH_CONFIG, \
TAG_NAME
@@ -31,7 +30,7 @@ def create_or_update_cluster(

config = yaml.load(open(config_file).read())
validate_config(config)
dockerize_if_needed(config)
config = fillout_defaults(config)

if override_min_workers is not None:
config["min_workers"] = override_min_workers
@@ -53,7 +52,7 @@ def teardown_cluster(config_file, yes):

config = yaml.load(open(config_file).read())
validate_config(config)
dockerize_if_needed(config)
config = fillout_defaults(config)

confirm("This will destroy your cluster", yes)

2 changes: 2 additions & 0 deletions python/ray/autoscaler/docker.py
@@ -13,6 +13,8 @@ def dockerize_if_needed(config):
docker_image = config["docker"].get("image")
cname = config["docker"].get("container_name")
if not docker_image:
if cname:
print("Container name given but no Docker image - continuing...")
return config
else:
assert cname, "Must provide container name!"
30 changes: 30 additions & 0 deletions python/ray/autoscaler/node_provider.py
@@ -2,13 +2,22 @@
from __future__ import division
from __future__ import print_function

import os
import yaml


def import_aws():
from ray.autoscaler.aws.config import bootstrap_aws
from ray.autoscaler.aws.node_provider import AWSNodeProvider
return bootstrap_aws, AWSNodeProvider


def load_aws_config():
import ray.autoscaler.aws as ray_aws
return os.path.join(os.path.dirname(
ray_aws.__file__), "example-full.yaml")


NODE_PROVIDERS = {
"aws": import_aws,
"gce": None, # TODO: support more node providers
@@ -18,6 +27,15 @@ def import_aws():
"local_cluster": None,
}

DEFAULT_CONFIGS = {
"aws": load_aws_config,
"gce": None, # TODO: support more node providers
"azure": None,
"kubernetes": None,
"docker": None,
"local_cluster": None,
}


def get_node_provider(provider_config, cluster_name):
importer = NODE_PROVIDERS.get(provider_config["type"])
@@ -28,6 +46,18 @@ def get_node_provider(provider_config, cluster_name):
return provider_cls(provider_config, cluster_name)


def get_default_config(provider_config):
load_config = DEFAULT_CONFIGS.get(provider_config["type"])
if load_config is None:
raise NotImplementedError(
"Unsupported node provider: {}".format(provider_config["type"]))
path_to_default = load_config()
with open(path_to_default) as f:
defaults = yaml.load(f)

return defaults
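For illustration (not part of this diff), fetching the AWS defaults returns the parsed example-full.yaml bundled with the package:

aws_defaults = get_default_config({"type": "aws"})
print(aws_defaults["cluster_name"])  # "default"
print(aws_defaults["min_workers"])   # 0, per the updated example-full.yaml above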


class NodeProvider(object):
"""Interface for getting and returning nodes from a Cloud.
12 changes: 12 additions & 0 deletions python/ray/tune/logger.py
@@ -6,6 +6,7 @@
import json
import numpy as np
import os
import yaml

from ray.tune.result import TrainingResult
from ray.tune.log_sync import get_syncer
@@ -176,3 +177,14 @@ def default(self, value):
return float(value)
if np.issubdtype(value, int):
return int(value)


def pretty_print(result):
result = result._replace(config=None) # drop config from pretty print
out = {}
for k, v in result._asdict().items():
if v is not None:
out[k] = v

cleaned = json.dumps(out, cls=_CustomEncoder)
return yaml.dump(json.loads(cleaned), default_flow_style=False)
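For illustration (not part of this diff), pretty_print applied to a namedtuple; the FakeResult type and its fields are assumptions — any TrainingResult-style namedtuple with a config field works, since pretty_print only relies on _replace() and _asdict():

import collections

FakeResult = collections.namedtuple(
    "FakeResult", ["config", "training_iteration", "episode_reward_mean"])
result = FakeResult(config={"lr": 0.1}, training_iteration=3,
                    episode_reward_mean=42.0)
print(pretty_print(result))
# episode_reward_mean: 42.0
# training_iteration: 3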
(The remaining changed files are not shown here.)
