Skip to content

Commit

Permalink
[train] enable new persistence mode for core and serve tests (ray-pro…
Browse files Browse the repository at this point in the history
…ject#38938)

Signed-off-by: Matthew Deng <[email protected]>
  • Loading branch information
matthewdeng authored Aug 26, 2023
1 parent cd3d7b6 commit 4dac931
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 27 deletions.
3 changes: 0 additions & 3 deletions .buildkite/pipeline.build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
--test_env=DOCKER_CERT_PATH=/certs/client
--test_env=DOCKER_TLS_CERTDIR=/certs
--test_env=RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING=0
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
$(cat test_shard.txt)

- label: ":serverless: Serve Tests (streaming and routing FFs off)"
Expand Down Expand Up @@ -130,7 +129,6 @@
--test_env=DOCKER_TLS_CERTDIR=/certs
--test_env=RAY_SERVE_ENABLE_NEW_ROUTING=0
--test_env=RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING=0
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
$(cat test_shard.txt)

- label: ":python: Minimal install Python {{matrix}}"
Expand Down Expand Up @@ -213,7 +211,6 @@
--test_env=CONDA_SHLVL
--test_env=CONDA_PREFIX
--test_env=CONDA_DEFAULT_ENV
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
python/ray/tests/...

- label: ":book: Doctest (CPU)"
Expand Down
1 change: 0 additions & 1 deletion .buildkite/pipeline.build_py37.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,4 @@
--test_env=DOCKER_TLS_VERIFY=1
--test_env=DOCKER_CERT_PATH=/certs/client
--test_env=DOCKER_TLS_CERTDIR=/certs
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
$(cat test_shard.txt)
1 change: 0 additions & 1 deletion .buildkite/pipeline.build_redis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
- DL=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- ./ci/ci.sh test_large --test_env=TEST_EXTERNAL_REDIS=1
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0

- label: ":redis: (External Redis) (Medium A-J)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
Expand Down
1 change: 0 additions & 1 deletion .buildkite/pipeline.gpu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
- ./ci/env/install-dependencies.sh
- pip install -Ur ./python/requirements/ml/dl-gpu-requirements.txt
- bazel test --config=ci $(./ci/run/bazel_export_options) --test_tag_filters=gpu
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
python/ray/serve/...

# Todo: enable once tests pass
Expand Down
2 changes: 0 additions & 2 deletions ci/ray_ci/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ def _run_tests_in_docker(test_targets: List[str], team: str) -> subprocess.Popen
)
commands.append(
"bazel test --config=ci "
# TODO(matthewdeng): Remove this env var as part of #38570.
"--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0 "
"$(./ci/run/bazel_export_options) "
f"{' '.join(test_targets)}",
)
Expand Down
41 changes: 28 additions & 13 deletions python/ray/serve/tests/test_air_integrations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import tempfile
from typing import List, Optional
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
Expand All @@ -10,6 +10,7 @@
from fastapi import Depends, FastAPI

import ray
import ray.cloudpickle as ray_pickle
from ray import serve
from ray.train import Checkpoint
from ray.serve.air_integrations import _BatchingManager
Expand Down Expand Up @@ -128,6 +129,22 @@ def test_unpack_dataframe(self, batched_df, expected):
)


def create_dict_checkpoint(
data: Dict[str, Any], directory: Optional[str] = None
) -> Checkpoint:
if not directory:
directory = tempfile.mkdtemp()
with open(os.path.join(directory, "data.pkl"), "wb") as f:
ray_pickle.dump(data, f)
return Checkpoint.from_directory(directory)


def load_dict_checkpoint(checkpoint: Checkpoint) -> Dict[str, Any]:
with checkpoint.as_directory() as checkpoint_dir:
with open(os.path.join(checkpoint_dir, "data.pkl"), "rb") as f:
return ray_pickle.load(f)


class AdderPredictor(Predictor):
def __init__(self, increment: int, do_double: bool) -> None:
self.increment = increment
Expand All @@ -137,7 +154,7 @@ def __init__(self, increment: int, do_double: bool) -> None:
def from_checkpoint(
cls, checkpoint: Checkpoint, do_double: bool = False
) -> "AdderPredictor":
return cls(checkpoint.to_dict()["increment"], do_double)
return cls(load_dict_checkpoint(checkpoint)["increment"], do_double)

def predict(
self, data: np.ndarray, override_increment: Optional[int] = None
Expand Down Expand Up @@ -170,7 +187,7 @@ async def __call__(self, request: Request):
return self.predictor.predict(np.array(data["array"]))

AdderDeployment.options(name="Adder").deploy(
checkpoint=Checkpoint.from_dict({"increment": 2}),
checkpoint=create_dict_checkpoint({"increment": 2}),
)
resp = ray.get(send_request.remote(json={"array": [40]}))
assert resp == [{"value": 42, "batch_size": 1}]
Expand All @@ -189,7 +206,7 @@ async def __call__(self, request: Request):
)

AdderDeployment.options(name="Adder").deploy(
checkpoint=Checkpoint.from_dict({"increment": 2}),
checkpoint=create_dict_checkpoint({"increment": 2}),
)

resp = ray.get(send_request.remote(json={"array": [40]}))
Expand All @@ -207,7 +224,7 @@ async def __call__(self, request: Request):
return self.predictor.predict(np.array(data["array"]))

