[TEST] Additional data processing nightly test (ray-project#16078)
* in progress

* in progress

* almost done

* Lint

* almost done

* All tests are available now

* Make the test a little more stressful

* Modify parameters to make tests a little more stressful
rkooo567 authored Jun 10, 2021
1 parent d390344 commit c8a5d7b
Showing 12 changed files with 146 additions and 82 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -190,3 +190,7 @@ project-id
# gitpod cache related
.pip-cache/
.bazel-cache/

# release test related
.anyscale.yaml
test_state.json
19 changes: 10 additions & 9 deletions release/data_processing_tests/dask-on-ray-test.sh
@@ -1,7 +1,8 @@
#!/usr/bin/env bash
#!/bin/bash

# NOTE: Only working for Python 3.7 on MacOS.
# NOTE: Please modify the wheel URL.
DASK_VERSION=("2021.4.0" "2021.3.1" "2021.2.0" "2021.1.1" "2020.12.0")
DASK_VERSION=("2021.5.0" "2021.4.1" "2021.4.0" "2021.3.1" "2021.2.0" "2021.1.1" "2020.12.0")

unset RAY_ADDRESS

@@ -12,21 +13,21 @@ echo "Please run vi dask-on-ray-test.sh and modify the ray wheel properly."
echo "Also make sure that you are in the right branch on your repo."
echo "For example, if you are using releases/1.3.0 wheel, you should checkout to that repo."
echo "Example: git checkout -b releases/1.3.0 upstream/releases/1.3.0"
exit 1
# pip uninstall -y ray
# pip install -U "ray[full] @ https://s3-us-west-2.amazonaws.com/ray-wheels/releases/1.3.0/cb3661e547662f309a0cc55c5495b3adb779a309/ray-1.3.0-cp37-cp37m-macosx_10_13_intel.whl"
#exit 1
pip uninstall -y ray
pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/releases/1.4.0/0e95428a5975b774d266893102c39c8e137da5d8/ray-1.4.0-cp38-cp38-manylinux2014_x86_64.whl

for dask_version in "${DASK_VERSION[@]}"
do # The quotes are necessary here
echo "=================================================="
echo "Downloading Dask of version '${dask_version}'"
pip uninstall -y dask
pip install -U dask=="$dask_version"
pip install -U "dask[complete]==${dask_version}"
printf "==================================================\n\n\n"
echo "=================================================="
echo "Running tests against dask version ${dask_version}"
pytest -v ../../python/ray/tests/test_dask_scheduler.py
pytest -v ../../python/ray/tests/test_dask_callback.py
pytest -v ../../python/ray/tests/test_dask_optimization.py
python -m pytest -v ../../python/ray/tests/test_dask_scheduler.py
python -m pytest -v ../../python/ray/tests/test_dask_callback.py
python -m pytest -v ../../python/ray/tests/test_dask_optimization.py
printf "==================================================\n\n\n"
done
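For reference, here is a minimal Python smoke test of the Dask-on-Ray integration that the script above exercises in a loop; the array shape and the local ray.init() call are illustrative and not part of the release script.

import dask.array as da
import ray
from ray.util.dask import ray_dask_get

# Start a local Ray instance (the release script connects to an existing cluster instead).
ray.init()

# Build a small Dask graph and execute it with Ray as the scheduler.
arr = da.random.random((1000, 1000), chunks=(250, 250))
result = arr.sum().compute(scheduler=ray_dask_get)
print("Sum over 1,000,000 random values:", result)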
11 changes: 5 additions & 6 deletions release/data_processing_tests/dask_on_ray.yaml
@@ -23,7 +23,7 @@ docker:
container_name: ""
# If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
# if no cached version is present.
pull_before_run: True
pull_before_run: False
run_options: [] # Extra options to pass into "docker run"

# Example of running a GPU head with CPU workers
@@ -43,7 +43,7 @@ provider:
# Availability zone(s), comma-separated, that nodes may be launched in.
# Nodes are currently spread between zones by a round-robin approach,
# however this implementation detail should not be relied upon.
availability_zone: us-west-2a,us-west-2b
availability_zone: us-west-2a
# Whether to allow node reuse. If set to False, nodes will be terminated
# instead of stopped.
cache_stopped_nodes: True # If not present, the default is True.
@@ -62,13 +62,12 @@ available_node_types:
InstanceType: r5n.16xlarge
# For AWS instances, autoscaler will automatically add the available
# CPUs/GPUs/accelerator_type ({"CPU": 4} for m4.xlarge) in "resources".
# resources: {"CPU": 4}
resources: {"CPU": 0}
worker_node_t:
node_config:
InstanceType: r5.4xlarge
# Autoscaler will auto fill the CPU resources below.
resources: {"CPU": 96}
resources: {"CPU": 16}
min_workers: 250
max_workers: 250

@@ -140,9 +139,9 @@ setup_commands:
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
- ray stop
- ray start --num-cpus=0 --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --system-config='{"asio_event_loop_stats_collection_enabled":true,"scheduler_loadbalance_spillback":true}'
- ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --system-config='{"asio_event_loop_stats_collection_enabled":true,"locality_aware_leasing_enabled":true,"scheduler_hybrid_scheduling":false,"scheduler_loadbalance_spillback":true,"overcommit_plasma_memory":false}'

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
- ray stop
- ray start --address=$RAY_HEAD_IP:6379 --num-cpus=96 --object-manager-port=8076
- ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
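Setting resources: {"CPU": 0} on the head node keeps Dask-on-Ray tasks off the head, so it can serve the GCS and the driver without contention. A quick sanity check once the cluster is up might look like the sketch below (an illustration, not part of this commit); it lists the CPU count each node advertises.

import ray

# Attach to the cluster started by the head/worker start commands above.
ray.init(address="auto")

# The head node should advertise 0 CPUs; workers report their full count.
for node in ray.nodes():
    print(node["NodeManagerAddress"], node["Resources"].get("CPU", 0))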
@@ -37,12 +37,13 @@
"""

MINUTES_IN_A_MONTH = 43800
# MINUTES_IN_A_MONTH = 43800
MINUTES_IN_A_MONTH = 10950
NUM_MINS_PER_OUTPUT_FILE = 30
SAMPLING_RATE = 2000000
SECONDS_IN_A_MIN = 60
SAMPLING_RATE = 200000
SECONDS_IN_A_MIN = 20
INPUT_SHAPE = (3, SAMPLING_RATE * SECONDS_IN_A_MIN)
PEAK_MEMORY_CONSUMPTION_IN_GB = 60
PEAK_MEMORY_CONSUMPTION_IN_GB = 20

logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
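The smaller constants shrink the synthetic signal so this variant of the test fits in far less memory. A rough back-of-the-envelope check with the new values (assuming 4-byte float32 samples, as the comment in large_scale_test.py below also does):

# Per simulated minute: 3 channels x SAMPLING_RATE samples/s x SECONDS_IN_A_MIN s,
# 4 bytes per float32 sample.
SAMPLING_RATE = 200_000
SECONDS_IN_A_MIN = 20
bytes_per_minute = 3 * SAMPLING_RATE * SECONDS_IN_A_MIN * 4
print(bytes_per_minute / 1e9)  # ~0.048 GB per minute, down from ~1.44 GB before this change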
@@ -0,0 +1,14 @@
base_image: "anyscale/ray-ml:pinned-nightly-py37"
env_vars: {"RAY_scheduler_hybrid_threshold": "0"}
debian_packages: []

python:
pip_packages: ["dask[complete]", tqdm, scipy, xarray, zarr, boto, s3fs, pyarrow]
conda_packages: []

post_build_cmds:
# - pip install fastparquet
- pip3 uninstall -y ray
- pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }}
- pip3 install -U ray[default]
- echo {{env["DATESTAMP"]}}
@@ -1,16 +1,15 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

max_workers: 20

# TODO(ekl/sang) switch to i2.8xl and mount the NVMe disks
head_node_type:
name: head_node
instance_type: i3.8xlarge
resources: {"object_store_memory": 53687091200}

worker_node_types:
- name: worker_node
instance_type: i3.8xlarge
min_workers: 20
max_workers: 20
min_workers: 4
max_workers: 4
use_spot: false
resources: {"object_store_memory": 53687091200}
81 changes: 45 additions & 36 deletions release/nightly_tests/dask_on_ray/large_scale_test.py
@@ -15,6 +15,7 @@
import xarray
from ray.util.dask import ray_dask_get
import math
import json
"""
We simulate a real-life use case where we process a month of
time-series data, using Dask/Xarray on a Ray cluster.
@@ -27,22 +28,22 @@
Perform decimation to reduce data size.
(3) Segment the Xarray from (2) into 30-minute Xarrays;
at this point, we have 43800 / 30 = 1460 Xarrays.
at this point, we have 4380 / 30 = 146 Xarrays.
(4) Trigger save to disk for each of the 30-minute Xarrays.
This triggers Dask computations; there will be 1460 graphs.
This triggers Dask computations; there will be 146 graphs.
Since 1460 graphs is too much to process at once,
we determine the batch_size based on script parameters.
(e.g. if batch_size is 100, we'll have 15 batches).
"""

MINUTES_IN_A_MONTH = 43800
MINUTES_IN_A_MONTH = 500
NUM_MINS_PER_OUTPUT_FILE = 30
SAMPLING_RATE = 2000000
SAMPLING_RATE = 200000
SECONDS_IN_A_MIN = 60
INPUT_SHAPE = (3, SAMPLING_RATE * SECONDS_IN_A_MIN)
PEAK_MEMORY_CONSUMPTION_IN_GB = 60
PEAK_MEMORY_CONSUMPTION_IN_GB = 6

logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
@@ -141,7 +142,7 @@ def lazy_load_xarray_one_month(test_spec: TestSpec) -> xarray.Dataset:
def load_array_one_minute(test_spec: TestSpec) -> np.ndarray:
"""
Load an array representing 1 minute of data. Each load consumes
~1.44GB of memory (3 * 2000000 * 60 * 4 (bytes in a float)) = ~1.44
~0.144GB of memory (3 * 200000 * 60 * 4 (bytes in a float)) = ~0.14GB
In real life, this is loaded from cloud storage or disk.
"""
@@ -420,36 +421,44 @@ def parse_script_args():
def main():
args, unknown = parse_script_args()
logging.info("Received arguments: {}".format(args))

# Create test spec
test_spec = TestSpec(
num_workers=args.num_workers,
worker_obj_store_size_in_gb=args.worker_obj_store_size_in_gb,
error_rate=args.error_rate,
trigger_object_spill=args.trigger_object_spill,
)
logging.info("Created test spec: {}".format(test_spec))

# Create the data save path if it doesn't exist.
data_save_path = args.data_save_path
if not os.path.exists(data_save_path):
os.makedirs(data_save_path, mode=0o777, exist_ok=True)
os.chmod(data_save_path, mode=0o777)

# Lazily construct Xarrays
xarray_filename_pairs = lazy_create_xarray_filename_pairs(test_spec)

# Connect to the Ray cluster
ray.init(address="auto")

# Save all the Xarrays to disk; this will trigger Dask computations on Ray.
logging.info("Saving {} xarrays..".format(len(xarray_filename_pairs)))
SaveRoutines.save_all_xarrays(
xarray_filename_pairs=xarray_filename_pairs,
dirpath=data_save_path,
batch_size=test_spec.batch_size,
ray_scheduler=ray_dask_get,
)
success = 1
try:
# Create test spec
test_spec = TestSpec(
num_workers=args.num_workers,
worker_obj_store_size_in_gb=args.worker_obj_store_size_in_gb,
error_rate=args.error_rate,
trigger_object_spill=args.trigger_object_spill,
)
logging.info("Created test spec: {}".format(test_spec))

# Create the data save path if it doesn't exist.
data_save_path = args.data_save_path
if not os.path.exists(data_save_path):
os.makedirs(data_save_path, mode=0o777, exist_ok=True)
os.chmod(data_save_path, mode=0o777)

# Lazily construct Xarrays
xarray_filename_pairs = lazy_create_xarray_filename_pairs(test_spec)

# Connect to the Ray cluster
ray.init(address="auto")

# Save all the Xarrays to disk; this will trigger
# Dask computations on Ray.
logging.info("Saving {} xarrays..".format(len(xarray_filename_pairs)))
SaveRoutines.save_all_xarrays(
xarray_filename_pairs=xarray_filename_pairs,
dirpath=data_save_path,
batch_size=test_spec.batch_size,
ray_scheduler=ray_dask_get,
)
print(ray.internal.internal_api.memory_summary(stats_only=True))
except Exception as e:
logging.exception(e)
success = 0
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps({"success": success}))


if __name__ == "__main__":
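The change to main() wraps the whole run in try/except and records the outcome in the file named by the TEST_OUTPUT_JSON environment variable, which is how the release tooling decides pass/fail. A condensed sketch of that pattern (the helper name is illustrative):

import json
import logging
import os


def run_release_test(workload) -> None:
    """Run workload() and report success or failure via TEST_OUTPUT_JSON."""
    success = 1
    try:
        workload()
    except Exception as e:
        # Any exception marks the nightly run as failed, but we still write the result file.
        logging.exception(e)
        success = 0
    with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
        f.write(json.dumps({"success": success}))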
54 changes: 36 additions & 18 deletions release/nightly_tests/nightly_tests.yaml
@@ -154,21 +154,39 @@
compute_template: shuffle/shuffle_compute_smoke.yaml # Does not exist yet

# Test multi nodes 100GB shuffle with a large number of partitions.
# TODO(sang): Not working due to a bug https://github.com/ray-project/ray/issues/16025.
# - name: shuffle_100gb_large_partition
# owner:
# mail: "[email protected]"
# slack: "@proj-data-processing"

# cluster:
# app_config: shuffle/shuffle_app_config.yaml
# compute_template: shuffle/shuffle_compute_multi.yaml

# run:
# timeout: 3000
# prepare: python wait_cluster.py 4 600
# script: python shuffle/shuffle_test.py --num-partitions=1000 --partition-size=100e6

# smoke_test:
# cluster:
# compute_template: shuffle/shuffle_compute_smoke.yaml # Does not exist yet
- name: shuffle_1tb_large_partition
owner:
mail: "[email protected]"
slack: "@proj-data-processing"

cluster:
app_config: shuffle/shuffle_app_config.yaml
compute_template: shuffle/shuffle_compute_large_scale.yaml

run:
timeout: 3000
prepare: python wait_cluster.py 20 600
script: python shuffle/shuffle_test.py --num-partitions=1000 --partition-size=1e9

smoke_test:
cluster:
compute_template: shuffle/shuffle_compute_smoke.yaml # Does not exist yet

# Test large scale dask on ray test without spilling.
- name: dask_on_ray_large_scale_test_no_spilling
owner:
mail: "[email protected]"
slack: "@proj-data-processing"

cluster:
app_config: dask_on_ray/large_scale_dask_on_ray_app_config.yaml
compute_template: dask_on_ray/large_scale_dask_on_ray_compute_template.yaml

run:
timeout: 7200
prepare: python wait_cluster.py 5 600
script: python dask_on_ray/large_scale_test.py --num_workers 16 --worker_obj_store_size_in_gb 20 --error_rate 0 --data_save_path /tmp/ray

smoke_test:
cluster:
compute_template: shuffle/shuffle_compute_smoke.yaml # Does not exist yet
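Each new entry uses a prepare step, python wait_cluster.py <num_nodes> <timeout>, that blocks until the expected number of nodes has joined before the workload starts. A minimal sketch of what such a helper might do is below; this is an assumption based on how wait_cluster.py is invoked here, not its actual source.

import sys
import time

import ray


def wait_for_nodes(expected: int, timeout_s: float) -> None:
    """Block until `expected` nodes are alive, or exit non-zero on timeout."""
    ray.init(address="auto")
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        alive = sum(1 for node in ray.nodes() if node["Alive"])
        if alive >= expected:
            print(f"{alive}/{expected} nodes are ready.")
            return
        time.sleep(5)
    sys.exit(f"Timed out waiting for {expected} nodes.")


if __name__ == "__main__":
    wait_for_nodes(int(sys.argv[1]), float(sys.argv[2]))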
15 changes: 15 additions & 0 deletions release/nightly_tests/shuffle/shuffle_compute_large_scale.yaml
@@ -0,0 +1,15 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

head_node_type:
name: head_node
instance_type: i3.4xlarge
resources: {"object_store_memory": 21474836480}

worker_node_types:
- name: worker_node
instance_type: i3.4xlarge
min_workers: 19
max_workers: 19
use_spot: false
resources: {"object_store_memory": 21474836480}
3 changes: 2 additions & 1 deletion release/nightly_tests/shuffle/shuffle_compute_multi.yaml
@@ -3,14 +3,15 @@ region: us-west-2

max_workers: 3

# TODO(ekl/sang) switch to i2.8xl and mount the NVMe disks
head_node_type:
name: head_node
instance_type: i3.4xlarge
resources: {"object_store_memory": 21474836480}

worker_node_types:
- name: worker_node
instance_type: i3.4xlarge
min_workers: 3
max_workers: 3
use_spot: false
resources: {"object_store_memory": 21474836480}
6 changes: 3 additions & 3 deletions release/nightly_tests/shuffle/shuffle_compute_single.yaml
@@ -3,13 +3,13 @@ region: us-west-2

max_workers: 0

# TODO(ekl/sang) switch to i2.8xl and mount the NVMe disks
head_node_type:
name: head_node2
name: head_node
instance_type: i3.4xlarge
resources: {"object_store_memory": 21474836480}

worker_node_types:
- name: worker_node2
- name: worker_node
instance_type: i3.4xlarge
min_workers: 0
max_workers: 0
3 changes: 3 additions & 0 deletions release/nightly_tests/shuffle/shuffle_test.py
@@ -19,6 +19,7 @@

start = time.time()
success = 1

commands = [
"python", "-m", "ray.experimental.shuffle", "--ray-address={}".format(
os.environ["RAY_ADDRESS"]),
@@ -35,6 +36,8 @@
success = 0
delta = time.time() - start

# Report the running time as 0 if the test fails so that
# it is easy to spot on the graph.
if not success:
delta = 0
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
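The new comment documents the reporting convention: if the shuffle subprocess fails, the elapsed time is written as 0 so the regression shows up as an obvious dip on the nightly graph. A trimmed sketch of that flow (the flag values and the output key name are illustrative):

import json
import os
import subprocess
import time

start = time.time()
success = 1
try:
    # Run the shuffle workload as a subprocess against the running cluster.
    subprocess.check_call([
        "python", "-m", "ray.experimental.shuffle",
        "--ray-address={}".format(os.environ["RAY_ADDRESS"]),
        "--num-partitions=1000", "--partition-size=1e9",
    ])
except subprocess.CalledProcessError:
    success = 0

# Report 0 seconds on failure so it is easy to spot on the graph.
delta = time.time() - start if success else 0
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
    f.write(json.dumps({"shuffle_time": delta, "success": success}))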
