Revert "[RLlib] Reparameterize the construction of TrainerRunner and …
Browse files Browse the repository at this point in the history
…RLTrainers (ray-project#31991)" (ray-project#32130)

Reverts ray-project#31991

This PR seems to have broken CI.

[Screenshot of the failing CI run, taken 2023-01-31 at 1:39 PM]

The error (from https://buildkite.com/ray-project/oss-ci-build-branch/builds/2099#01860972-e02e-47c4-8f86-8be28ea18d92/3786-3992) is:
AttributeError: '_TFStub' object has no attribute 'Tensor'
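
For context, a minimal hypothetical sketch (not taken from the reverted PR) of why returning a stub in place of the `tf` module produces this error; the stub definition mirrors the `_TFStub` removed in the rllib/utils/framework.py hunk at the bottom of this diff, which only defines `keras`:

# Hypothetical minimal reproduction, not from the PR: when TensorFlow is
# missing, the reverted change made try_import_tf() return a _TFStub
# placeholder that only defines `keras`, so any other attribute lookup fails.
class _KerasStub:
    pass


class _TFStub:
    def __init__(self) -> None:
        self.keras = _KerasStub()


tf = _TFStub()

try:
    tensor_cls = tf.Tensor  # e.g. a module-level annotation or isinstance check
except AttributeError as err:
    print(err)  # '_TFStub' object has no attribute 'Tensor'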
architkulkarni authored Jan 31, 2023

1 parent 10d52f7 commit d15ccfc
Showing 15 changed files with 319 additions and 436 deletions.
9 changes: 8 additions & 1 deletion rllib/BUILD
@@ -1846,10 +1846,17 @@ py_test(
py_test(
name = "test_trainer_runner",
tags = ["team:rllib", "multi_gpu", "exclusive"],
size = "large",
size = "medium",
srcs = ["core/rl_trainer/tests/test_trainer_runner.py"]
)

py_test(
name = "test_trainer_runner_local",
tags = ["team:rllib", "core", "exclusive"],
size = "medium",
srcs = ["core/rl_trainer/tests/test_trainer_runner_local.py"]
)

py_test(
name = "test_trainer_runner_config",
tags = ["team:rllib", "core"],
46 changes: 3 additions & 43 deletions rllib/algorithms/algorithm_config.py
@@ -17,7 +17,6 @@
import ray
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.core.rl_module.rl_module import SingleAgentRLModuleSpec
from ray.rllib.core.rl_trainer.rl_trainer import RLTrainerHPs
from ray.rllib.core.rl_trainer.trainer_runner_config import (
TrainerRunnerConfig,
ModuleSpec,
@@ -243,9 +242,6 @@ def __init__(self, algo_class=None):
self.num_gpus_per_worker = 0
self._fake_gpus = False
self.num_cpus_for_local_worker = 1
self.num_trainer_workers = 0
self.num_gpus_per_trainer_worker = 0
self.num_cpus_per_trainer_worker = 1
self.custom_resources_per_worker = {}
self.placement_strategy = "PACK"

@@ -322,10 +318,6 @@ def __init__(self, algo_class=None):
self.max_requests_in_flight_per_sampler_worker = 2
self.rl_trainer_class = None
self._enable_rl_trainer_api = False
# experimental: this will contain the hyper-parameters that are passed to the
# RLTrainer, for computing loss, etc. New algorithms have to set this to their
# own default. .training() will modify the fields of this object.
self._rl_trainer_hps = RLTrainerHPs()

# `self.callbacks()`
self.callbacks_class = DefaultCallbacks
@@ -451,10 +443,6 @@ def __init__(self, algo_class=None):
self.soft_horizon = DEPRECATED_VALUE
self.no_done_at_end = DEPRECATED_VALUE

@property
def rl_trainer_hps(self) -> RLTrainerHPs:
return self._rl_trainer_hps

def to_dict(self) -> AlgorithmConfigDict:
"""Converts all settings into a legacy config dict for backward compatibility.
@@ -959,9 +947,6 @@ def resources(
num_cpus_per_worker: Optional[Union[float, int]] = NotProvided,
num_gpus_per_worker: Optional[Union[float, int]] = NotProvided,
num_cpus_for_local_worker: Optional[int] = NotProvided,
num_trainer_workers: Optional[int] = NotProvided,
num_cpus_per_trainer_worker: Optional[Union[float, int]] = NotProvided,
num_gpus_per_trainer_worker: Optional[Union[float, int]] = NotProvided,
custom_resources_per_worker: Optional[dict] = NotProvided,
placement_strategy: Optional[str] = NotProvided,
) -> "AlgorithmConfig":
@@ -981,20 +966,6 @@ def resources(
fractional. This is usually needed only if your env itself requires a
GPU (i.e., it is a GPU-intensive video game), or model inference is
unusually expensive.
num_trainer_workers: Number of workers used for training. A value of 0
means training takes place on a local worker on head node CPUs or 1
GPU (determined by `num_gpus_per_trainer_worker`). For multi-GPU
training, set the number of workers to greater than 1 and set
`num_gpus_per_trainer_worker` accordingly (e.g. 4 GPUs total and a model
that needs 2 GPUs: `num_trainer_workers = 2` and
`num_gpus_per_trainer_worker = 2`).
num_cpus_per_trainer_worker: Number of CPUs allocated per trainer worker.
Only necessary for custom processing pipeline inside each RLTrainer
requiring multiple CPU cores. Ignored if `num_trainer_workers = 0`.
num_gpus_per_trainer_worker: Number of GPUs allocated per worker. If
`num_trainer_workers = 0`, any value greater than 0 will run the
training on a single GPU on the head node, while a value of 0 will run
the training on head node CPU cores.
custom_resources_per_worker: Any custom Ray resources to allocate per
worker.
num_cpus_for_local_worker: Number of CPUs to allocate for the algorithm.
@@ -1035,13 +1006,6 @@ def resources(
if placement_strategy is not NotProvided:
self.placement_strategy = placement_strategy

if num_trainer_workers is not NotProvided:
self.num_trainer_workers = num_trainer_workers
if num_cpus_per_trainer_worker is not NotProvided:
self.num_cpus_per_trainer_worker = num_cpus_per_trainer_worker
if num_gpus_per_trainer_worker is not NotProvided:
self.num_gpus_per_trainer_worker = num_gpus_per_trainer_worker

return self

def framework(
@@ -2681,16 +2645,12 @@ def get_trainer_runner_config(
.module(module_spec)
.trainer(
trainer_class=self.rl_trainer_class,
eager_tracing=self.eager_tracing,
# TODO (Kourosh): optimizer config can now be more complicated.
optimizer_config={"lr": self.lr},
rl_trainer_hps=self.rl_trainer_hps,
)
.resources(
num_trainer_workers=self.num_trainer_workers,
num_cpus_per_trainer_worker=self.num_cpus_per_trainer_worker,
num_gpus_per_trainer_worker=self.num_gpus_per_trainer_worker,
)
.framework(eager_tracing=self.eager_tracing)
.resources(num_gpus=self.num_gpus, fake_gpus=self._fake_gpus)
.algorithm(algorithm_config=self)
)

return config
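
For readers following the config API change, a hedged sketch of the two call styles around this revert; `AlgorithmConfig` is the class edited above, and the commented-out keyword arguments exist only before this revert:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = AlgorithmConfig()

# Pre-revert (ray-project#31991): dedicated trainer-worker resources.
# config = config.resources(
#     num_trainer_workers=2,
#     num_gpus_per_trainer_worker=2,
#     num_cpus_per_trainer_worker=1,
# )

# Post-revert: trainer resources fall back to the existing num_gpus setting,
# which get_trainer_runner_config() forwards to the TrainerRunnerConfig.
config = config.resources(num_gpus=2)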
93 changes: 10 additions & 83 deletions rllib/core/rl_trainer/rl_trainer.py
@@ -1,6 +1,5 @@
import abc

from dataclasses import dataclass, field
import logging
import numpy as np
from typing import (
@@ -15,6 +14,7 @@
Tuple,
Type,
Union,
TYPE_CHECKING,
)

from ray.rllib.utils.framework import try_import_tf, try_import_torch
@@ -23,7 +23,6 @@
ModuleID,
SingleAgentRLModuleSpec,
)

from ray.rllib.core.rl_module.marl_module import (
MultiAgentRLModule,
MultiAgentRLModuleSpec,
@@ -32,8 +31,10 @@
from ray.rllib.utils.nested_dict import NestedDict
from ray.rllib.utils.numpy import convert_to_numpy
from ray.rllib.utils.typing import TensorType
from ray.rllib.core.rl_trainer.scaling_config import TrainerScalingConfig

if TYPE_CHECKING:
from ray.air.config import ScalingConfig
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

torch, _ = try_import_torch()
tf1, tf, tfv = try_import_tf()
@@ -47,32 +48,6 @@
ParamDictType = Dict[ParamRef, ParamType]


@dataclass
class FrameworkHPs:
"""The framework specific hyper-parameters.
Args:
eager_tracing: Whether to trace the model in eager mode. This enables tf
tracing mode by wrapping the loss function computation in a tf.function.
This is useful for speeding up the training loop. However, it is not
compatible with all tf operations. For example, tf.print is not supported
in tf.function.
"""

eager_tracing: bool = False


@dataclass
class RLTrainerHPs:
"""The hyper-parameters for RLTrainer.
When creating a new RLTrainer, the new hyper-parameters have to be defined by
subclassing this class and adding the new hyper-parameters as fields.
"""

pass


class RLTrainer:
"""Base class for RLlib algorithm trainers.
@@ -143,9 +118,9 @@ def __init__(
] = None,
module: Optional[RLModule] = None,
optimizer_config: Mapping[str, Any] = None,
trainer_scaling_config: TrainerScalingConfig = TrainerScalingConfig(),
trainer_hyperparameters: Optional[RLTrainerHPs] = RLTrainerHPs(),
framework_hyperparameters: Optional[FrameworkHPs] = FrameworkHPs(),
distributed: bool = False,
scaling_config: Optional["ScalingConfig"] = None,
algorithm_config: Optional["AlgorithmConfig"] = None,
):
# TODO (Kourosh): Having the entire algorithm_config inside trainer may not be
# the best idea in the world, but it's easy to implement and user will
@@ -165,10 +140,9 @@ def __init__(
self.module_spec = module_spec
self.module_obj = module
self.optimizer_config = optimizer_config
self.config = trainer_hyperparameters

# pick the configs that we need for the trainer from scaling config
self._distributed = trainer_scaling_config.num_workers > 1
self.distributed = distributed
self.scaling_config = scaling_config
self.config = algorithm_config

# These are the attributes that are set during build
self._module: MultiAgentRLModule = None
@@ -177,10 +151,6 @@ def __init__(
self._param_to_optim: Dict[ParamRef, Optimizer] = {}
self._params: ParamDictType = {}

@property
def distributed(self) -> bool:
return self._distributed

@property
def module(self) -> MultiAgentRLModule:
return self._module
@@ -641,46 +611,3 @@ def __check_if_build_called(self):
"RLTrainer.build() must be called after constructing a "
"RLTrainer and before calling any methods on it."
)


@dataclass
class RLTrainerSpec:
"""The spec for construcitng RLTrainer actors.
Args:
rl_trainer_class: The RLTrainer class to use.
module_spec: The underlying (MA)RLModule spec to completely define the module.
module: Alternatively the RLModule instance can be passed in directly. This
only works if the RLTrainer is not an actor.
backend_config: The backend config for properly distributing the RLModule.
optimizer_config: The optimizer setting to apply during training.
trainer_hyperparameters: The extra config for the loss/additional update. This
should be a subclass of RLTrainerHPs. This is useful for passing in
algorithm configs that contain the hyper-parameters for loss computation,
changes of training behavior, etc., e.g. lr, entropy_coeff.
"""

rl_trainer_class: Type["RLTrainer"]
module_spec: Union["SingleAgentRLModuleSpec", "MultiAgentRLModuleSpec"] = None
module: Optional["RLModule"] = None
trainer_scaling_config: TrainerScalingConfig = field(
default_factory=TrainerScalingConfig
)
optimizer_config: Dict[str, Any] = field(default_factory=dict)
trainer_hyperparameters: RLTrainerHPs = field(default_factory=RLTrainerHPs)
framework_hyperparameters: FrameworkHPs = field(default_factory=FrameworkHPs)

def get_params_dict(self) -> Dict[str, Any]:
"""Returns the parameters than be passed to the RLTrainer constructor."""
return {
"module": self.module,
"module_spec": self.module_spec,
"trainer_scaling_config": self.trainer_scaling_config,
"optimizer_config": self.optimizer_config,
"trainer_hyperparameters": self.trainer_hyperparameters,
"framework_hyperparameters": self.framework_hyperparameters,
}

def build(self) -> "RLTrainer":
"""Builds the RLTrainer instance."""
return self.rl_trainer_class(**self.get_params_dict())
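
For orientation, a hedged sketch of the construction pattern that this revert removes, reconstructed from the RLTrainerSpec fields and build() shown above. The testing helpers (`get_trainer_class`, `get_module_spec`) are the ones used in rllib/core/testing/utils.py further down, and these imports only resolve on the parent commit of this revert:

# Pre-revert pattern only: RLTrainerSpec and TrainerScalingConfig are removed
# by this commit, so this runs on the parent commit, not on this one.
import gymnasium as gym

from ray.rllib.core.rl_trainer.rl_trainer import RLTrainerSpec
from ray.rllib.core.rl_trainer.scaling_config import TrainerScalingConfig
from ray.rllib.core.testing.utils import get_module_spec, get_trainer_class

env = gym.make("CartPole-v1")

spec = RLTrainerSpec(
    rl_trainer_class=get_trainer_class("tf"),
    module_spec=get_module_spec(framework="tf", env=env, is_multi_agent=False),
    optimizer_config={"lr": 1e-3},
    trainer_scaling_config=TrainerScalingConfig(num_workers=0),
)
trainer = spec.build()  # same as spec.rl_trainer_class(**spec.get_params_dict())
trainer.build()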
21 changes: 0 additions & 21 deletions rllib/core/rl_trainer/scaling_config.py

This file was deleted.
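
The deleted file's contents are not shown in this diff. For readers, a hypothetical reconstruction of roughly what TrainerScalingConfig contained, inferred only from its call sites elsewhere in this commit (num_workers, num_cpus_per_worker, num_gpus_per_worker); the defaults and comments are assumptions:

# Hypothetical sketch of the deleted TrainerScalingConfig; field names come
# from its usages in this diff, the defaults are assumed.
from dataclasses import dataclass


@dataclass
class TrainerScalingConfig:
    num_workers: int = 0            # 0 => a single local, in-process trainer
    num_cpus_per_worker: float = 1  # CPUs reserved per trainer worker
    num_gpus_per_worker: float = 0  # GPUs per worker; fractions are allowed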

10 changes: 7 additions & 3 deletions rllib/core/rl_trainer/tests/test_rl_trainer.py
@@ -11,12 +11,16 @@
from ray.rllib.core.testing.tf.bc_rl_trainer import BCTfRLTrainer
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.test_utils import check, get_cartpole_dataset_reader
from ray.rllib.core.rl_trainer.scaling_config import TrainerScalingConfig


def get_trainer() -> RLTrainer:
def get_trainer(distributed=False) -> RLTrainer:
env = gym.make("CartPole-v1")

# TODO: Another way to make RLTrainer would be to construct the module first
# and then apply the trainer to it. We should also allow that. In fact, if we
# figure out the serialization of RLModules, we can simply pass the module to
# the trainer and internally it will serialize and deserialize the module for
# distributed construction.
trainer = BCTfRLTrainer(
module_spec=SingleAgentRLModuleSpec(
module_class=DiscreteBCTFModule,
@@ -25,7 +29,7 @@ def get_trainer() -> RLTrainer:
model_config={"hidden_dim": 32},
),
optimizer_config={"lr": 1e-3},
trainer_scaling_config=TrainerScalingConfig(),
distributed=distributed,
)

trainer.build()
126 changes: 22 additions & 104 deletions rllib/core/rl_trainer/tests/test_trainer_runner.py
@@ -2,130 +2,55 @@
import unittest
import ray
import time
import numpy as np
import itertools

from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID, MultiAgentBatch
from ray.rllib.utils.test_utils import check, get_cartpole_dataset_reader
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.core.rl_trainer.scaling_config import TrainerScalingConfig
from ray.rllib.utils.test_utils import get_cartpole_dataset_reader
from ray.rllib.core.testing.utils import (
get_trainer_runner,
get_rl_trainer,
add_module_to_runner_or_trainer,
)


class TestTrainerRunner(unittest.TestCase):
"""This test is setup for 2 gpus."""

# TODO: This unittest should also test other resource allocations like multi-cpu,
# multi-node multi-gpu, etc.

@classmethod
def setUpClass(cls) -> None:
def setUp(cls) -> None:
ray.init()

# Settings to test
cls.scaling_configs = {
"local-cpu": TrainerScalingConfig(num_workers=0, num_gpus_per_worker=0),
"local-gpu": TrainerScalingConfig(num_workers=0, num_gpus_per_worker=0.5),
"remote-cpu": TrainerScalingConfig(num_workers=1),
"remote-gpu": TrainerScalingConfig(num_workers=1, num_gpus_per_worker=0.5),
"multi-gpu-ddp": TrainerScalingConfig(num_workers=2, num_gpus_per_worker=1),
"multi-cpu-ddp": TrainerScalingConfig(num_workers=2, num_cpus_per_worker=2),
# "multi-gpu-ddp-pipeline": TrainerScalingConfig(
# num_workers=2, num_gpus_per_worker=2
# ),
}

@classmethod
def tearDownClass(cls) -> None:
def tearDown(cls) -> None:
ray.shutdown()

def test_trainer_runner_local(self):

tf1, tf, tfv = try_import_tf()
tf1.executing_eagerly()

# TODO (Avnish): tf does not clear out the GPU memory footprint, therefore
# doing it first before torch will result in OOM. Find a way to clear out the
# GPU memory footprint of tf.
fws = ["torch"]
scaling_modes = ["local-cpu", "local-gpu"]
test_iterator = itertools.product(fws, scaling_modes)

env = gym.make("CartPole-v1")
for fw, scaling_mode in test_iterator:
print(f"Testing framework: {fw}, scaling mode: {scaling_mode}")
ray.init(ignore_reinit_error=True)
scaling_config = self.scaling_configs[scaling_mode]
runner = get_trainer_runner(fw, env, scaling_config)
local_trainer = get_rl_trainer(fw, env)
local_trainer.build()

# make the state of the trainer and the local runner identical
local_trainer.set_state(runner.get_state()[0])

reader = get_cartpole_dataset_reader(batch_size=500)
batch = reader.next()
batch = batch.as_multi_agent()
check(local_trainer.update(batch), runner.update(batch)[0])

new_module_id = "test_module"

add_module_to_runner_or_trainer(fw, env, new_module_id, runner)
add_module_to_runner_or_trainer(fw, env, new_module_id, local_trainer)

# make the state of the trainer and the local runner identical
local_trainer.set_state(runner.get_state()[0])

# do another update
batch = reader.next()
ma_batch = MultiAgentBatch(
{new_module_id: batch, DEFAULT_POLICY_ID: batch}, env_steps=batch.count
)
check(local_trainer.update(ma_batch), runner.update(ma_batch)[0])

check(local_trainer.get_state(), runner.get_state()[0])

# make sure the runner resources are freed up so that we don't autoscale
del runner
del local_trainer
ray.shutdown()
time.sleep(10)

def test_update_multigpu(self):
"""Test training in a 2 gpu setup and that weights are synchronized."""

# TODO (Avnish): The tf + remote-gpu test is flaky. Removing for now until
# investigated.
fws = ["torch"]
scaling_modes = self.scaling_configs.keys()
test_iterator = itertools.product(fws, scaling_modes)

for fw, scaling_mode in test_iterator:
print(f"Testing framework: {fw}, scaling mode: {scaling_mode}.")
for fw in ["tf", "torch"]:
ray.init(ignore_reinit_error=True)
print(f"Testing framework: {fw}.")
env = gym.make("CartPole-v1")

scaling_config = self.scaling_configs[scaling_mode]
runner = get_trainer_runner(fw, env, scaling_config)
reader = get_cartpole_dataset_reader(batch_size=1024)
runner = get_trainer_runner(fw, env, compute_config=dict(num_gpus=2))
reader = get_cartpole_dataset_reader(batch_size=500)

min_loss = float("inf")
for iter_i in range(1000):
batch = reader.next()
results = runner.update(batch.as_multi_agent())
res_0, res_1 = runner.update(batch.as_multi_agent())

loss = np.mean([res["loss"]["total_loss"] for res in results])
loss = (res_0["loss"]["total_loss"] + res_1["loss"]["total_loss"]) / 2
min_loss = min(loss, min_loss)
print(f"[iter = {iter_i}] Loss: {loss:.3f}, Min Loss: {min_loss:.3f}")
# The loss is initially around 0.69 (ln2). When it gets to around
# 0.57 the return of the policy gets to around 100.
if min_loss < 0.57:
break

for res1, res2 in zip(results, results[1:]):
self.assertEqual(
res1["mean_weight"]["default_policy"],
res2["mean_weight"]["default_policy"],
)

self.assertEqual(
res_0["mean_weight"]["default_policy"],
res_1["mean_weight"]["default_policy"],
)
self.assertLess(min_loss, 0.57)

# make sure the runner resources are freed up so that we don't autoscale
@@ -135,18 +60,11 @@ def test_update_multigpu(self):

def test_add_remove_module(self):

# TODO (Avnish): The tf + remote-gpu test is flaky. Removing for now until
# investigated.
fws = ["torch"]
scaling_modes = self.scaling_configs.keys()
test_iterator = itertools.product(fws, scaling_modes)

for fw, scaling_mode in test_iterator:
print(f"Testing framework: {fw}, scaling mode: {scaling_mode}.")
for fw in ["tf", "torch"]:
ray.init(ignore_reinit_error=True)
print(f"Testing framework: {fw}.")
env = gym.make("CartPole-v1")
scaling_config = self.scaling_configs[scaling_mode]
runner = get_trainer_runner(fw, env, scaling_config)
runner = get_trainer_runner(fw, env, compute_config=dict(num_gpus=2))
reader = get_cartpole_dataset_reader(batch_size=500)
batch = reader.next()

1 change: 1 addition & 0 deletions rllib/core/rl_trainer/tests/test_trainer_runner_config.py
@@ -31,6 +31,7 @@ def test_trainer_runner_build(self):
.trainer(
trainer_class=BCTfRLTrainer,
)
.algorithm(algorithm_config=AlgorithmConfig())
)
config.build()

70 changes: 70 additions & 0 deletions rllib/core/rl_trainer/tests/test_trainer_runner_local.py
@@ -0,0 +1,70 @@
import gymnasium as gym
import unittest

import ray

from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID, MultiAgentBatch
from ray.rllib.utils.test_utils import check, get_cartpole_dataset_reader
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.core.testing.utils import (
add_module_to_runner_or_trainer,
get_trainer_runner,
get_rl_trainer,
)


tf1, tf, tfv = try_import_tf()
tf1.executing_eagerly()


class TestTrainerRunnerLocal(unittest.TestCase):
"""This test is a trainer test setup for no gpus."""

# TODO: Make a unittest that does not need 2 gpus to run.
# So that the user can run it locally as well.
@classmethod
def setUp(cls) -> None:
ray.init()

@classmethod
def tearDown(cls) -> None:
ray.shutdown()

def test_trainer_runner_no_gpus(self):
env = gym.make("CartPole-v1")
for fw in ["tf", "torch"]:
runner = get_trainer_runner(fw, env, compute_config=dict(num_gpus=0))
local_trainer = get_rl_trainer(fw, env)
local_trainer.build()

# make the state of the trainer and the local runner identical
local_trainer.set_state(runner.get_state()[0])

reader = get_cartpole_dataset_reader(batch_size=500)
batch = reader.next()
batch = batch.as_multi_agent()
check(local_trainer.update(batch), runner.update(batch)[0])

new_module_id = "test_module"

add_module_to_runner_or_trainer(fw, env, new_module_id, runner)
add_module_to_runner_or_trainer(fw, env, new_module_id, local_trainer)

# make the state of the trainer and the local runner identical
local_trainer.set_state(runner.get_state()[0])

# do another update
batch = reader.next()
ma_batch = MultiAgentBatch(
{new_module_id: batch, DEFAULT_POLICY_ID: batch}, env_steps=batch.count
)
check(local_trainer.update(ma_batch), runner.update(ma_batch)[0])

check(local_trainer.get_state(), runner.get_state()[0])


if __name__ == "__main__":
import pytest
import sys

sys.exit(pytest.main(["-v", __file__]))
32 changes: 26 additions & 6 deletions rllib/core/rl_trainer/tf/tf_rl_trainer.py
@@ -9,10 +9,10 @@
Dict,
Sequence,
Hashable,
TYPE_CHECKING,
)

from ray.rllib.core.rl_trainer.rl_trainer import (
FrameworkHPs,
RLTrainer,
ParamOptimizerPairs,
ParamRef,
@@ -25,13 +25,19 @@
ModuleID,
SingleAgentRLModuleSpec,
)
from ray.rllib.core.rl_module.marl_module import MultiAgentRLModule
from ray.rllib.core.rl_module.marl_module import (
MultiAgentRLModule,
MultiAgentRLModuleSpec,
)
from ray.rllib.policy.sample_batch import MultiAgentBatch
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.typing import TensorType
from ray.rllib.utils.nested_dict import NestedDict

if TYPE_CHECKING:
from ray.air.config import ScalingConfig
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

tf1, tf, tfv = try_import_tf()

@@ -88,10 +94,24 @@ class TfRLTrainer(RLTrainer):
def __init__(
self,
*,
framework_hyperparameters: Optional[FrameworkHPs] = FrameworkHPs(),
**kwargs,
module_spec: Optional[
Union[SingleAgentRLModuleSpec, MultiAgentRLModuleSpec]
] = None,
module: Optional[RLModule] = None,
optimizer_config: Mapping[str, Any],
distributed: bool = False,
enable_tf_function: bool = True,
scaling_config: Optional["ScalingConfig"] = None,
algorithm_config: Optional["AlgorithmConfig"] = None,
):
super().__init__(framework_hyperparameters=framework_hyperparameters, **kwargs)
super().__init__(
module_spec=module_spec,
module=module,
optimizer_config=optimizer_config,
distributed=distributed,
scaling_config=scaling_config,
algorithm_config=algorithm_config,
)

# TODO (Kourosh): This is required to make sure tf computes the values in the
# end. Two questions remain:
@@ -101,7 +121,7 @@ def __init__(
# does not mention this as a requirement?
tf1.enable_eager_execution()

self._enable_tf_function = framework_hyperparameters.eager_tracing
self._enable_tf_function = enable_tf_function
if self._enable_tf_function:
self._update_fn = tf.function(self._do_update_fn)
else:
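
The explicit `enable_tf_function` argument above replaces the old `framework_hyperparameters.eager_tracing` flag. As a standalone illustration of the underlying pattern (plain TensorFlow, not RLlib code), wrapping the update step in `tf.function` traces it into a graph instead of executing it eagerly:

import tensorflow as tf


def _do_update_fn(x: tf.Tensor) -> tf.Tensor:
    # Stand-in for the real loss/gradient computation.
    return tf.reduce_sum(x * x)


enable_tf_function = True
if enable_tf_function:
    # Traced into a graph on the first call; later calls reuse the graph.
    _update_fn = tf.function(_do_update_fn)
else:
    # Plain eager execution; easier to debug (e.g. Python-side prints work).
    _update_fn = _do_update_fn

print(_update_fn(tf.constant([1.0, 2.0, 3.0])))  # tf.Tensor(14.0, ...)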
25 changes: 17 additions & 8 deletions rllib/core/rl_trainer/torch/tests/test_torch_rl_trainer.py
@@ -12,21 +12,30 @@
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils.test_utils import check, get_cartpole_dataset_reader
from ray.rllib.utils.numpy import convert_to_numpy
from ray.rllib.core.rl_trainer.scaling_config import TrainerScalingConfig

from ray.air.config import ScalingConfig

def _get_trainer() -> RLTrainer:
env = gym.make("CartPole-v1")

def _get_trainer(scaling_config=None, distributed: bool = False) -> RLTrainer:
env = gym.make("CartPole-v1")
scaling_config = scaling_config or ScalingConfig()
distributed = False

# TODO: Another way to make RLTrainer would be to construct the module first
# and then apply the trainer to it. We should also allow that. In fact, if we
# figure out the serialization of RLModules, we can simply pass the module to
# the trainer and internally it will serialize and deserialize the module for
# distributed construction.
trainer = BCTorchRLTrainer(
module_spec=SingleAgentRLModuleSpec(
module_class=DiscreteBCTorchModule,
observation_space=env.observation_space,
action_space=env.action_space,
model_config={"hidden_dim": 32},
),
scaling_config=scaling_config,
optimizer_config={"lr": 1e-3},
trainer_scaling_config=TrainerScalingConfig(),
distributed=distributed,
)

trainer.build()
@@ -45,7 +54,7 @@ def tearDown(cls) -> None:

def test_end_to_end_update(self):

trainer = _get_trainer()
trainer = _get_trainer(scaling_config=ScalingConfig(num_workers=2))
reader = get_cartpole_dataset_reader(batch_size=512)

min_loss = float("inf")
@@ -68,7 +77,7 @@ def test_compute_gradients(self):
Tests that if we sum all the trainable variables the gradient of output w.r.t.
the weights is all ones.
"""
trainer = _get_trainer()
trainer = _get_trainer(scaling_config=ScalingConfig(num_workers=2))

params = trainer.get_parameters(trainer.module[DEFAULT_POLICY_ID])
loss = {"total_loss": sum([param.sum() for param in params])}
@@ -87,7 +96,7 @@ def test_apply_gradients(self):
standard SGD/Adam update rule.
"""

trainer = _get_trainer()
trainer = _get_trainer(scaling_config=ScalingConfig(num_workers=2))

# calculated the expected new params based on gradients of all ones.
params = trainer.get_parameters(trainer.module[DEFAULT_POLICY_ID])
@@ -111,7 +120,7 @@ def test_add_remove_module(self):
all variables the updated parameters follow the SGD update rule.
"""
env = gym.make("CartPole-v1")
trainer = _get_trainer()
trainer = _get_trainer(scaling_config=ScalingConfig(num_workers=2))

# add a test module with SGD optimizer with a known lr
lr = 1e-4
53 changes: 32 additions & 21 deletions rllib/core/rl_trainer/torch/torch_rl_trainer.py
@@ -8,14 +8,18 @@
Hashable,
Optional,
Callable,
TYPE_CHECKING,
)

from ray.rllib.core.rl_module.rl_module import (
RLModule,
ModuleID,
SingleAgentRLModuleSpec,
)
from ray.rllib.core.rl_module.marl_module import MultiAgentRLModule
from ray.rllib.core.rl_module.marl_module import (
MultiAgentRLModule,
MultiAgentRLModuleSpec,
)
from ray.rllib.core.rl_trainer.rl_trainer import (
RLTrainer,
ParamOptimizerPairs,
@@ -24,7 +28,6 @@
ParamDictType,
)
from ray.rllib.core.rl_module.torch.torch_rl_module import TorchDDPRLModule
from ray.rllib.core.rl_trainer.scaling_config import TrainerScalingConfig
from ray.rllib.policy.sample_batch import MultiAgentBatch
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import TensorType
@@ -34,8 +37,11 @@
torch, nn = try_import_torch()

if torch:
from ray.air.config import ScalingConfig
from ray.train.torch.train_loop_utils import _TorchAccelerator

if TYPE_CHECKING:
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

logger = logging.getLogger(__name__)

@@ -47,14 +53,31 @@ class TorchRLTrainer(RLTrainer):
def __init__(
self,
*,
trainer_scaling_config: TrainerScalingConfig = TrainerScalingConfig(),
**kwargs,
module_spec: Optional[
Union[SingleAgentRLModuleSpec, MultiAgentRLModuleSpec]
] = None,
module: Optional[RLModule] = None,
optimizer_config: Mapping[str, Any],
distributed: bool = False,
scaling_config: Optional["ScalingConfig"] = None,
algorithm_config: Optional["AlgorithmConfig"] = None,
):
super().__init__(trainer_scaling_config=trainer_scaling_config, **kwargs)
super().__init__(
module_spec=module_spec,
module=module,
optimizer_config=optimizer_config,
distributed=distributed,
scaling_config=scaling_config,
algorithm_config=algorithm_config,
)

# pick the stuff that we need from the scaling config
self._use_gpu = trainer_scaling_config.num_gpus_per_worker > 0
# TODO (Kourosh): Scaling config is required for torch trainer to do proper DDP
# wrapping setup but not so much required for tf. we need to
scaling_config = scaling_config or ScalingConfig()
self._world_size = scaling_config.num_workers or 1
self._use_gpu = scaling_config.use_gpu

# These attributes are set in the `build` method.
self._device = None

@property
@@ -112,12 +135,6 @@ def build(self) -> None:
self._device = torch.device("cpu")
super().build()

@override(RLTrainer)
def _make_module(self) -> MultiAgentRLModule:
module = super()._make_module()
self._map_module_to_device(module)
return module

@override(RLTrainer)
def _make_distributed_module(self) -> MultiAgentRLModule:
module = self._make_module()
@@ -129,9 +146,11 @@ def _make_distributed_module(self) -> MultiAgentRLModule:
# register them in the MultiAgentRLModule. We should find a better way to
# handle this.
if isinstance(module, torch.nn.Module):
module.to(self._device)
module = TorchDDPRLModule(module)
else:
for key in module.keys():
module[key].to(self._device)
module.add_module(key, TorchDDPRLModule(module[key]), override=True)

return module
@@ -191,11 +210,3 @@ def add_module(
self._module.add_module(
module_id, TorchDDPRLModule(self._module[module_id]), override=True
)

def _map_module_to_device(self, module: MultiAgentRLModule) -> None:
"""Moves the module to the correct device."""
if isinstance(module, torch.nn.Module):
module.to(self._device)
else:
for key in module.keys():
module[key].to(self._device)
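
After this revert, TorchRLTrainer reads its scaling information from ray.air's ScalingConfig instead of the removed TrainerScalingConfig. A short sketch of that derivation, using only the fields referenced above (num_workers, use_gpu); the device-selection line is an illustrative assumption rather than a copy of the trainer's build() logic:

import torch

from ray.air.config import ScalingConfig

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)

world_size = scaling_config.num_workers or 1  # falls back to 1 if unset
use_gpu = scaling_config.use_gpu
# Assumed device selection, for illustration only.
device = torch.device("cuda") if use_gpu and torch.cuda.is_available() else torch.device("cpu")
print(world_size, device)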
129 changes: 64 additions & 65 deletions rllib/core/rl_trainer/trainer_runner.py
@@ -1,5 +1,5 @@
import math
from typing import Any, List, Mapping, Type, Optional, Callable, Dict, TYPE_CHECKING
from typing import Any, List, Mapping, Type, Optional, Callable, Dict

import ray

@@ -9,30 +9,16 @@
SingleAgentRLModuleSpec,
)
from ray.rllib.core.rl_trainer.rl_trainer import (
RLTrainerSpec,
RLTrainer,
ParamOptimizerPairs,
Optimizer,
)
from ray.rllib.policy.sample_batch import MultiAgentBatch
from ray.train._internal.backend_executor import BackendExecutor

if TYPE_CHECKING:
from ray.rllib.core.rl_trainer.rl_trainer import RLTrainer


def _get_backend_config(rl_trainer_class: Type["RLTrainer"]) -> str:
if rl_trainer_class.framework == "torch":
from ray.train.torch import TorchConfig

backend_config = TorchConfig()
elif rl_trainer_class.framework == "tf":
from ray.train.tensorflow import TensorflowConfig
from ray.rllib.policy.sample_batch import MultiAgentBatch

backend_config = TensorflowConfig()
else:
raise ValueError("framework must be either torch or tf")

return backend_config
from ray.air.config import ScalingConfig
from ray.train._internal.backend_executor import BackendExecutor


class TrainerRunner:
@@ -61,43 +47,56 @@ class TrainerRunner:

def __init__(
self,
rl_trainer_spec: RLTrainerSpec,
trainer_class: Type[RLTrainer],
trainer_config: Mapping[str, Any],
compute_config: Mapping[str, Any],
):
scaling_config = rl_trainer_spec.trainer_scaling_config
rl_trainer_class = rl_trainer_spec.rl_trainer_class
num_gpus = compute_config.get("num_gpus", 0)
use_fake_gpus = compute_config.get("_use_fake_gpus", False)
self._trainer_config = trainer_config

if num_gpus > 0:
scaling_config = ScalingConfig(
num_workers=num_gpus,
use_gpu=(not use_fake_gpus),
)

# TODO (Kourosh): Go with a _remote flag instead of _is_local to be more
# explicit
self._is_local = scaling_config.num_workers == 0
self._trainer = None
self._workers = None
if trainer_class.framework == "torch":
from ray.train.torch import TorchConfig

if self._is_local:
self._trainer = rl_trainer_class(**rl_trainer_spec.get_params_dict())
self._trainer.build()
else:
backend_config = _get_backend_config(rl_trainer_class)
backend_executor = BackendExecutor(
backend_config = TorchConfig()
elif trainer_class.framework == "tf":
from ray.train.tensorflow import TensorflowConfig

backend_config = TensorflowConfig()
else:
raise ValueError("framework must be either torch or tf")

self.backend_executor = BackendExecutor(
backend_config=backend_config,
num_workers=scaling_config.num_workers,
num_cpus_per_worker=scaling_config.num_cpus_per_worker,
num_gpus_per_worker=scaling_config.num_gpus_per_worker,
max_retries=0,
)

backend_executor.start(
train_cls=rl_trainer_class,
train_cls_kwargs=rl_trainer_spec.get_params_dict(),
# TODO(avnishn, kourosh): Should we pass the scaling config into the
# trainer?
trainer_config["distributed"] = self._distributed = bool(num_gpus > 1)
trainer_config["scaling_config"] = scaling_config
self.backend_executor.start(
train_cls=trainer_class, train_cls_kwargs=trainer_config
)
self._workers = [
w.actor for w in self.backend_executor.worker_group.workers
]

self._workers = [w.actor for w in backend_executor.worker_group.workers]

# run the neural network building code on remote workers
ray.get([w.build.remote() for w in self._workers])

@property
def is_local(self) -> bool:
return self._is_local
else:
trainer_config["distributed"] = self._distributed = False
self._trainer = trainer_class(**trainer_config)
self._trainer.build()

def update(self, batch: MultiAgentBatch) -> List[Mapping[str, Any]]:
"""Do a gradient based update to the RLTrainer(s) maintained by this TrainerRunner.
@@ -108,10 +107,10 @@ def update(self, batch: MultiAgentBatch) -> List[Mapping[str, Any]]:
Returns:
A list of dictionaries of results from the updates from the RLTrainer(s)
"""
if self.is_local:
return [self._trainer.update(batch)]
else:
if self._distributed:
return self._distributed_update(batch)
else:
return [self._trainer.update(batch)]

def _distributed_update(self, batch: MultiAgentBatch) -> List[Mapping[str, Any]]:
"""Do a gradient based update to the RLTrainers using DDP training.
@@ -158,13 +157,13 @@ def additional_update(self, *args, **kwargs) -> List[Mapping[str, Any]]:
A list of dictionaries of results from the updates from each worker.
"""

if self.is_local:
return [self._trainer.additional_update(*args, **kwargs)]
else:
if self._distributed:
refs = []
for worker in self._workers:
refs.append(worker.additional_update.remote(*args, **kwargs))
return ray.get(refs)
else:
return [self._trainer.additional_update(*args, **kwargs)]

def add_module(
self,
@@ -187,14 +186,7 @@ def add_module(
optimizer_cls: The optimizer class to use. If None, the set_optimizer_fn
should be provided.
"""
if self.is_local:
self._trainer.add_module(
module_id=module_id,
module_spec=module_spec,
set_optimizer_fn=set_optimizer_fn,
optimizer_cls=optimizer_cls,
)
else:
if self._distributed:
refs = []
for worker in self._workers:
ref = worker.add_module.remote(
@@ -205,6 +197,13 @@ def add_module(
)
refs.append(ref)
ray.get(refs)
else:
self._trainer.add_module(
module_id=module_id,
module_spec=module_spec,
set_optimizer_fn=set_optimizer_fn,
optimizer_cls=optimizer_cls,
)

def remove_module(self, module_id: ModuleID) -> None:
"""Remove a module from the RLTrainers maintained by this TrainerRunner.
@@ -213,14 +212,14 @@ def remove_module(self, module_id: ModuleID) -> None:
module_id: The id of the module to remove.
"""
if self.is_local:
self._trainer.remove_module(module_id)
else:
if self._distributed:
refs = []
for worker in self._workers:
ref = worker.remove_module.remote(module_id)
refs.append(ref)
ray.get(refs)
else:
self._trainer.remove_module(module_id)

def get_weight(self) -> Dict:
"""Get the weights of the MARLModule.
@@ -233,13 +232,13 @@ def get_weight(self) -> Dict:

def get_state(self) -> List[Mapping[ModuleID, Mapping[str, Any]]]:
"""Get the states of the RLTrainers"""
if self.is_local:
return [self._trainer.get_state()]
else:
if self._distributed:
refs = []
for worker in self._workers:
refs.append(worker.get_state.remote())
return ray.get(refs)
else:
return [self._trainer.get_state()]

def set_state(self, state: List[Mapping[ModuleID, Mapping[str, Any]]]) -> None:
"""Sets the states of the RLTrainers.
@@ -248,10 +247,10 @@ def set_state(self, state: List[Mapping[ModuleID, Mapping[str, Any]]]) -> None:
state: The state of the RLTrainers
"""
if self.is_local:
self._trainer.set_state(state)
else:
if self._distributed:
refs = []
for worker in self._workers:
refs.append(worker.set_state.remote(state))
ray.get(refs)
else:
self._trainer.set_state(state)
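
With the revert in place, a TrainerRunner is built from a trainer class plus two plain dicts rather than an RLTrainerSpec. A hedged construction sketch, mirroring get_trainer_runner() in rllib/core/testing/utils.py further down in this diff; the batch-handling lines reuse helpers shown elsewhere in this commit:

import gymnasium as gym

from ray.rllib.core.rl_trainer.trainer_runner import TrainerRunner
from ray.rllib.core.testing.utils import get_module_spec, get_trainer_class
from ray.rllib.utils.test_utils import get_cartpole_dataset_reader

env = gym.make("CartPole-v1")

runner = TrainerRunner(
    trainer_class=get_trainer_class("torch"),
    trainer_config={
        "module_spec": get_module_spec(framework="torch", env=env, is_multi_agent=False),
        "optimizer_config": {"lr": 0.1},
    },
    # num_gpus > 1 switches the runner into distributed (DDP) mode; otherwise a
    # single local RLTrainer is built in-process.
    compute_config={"num_gpus": 0},
)

reader = get_cartpole_dataset_reader(batch_size=500)
batch = reader.next().as_multi_agent()
results = runner.update(batch)  # one result dict per RLTrainer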
99 changes: 50 additions & 49 deletions rllib/core/rl_trainer/trainer_runner_config.py
@@ -1,18 +1,11 @@
from typing import Type, Optional, TYPE_CHECKING, Union, Dict

from ray.rllib.core.rl_module.marl_module import MultiAgentRLModuleSpec
from ray.rllib.core.rl_module.rl_module import SingleAgentRLModuleSpec
from ray.rllib.core.rl_trainer.trainer_runner import TrainerRunner
from ray.rllib.core.rl_trainer.scaling_config import TrainerScalingConfig
from ray.rllib.core.rl_trainer.rl_trainer import (
RLTrainerSpec,
RLTrainerHPs,
FrameworkHPs,
)
from ray.rllib.utils.from_config import NotProvided

from ray.rllib.core.rl_trainer.trainer_runner import TrainerRunner

if TYPE_CHECKING:
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.rl_trainer import RLTrainer

ModuleSpec = Union[SingleAgentRLModuleSpec, MultiAgentRLModuleSpec]
@@ -33,16 +26,15 @@ def __init__(self, cls: Type[TrainerRunner] = None) -> None:

# `self.trainer()`
self.trainer_class = None
self.eager_tracing = True
self.optimizer_config = None
self.rl_trainer_hps = RLTrainerHPs()

# `self.resources()`
self.num_gpus_per_trainer_worker = 0
self.num_cpus_per_trainer_worker = 1
self.num_trainer_workers = 1
self.num_gpus = 0
self.fake_gpus = False

# `self.framework()`
self.eager_tracing = False
# `self.algorithm()`
self.algorithm_config = None

def validate(self) -> None:

@@ -58,39 +50,51 @@ def validate(self) -> None:
"the RLTrainer class with .trainer(trainer_class=MyTrainerClass)."
)

if self.algorithm_config is None:
raise ValueError(
"Must provide algorithm_config for RLTrainer. Use "
".algorithm(algorithm_config=MyConfig)."
)

if self.optimizer_config is None:
# get the default optimizer config if it's not provided
# TODO (Kourosh): Change the optimizer config to a dataclass object.
self.optimizer_config = {"lr": 1e-3}

if self.fake_gpus and self.num_gpus <= 0:
raise ValueError("If fake_gpus is True, num_gpus must be greater than 0.")

def build(self) -> TrainerRunner:
self.validate()

scaling_config = TrainerScalingConfig(
num_workers=self.num_trainer_workers,
num_gpus_per_worker=self.num_gpus_per_trainer_worker,
num_cpus_per_worker=self.num_cpus_per_trainer_worker,
# If the module class is a multi-agent class, it will override the default
# MultiAgentRLModule class. Otherwise, it will be a single-agent module
# wrapped in a multi-agent module.
# TODO (Kourosh): What should scaling_config be? It's not clear what
# should be passed in as trainer_config and what will be inferred.
return self.trainer_runner_class(
trainer_class=self.trainer_class,
trainer_config={
"module_spec": self.module_spec,
# TODO (Kourosh): should this be inferred inside the constructor?
"distributed": self.num_gpus > 1,
# TODO (Avnish): add this
# "enable_tf_function": self.eager_tracing,
"optimizer_config": self.optimizer_config,
"algorithm_config": self.algorithm_config,
},
compute_config={
"num_gpus": self.num_gpus,
# TODO (Avnish): add this
# "fake_gpus": self.fake_gpus,
},
)

framework_hps = FrameworkHPs(eager_tracing=self.eager_tracing)

rl_trainer_spec = RLTrainerSpec(
rl_trainer_class=self.trainer_class,
module_spec=self.module_spec,
optimizer_config=self.optimizer_config,
trainer_scaling_config=scaling_config,
trainer_hyperparameters=self.rl_trainer_hps,
framework_hyperparameters=framework_hps,
)

return self.trainer_runner_class(rl_trainer_spec)

def framework(
self, eager_tracing: Optional[bool] = NotProvided
def algorithm(
self, algorithm_config: Optional["AlgorithmConfig"] = NotProvided
) -> "TrainerRunnerConfig":

if eager_tracing is not NotProvided:
self.eager_tracing = eager_tracing
if algorithm_config is not NotProvided:
self.algorithm_config = algorithm_config
return self

def module(
@@ -105,33 +109,30 @@ def module(

def resources(
self,
num_trainer_workers: Optional[int] = NotProvided,
num_gpus_per_trainer_worker: Optional[Union[float, int]] = NotProvided,
num_cpus_per_trainer_worker: Optional[Union[float, int]] = NotProvided,
num_gpus: Optional[Union[float, int]] = NotProvided,
fake_gpus: Optional[bool] = NotProvided,
) -> "TrainerRunnerConfig":

if num_trainer_workers is not NotProvided:
self.num_trainer_workers = num_trainer_workers
if num_gpus_per_trainer_worker is not NotProvided:
self.num_gpus_per_trainer_worker = num_gpus_per_trainer_worker
if num_cpus_per_trainer_worker is not NotProvided:
self.num_cpus_per_trainer_worker = num_cpus_per_trainer_worker
if num_gpus is not NotProvided:
self.num_gpus = num_gpus
if fake_gpus is not NotProvided:
self.fake_gpus = fake_gpus

return self

def trainer(
self,
*,
trainer_class: Optional[Type["RLTrainer"]] = NotProvided,
eager_tracing: Optional[bool] = NotProvided,
optimizer_config: Optional[Dict] = NotProvided,
rl_trainer_hps: Optional[RLTrainerHPs] = NotProvided,
) -> "TrainerRunnerConfig":

if trainer_class is not NotProvided:
self.trainer_class = trainer_class
if eager_tracing is not NotProvided:
self.eager_tracing = eager_tracing
if optimizer_config is not NotProvided:
self.optimizer_config = optimizer_config
if rl_trainer_hps is not NotProvided:
self.rl_trainer_hps = rl_trainer_hps

return self
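
Putting the post-revert pieces together, a hedged sketch of the builder flow that TrainerRunnerConfig now expects, assembled from validate()/build() above and the updated test_trainer_runner_config.py earlier in this diff; illustrative, not exhaustive:

import gymnasium as gym

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.rl_trainer.trainer_runner_config import TrainerRunnerConfig
from ray.rllib.core.testing.tf.bc_rl_trainer import BCTfRLTrainer
from ray.rllib.core.testing.utils import get_module_spec

env = gym.make("CartPole-v1")

runner = (
    TrainerRunnerConfig()
    .module(get_module_spec(framework="tf", env=env, is_multi_agent=False))
    .trainer(trainer_class=BCTfRLTrainer)          # optimizer_config defaults to {"lr": 1e-3}
    .algorithm(algorithm_config=AlgorithmConfig()) # required, otherwise validate() raises
    .resources(num_gpus=0)                         # 0 GPUs -> local, non-distributed trainer
    .build()
)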
12 changes: 4 additions & 8 deletions rllib/core/testing/utils.py
@@ -4,8 +4,6 @@

from ray.rllib.utils.annotations import DeveloperAPI
from ray.rllib.core.rl_trainer.trainer_runner import TrainerRunner
from ray.rllib.core.rl_trainer.rl_trainer import RLTrainerSpec
from ray.rllib.core.rl_trainer.scaling_config import TrainerScalingConfig

from ray.rllib.core.rl_module.marl_module import (
MultiAgentRLModuleSpec,
@@ -103,19 +101,17 @@ def get_rl_trainer(
def get_trainer_runner(
framework: str,
env: "gym.Env",
scaling_config: TrainerScalingConfig,
compute_config: dict,
is_multi_agent: bool = False,
) -> TrainerRunner:

rl_trainer_spec = RLTrainerSpec(
rl_trainer_class=get_trainer_class(framework),
trainer_class = get_trainer_class(framework)
trainer_cfg = dict(
module_spec=get_module_spec(
framework=framework, env=env, is_multi_agent=is_multi_agent
),
optimizer_config={"lr": 0.1},
trainer_scaling_config=scaling_config,
)
runner = TrainerRunner(rl_trainer_spec)
runner = TrainerRunner(trainer_class, trainer_cfg, compute_config=compute_config)

return runner

29 changes: 5 additions & 24 deletions rllib/utils/framework.py
@@ -59,7 +59,6 @@ def try_import_tf(error: bool = False):
Raises:
ImportError: If error=True and tf is not installed.
"""
tf_stub = _TFStub()
# Make sure, these are reset after each test case
# that uses them: del os.environ["RLLIB_TEST_NO_TF_IMPORT"]
if "RLLIB_TEST_NO_TF_IMPORT" in os.environ:
@@ -87,7 +86,7 @@ def try_import_tf(error: bool = False):
"install at least one deep-learning framework: "
"`pip install [torch|tensorflow|jax]`."
)
return None, tf_stub, None
return None, None, None

# Try "reducing" tf to tf.compat.v1.
try:
@@ -109,24 +108,6 @@ def try_import_tf(error: bool = False):
return tf1_module, tf_module, version


# Fake module for tf.
class _TFStub:
def __init__(self) -> None:
self.keras = _KerasStub()


# Fake module for tf.keras.
class _KerasStub:
def __init__(self) -> None:
self.Model = _FakeTfClassStub


# Fake classes under keras (e.g for tf.keras.Model)
class _FakeTfClassStub:
def __init__(self, *a, **kw):
raise ImportError("Could not import `tensorflow`. Try pip install tensorflow.")


@DeveloperAPI
def tf_function(tf_module):
"""Conditional decorator for @tf.function.
@@ -176,20 +157,20 @@ class _NNStub:
def __init__(self, *a, **kw):
# Fake nn.functional module within torch.nn.
self.functional = None
self.Module = _FakeTorchClassStub
self.Module = _FakeClassStub
self.parallel = _ParallelStub()


# Fake class for e.g. torch.nn.Module to allow it to be inherited from.
class _FakeTorchClassStub:
class _FakeClassStub:
def __init__(self, *a, **kw):
raise ImportError("Could not import `torch`. Try pip install torch.")


class _ParallelStub:
def __init__(self, *a, **kw):
self.DataParallel = _FakeTorchClassStub
self.DistributedDataParallel = _FakeTorchClassStub
self.DataParallel = _FakeClassStub
self.DistributedDataParallel = _FakeClassStub


@PublicAPI