AdderDeployment.options(name="Adder").deploy(
checkpoint=Checkpoint.from_dict({"increment": 2}),
checkpoint=create_dict_checkpoint({"increment": 2}),
)
resp = ray.get(send_request.remote(json={"array": [40]}))
assert resp == [{"value": 84, "batch_size": 1}]
Expand All @@ -226,7 +243,7 @@ async def __call__(self, requests: List[Request]):
return self.predictor.predict(batch)

AdderDeployment.options(name="Adder").deploy(
checkpoint=Checkpoint.from_dict({"increment": 2}),
checkpoint=create_dict_checkpoint({"increment": 2}),
)

refs = [send_request.remote(json={"array": [40]}) for _ in range(2)]
Expand All @@ -250,8 +267,7 @@ async def predict(self, data=Depends(json_to_ndarray)):

def test_air_integrations_in_pipeline(serve_instance):
path = tempfile.mkdtemp()
uri = f"file://{path}/test_uri"
Checkpoint.from_dict({"increment": 2}).to_uri(uri)
create_dict_checkpoint({"increment": 2}, path)

@serve.deployment
class AdderDeployment:
Expand All @@ -263,7 +279,7 @@ async def __call__(self, data):

with InputNode() as dag_input:
m1 = AdderDeployment.bind(
checkpoint=Checkpoint.from_uri(uri),
checkpoint=Checkpoint.from_directory(path),
)
dag = m1.__call__.bind(dag_input)
deployments = build(Ingress.bind(dag), "")
Expand All @@ -278,8 +294,7 @@ async def __call__(self, data):

def test_air_integrations_reconfigure(serve_instance):
path = tempfile.mkdtemp()
uri = f"file://{path}/test_uri"
Checkpoint.from_dict({"increment": 2}).to_uri(uri)
create_dict_checkpoint({"increment": 2}, path)

@serve.deployment
class AdderDeployment:
Expand All @@ -288,7 +303,7 @@ def __init__(self, checkpoint: Checkpoint):

def reconfigure(self, config):
self.predictor = AdderPredictor.from_checkpoint(
Checkpoint.from_dict(config["checkpoint"])
create_dict_checkpoint(config["checkpoint"])
)

async def __call__(self, data):
Expand All @@ -300,7 +315,7 @@ async def __call__(self, data):

with InputNode() as dag_input:
m1 = AdderDeployment.options(user_config=additional_config).bind(
checkpoint=Checkpoint.from_uri(uri),
checkpoint=Checkpoint.from_directory(path),
)
dag = m1.__call__.bind(dag_input)
deployments = build(Ingress.bind(dag), "")
Expand Down
7 changes: 6 additions & 1 deletion python/ray/serve/tests/test_air_integrations_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ def __init__(self, checkpoint):
async def __call__(self, data):
return self.predictor.predict(data)

import tempfile

tmpdir = tempfile.mkdtemp()
checkpoint = Checkpoint.from_directory(tmpdir)

serve.run(
DAGDriver.bind(
DummyGPUDeployment.options(name="GPU").bind(Checkpoint.from_dict({"x": 1})),
DummyGPUDeployment.options(name="GPU").bind(checkpoint),
http_adapter=json_to_ndarray,
)
)
Expand Down
5 changes: 3 additions & 2 deletions python/ray/tests/test_multi_node_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,15 @@ def test_run_driver_twice(ray_start_regular):
address_info = ray_start_regular
driver_script = """
import ray
import ray.train
import ray.tune as tune
import os
import time
def train_func(config, reporter): # add a reporter arg
def train_func(config):
for i in range(2):
time.sleep(0.1)
reporter(timesteps_total=i, mean_accuracy=i+97) # report metrics
ray.train.report(dict(timesteps_total=i, mean_accuracy=i+97)) # report metrics
os.environ["TUNE_RESUME_PROMPT_OFF"] = "True"
ray.init(address="{}", namespace="default_test_namespace")
Expand Down
3 changes: 2 additions & 1 deletion python/ray/tests/test_task_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ def test_parent_task_id_tune_e2e(shutdown_only):
script = """
import numpy as np
import ray
import ray.train
from ray import tune
import time
Expand All @@ -448,7 +449,7 @@ def train_function(config):
for i in range(5):
loss = config["mean"] * np.random.randn() + ray.get(
train_step_1.remote())
tune.report(loss=loss, nodes=ray.nodes())
ray.train.report(dict(loss=loss, nodes=ray.nodes()))
def tune_function():
Expand Down
2 changes: 0 additions & 2 deletions python/ray/tests/test_usage_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -1206,9 +1206,7 @@ def run_usage_stats_server(reporter):
if os.environ.get("RAY_MINIMAL") != "1":
expected_payload["tune_scheduler"] = "FIFOScheduler"
expected_payload["tune_searcher"] = "BasicVariantGenerator"
expected_payload["air_storage_configuration"] = "driver"
expected_payload["air_entrypoint"] = "Tuner.fit"
expected_payload["air_env_vars"] = '["RAY_AIR_NEW_PERSISTENCE_MODE"]'
assert payload["extra_usage_tags"] == expected_payload
assert payload["total_num_nodes"] == 1
assert payload["total_num_running_jobs"] == 1
Expand Down

0 comments on commit 4dac931

Please sign in to comment.