[train] New persistence mode: Update 📖 `Doc tests and examples (excluding Ray AIR examples)` (ray-project#38940)

Signed-off-by: Justin Yu <[email protected]>
Signed-off-by: Matthew Deng <[email protected]>
Co-authored-by: Matthew Deng <[email protected]>
justinvyu and matthewdeng authored Aug 27, 2023
1 parent d0b3f10 commit 02bb8f1
Showing 25 changed files with 600 additions and 408 deletions.
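This commit moves the doc code from dict-based checkpoints (`Checkpoint.from_dict`, `LegacyTorchCheckpoint.from_model`) to the new directory-based persistence API. Below is a minimal sketch of the pattern the updated examples follow; the `Linear` model and the `model.pt` filename are illustrative, while the Ray Train calls (`train.report`, `train.get_checkpoint`, `Checkpoint.from_directory`, `Checkpoint.as_directory`) are the ones the diffs switch to.

```python
# Minimal sketch of the directory-based checkpointing pattern adopted by
# the updated doc code. The model and filename are placeholders.
import os
import tempfile

import torch
from ray import train
from ray.train import Checkpoint


def train_loop_per_worker():
    model = torch.nn.Linear(1, 1)  # placeholder model

    # Restore: load state from a previously reported checkpoint, if any.
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            model.load_state_dict(
                torch.load(os.path.join(checkpoint_dir, "model.pt"))
            )

    for epoch in range(2):
        # ... training step ...

        # Save: write state to a local directory, then report it as a Checkpoint.
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
            train.report(
                {"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir)
            )
```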
4 changes: 2 additions & 2 deletions .buildkite/pipeline.ml.yml
@@ -544,8 +544,8 @@
# (see https://github.com/ray-project/ray/pull/38432/)
- pip install "transformers==4.30.2" "datasets==2.14.0"
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=-timeseries_libs,-external,-ray_air,-gpu,-post_wheel_build,-doctest,-datasets_train,-highly_parallel,-new_storage
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_tag_filters=-timeseries_libs,-external,-ray_air,-gpu,-post_wheel_build,-doctest,-datasets_train,-highly_parallel
doc/...

- label: ":book: Doc tests and examples with time series libraries"
13 changes: 9 additions & 4 deletions doc/source/ray-air/doc_code/hvd_trainer.py
@@ -1,3 +1,6 @@
import os
import tempfile

import horovod.torch as hvd
import ray
from ray import train
@@ -55,10 +58,12 @@ def train_loop_per_worker():
loss.backward()
optimizer.step()
print(f"epoch: {epoch}, loss: {loss.item()}")
train.report(
{},
checkpoint=Checkpoint.from_dict(dict(model=model.state_dict())),
)

with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report(
{"loss": loss.item()}, checkpoint=Checkpoint.from_directory(tmpdir)
)


train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
@@ -2,6 +2,8 @@
# isort: skip_file

# __air_session_start__
import os
import tempfile

import tensorflow as tf
from ray import train
@@ -30,8 +32,9 @@ def train_func():
else:
model = build_model()

model.save("my_model", overwrite=True)
train.report(metrics={"iter": 1}, checkpoint=Checkpoint.from_directory("my_model"))
with tempfile.TemporaryDirectory() as tmpdir:
model.save(tmpdir, overwrite=True)
train.report(metrics={"iter": 1}, checkpoint=Checkpoint.from_directory(tmpdir))


scaling_config = ScalingConfig(num_workers=2)
@@ -45,7 +48,7 @@ def train_func():
train_loop_per_worker=train_func,
scaling_config=scaling_config,
# this is ultimately what is accessed through
# ``Session.get_checkpoint()``
# ``train.get_checkpoint()``
resume_from_checkpoint=result.checkpoint,
)
result2 = trainer2.fit()
18 changes: 12 additions & 6 deletions doc/source/ray-air/doc_code/torch_trainer.py
@@ -1,3 +1,6 @@
import os
import tempfile

import torch
import torch.nn as nn

@@ -48,12 +51,15 @@ def train_loop_per_worker():
optimizer.step()
print(f"epoch: {epoch}, loss: {loss.item()}")

train.report(
{},
checkpoint=Checkpoint.from_dict(
dict(epoch=epoch, model=model.state_dict())
),
)
with tempfile.TemporaryDirectory() as tempdir:
torch.save(
{"epoch": epoch, "model": model.module.state_dict()},
os.path.join(tempdir, "checkpoint.pt"),
)
train.report(
{"loss": loss.item()},
checkpoint=Checkpoint.from_directory(tempdir),
)


train_dataset = ray.data.from_items([{"x": x, "y": 2 * x + 1} for x in range(200)])
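After a run like the updated torch_trainer.py example finishes, the reported directory checkpoint can be read back from the returned `Result`. A small sketch of that, not part of the commit: the helper name is made up, and `checkpoint.pt` matches the filename the example saves.

```python
# Sketch: read back a directory checkpoint reported by the example above.
# `checkpoint` would typically be `result.checkpoint` from `trainer.fit()`.
import os

import torch
from ray.train import Checkpoint


def load_reported_state(checkpoint: Checkpoint) -> dict:
    with checkpoint.as_directory() as checkpoint_dir:
        # Returns the dict saved by the training loop: {"epoch": ..., "model": ...}
        return torch.load(os.path.join(checkpoint_dir, "checkpoint.pt"))
```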
19 changes: 14 additions & 5 deletions doc/source/ray-overview/doc_test/ray_train.py
@@ -1,8 +1,11 @@
import os
import tempfile

import torch

import ray.train as train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, LegacyTorchCheckpoint
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer


def train_func():
@@ -21,15 +24,21 @@ def train_func():

# Train.
for _ in range(5):
epoch_loss = []
for X, y in dataloader:
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train.report({"loss": loss.item()})

train.report({}, checkpoint=LegacyTorchCheckpoint.from_model(model))
epoch_loss.append(loss.item())
train.report({"loss": sum(epoch_loss) / len(epoch_loss)})

with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report(
{"loss": loss.item()}, checkpoint=Checkpoint.from_directory(tmpdir)
)


trainer = TorchTrainer(train_func, scaling_config=ScalingConfig(num_workers=4))
2 changes: 1 addition & 1 deletion doc/source/rllib/doc_code/checkpoints.py
@@ -16,7 +16,7 @@
# Create standard (pickle-based) checkpoint.
with tempfile.TemporaryDirectory() as pickle_cp_dir:
# Note: `save()` always creates a pickle based checkpoint.
pickle_cp_dir = algo1.save(checkpoint_dir=pickle_cp_dir)
algo1.save(checkpoint_dir=pickle_cp_dir)

# But we can convert this pickle checkpoint to a msgpack one using an RLlib utility
# function.
18 changes: 12 additions & 6 deletions doc/source/train/doc_code/accelerate_trainer.py
@@ -1,3 +1,6 @@
import os
import tempfile

import torch
import torch.nn as nn

@@ -51,12 +54,15 @@ def train_loop_per_worker():
optimizer.step()
print(f"epoch: {epoch}, loss: {loss.item()}")

train.report(
metrics={"epoch": epoch, "loss": loss.item()},
checkpoint=Checkpoint.from_dict(
dict(epoch=epoch, model=accelerator.unwrap_model(model).state_dict())
),
)
with tempfile.TemporaryDirectory() as tmpdir:
torch.save(
accelerator.unwrap_model(model).state_dict(),
os.path.join(tmpdir, "model.pt"),
)
train.report(
metrics={"epoch": epoch, "loss": loss.item()},
checkpoint=Checkpoint.from_directory(tmpdir),
)


train_dataset = ray.data.from_items([{"x": x, "y": 2 * x + 1} for x in range(200)])
42 changes: 25 additions & 17 deletions doc/source/train/doc_code/dl_guide.py
@@ -3,11 +3,16 @@
MOCK = True

# __ft_initial_run_start__
import os
import tempfile
from typing import Dict, Optional

import torch

import ray
from ray import train
from ray.train.torch import LegacyTorchCheckpoint, TorchTrainer
from ray.train import Checkpoint
from ray.train.torch import TorchTrainer


def get_datasets() -> Dict[str, ray.data.Dataset]:
@@ -17,30 +22,34 @@ def get_datasets() -> Dict[str, ray.data.Dataset]:
def train_loop_per_worker(config: dict):
from torchvision.models import resnet18

model = resnet18()

# Checkpoint loading
checkpoint: Optional[LegacyTorchCheckpoint] = train.get_checkpoint()
model = checkpoint.get_model() if checkpoint else resnet18()
ray.train.torch.prepare_model(model)
checkpoint: Optional[Checkpoint] = train.get_checkpoint()
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
model.load_state_dict(model_state_dict)

model = train.torch.prepare_model(model)

train_ds = train.get_dataset_shard("train")

for epoch in range(5):
# Do some training...

# Checkpoint saving
train.report(
{"epoch": epoch},
checkpoint=LegacyTorchCheckpoint.from_model(model),
)
with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))


trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
datasets=get_datasets(),
scaling_config=train.ScalingConfig(num_workers=2),
run_config=train.RunConfig(
storage_path="~/ray_results",
name="dl_trainer_restore",
name="dl_trainer_restore", storage_path=os.path.expanduser("~/ray_results")
),
)
result = trainer.fit()
@@ -50,7 +59,7 @@ def train_loop_per_worker(config: dict):
from ray.train.torch import TorchTrainer

restored_trainer = TorchTrainer.restore(
path="~/ray_results/dl_trainer_restore",
path=os.path.expanduser("~/ray_results/dl_trainer_restore"),
datasets=get_datasets(),
)
# __ft_restored_run_end__
@@ -78,19 +87,18 @@ def train_loop_per_worker(config: dict):


# __ft_autoresume_start__
if TorchTrainer.can_restore("~/ray_results/dl_restore_autoresume"):
trainer = TorchTrainer.restore(
"~/ray_results/dl_restore_autoresume",
datasets=get_datasets(),
)
experiment_path = os.path.expanduser("~/ray_results/dl_restore_autoresume")
if TorchTrainer.can_restore(experiment_path):
trainer = TorchTrainer.restore(experiment_path, datasets=get_datasets())
result = trainer.fit()
else:
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
datasets=get_datasets(),
scaling_config=train.ScalingConfig(num_workers=2),
run_config=train.RunConfig(
storage_path="~/ray_results", name="dl_restore_autoresume"
storage_path=os.path.expanduser("~/ray_results"),
name="dl_restore_autoresume",
),
)
result = trainer.fit()
33 changes: 24 additions & 9 deletions doc/source/train/doc_code/key_concepts.py
@@ -54,6 +54,10 @@ def train_fn(config):


# __session_checkpoint_start__
import json
import os
import tempfile

from ray import train
from ray.train import ScalingConfig, Checkpoint
from ray.train.data_parallel_trainer import DataParallelTrainer
@@ -63,22 +67,33 @@ def train_fn(config):
checkpoint = train.get_checkpoint()

if checkpoint:
state = checkpoint.to_dict()
with checkpoint.as_directory() as checkpoint_dir:
with open(os.path.join(checkpoint_dir, "checkpoint.json"), "r") as f:
state = json.load(f)
state["step"] += 1
else:
state = {"step": 0}

for i in range(state["step"], 10):
state["step"] += 1
train.report(
metrics={"step": state["step"], "loss": (100 - i) / 100},
checkpoint=Checkpoint.from_dict(state),
)
with tempfile.TemporaryDirectory() as tempdir:
with open(os.path.join(tempdir, "checkpoint.json"), "w") as f:
json.dump(state, f)

train.report(
metrics={"step": state["step"], "loss": (100 - i) / 100},
checkpoint=Checkpoint.from_directory(tempdir),
)


example_checkpoint_dir = tempfile.mkdtemp()
with open(os.path.join(example_checkpoint_dir, "checkpoint.json"), "w") as f:
json.dump({"step": 4}, f)

trainer = DataParallelTrainer(
train_loop_per_worker=train_fn,
scaling_config=ScalingConfig(num_workers=1),
resume_from_checkpoint=Checkpoint.from_dict({"step": 4}),
resume_from_checkpoint=Checkpoint.from_directory(example_checkpoint_dir),
)
trainer.fit()

@@ -108,7 +123,7 @@ def train_fn(config):
# Name of the training run (directory name).
name="my_train_run",
# The experiment results will be saved to: storage_path/name
storage_path="~/ray_results",
storage_path=os.path.expanduser("~/ray_results"),
# storage_path="s3://my_bucket/tune_results",
# Custom and built-in callbacks
callbacks=[WandbLoggerCallback()],
@@ -203,9 +218,9 @@ def train_fn(config):

# TODO(justinvyu): Re-enable this after updating all of doc_code.
# __result_restore_start__
# from ray.train import Result
from ray.train import Result

# restored_result = Result.from_path(result_path)
restored_result = Result.from_path(result_path)
print("Restored loss", result.metrics["loss"])
# __result_restore_end__

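The re-enabled `Result.from_path` block in key_concepts.py above can be exercised as follows. This is a sketch with an illustrative results path, not code from the commit; in the doc example the path would come from `result.path`.

```python
# Sketch: restoring a finished run's Result from its trial directory.
import os

from ray.train import Result

# Illustrative path to a finished trial directory; not part of the commit.
result_path = os.path.expanduser("~/ray_results/my_train_run/trial_0")

restored_result = Result.from_path(result_path)
print("Restored loss:", restored_result.metrics["loss"])
checkpoint = restored_result.checkpoint  # directory-based Checkpoint, if one was reported
```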
10 changes: 8 additions & 2 deletions doc/source/train/doc_code/tuner.py
@@ -77,6 +77,8 @@
# __xgboost_end__

# __torch_start__
import os

from ray import tune
from ray.tune import Tuner
from ray.train.examples.pytorch.torch_linear_example import (
@@ -104,7 +106,9 @@

tuner = Tuner(
trainable=trainer,
run_config=RunConfig(name="test_tuner", storage_path="~/ray_results"),
run_config=RunConfig(
name="test_tuner", storage_path=os.path.expanduser("~/ray_results")
),
param_space=param_space,
tune_config=tune.TuneConfig(
mode="min", metric="loss", num_samples=2, max_concurrent_trials=2
@@ -229,7 +233,9 @@ def get_another_dataset():

# __tune_restore_start__
tuner = Tuner.restore(
path="~/ray_results/test_tuner", trainable=trainer, restart_errored=True
path=os.path.expanduser("~/ray_results/test_tuner"),
trainable=trainer,
restart_errored=True,
)
tuner.fit()
# __tune_restore_end__