[RLlib] Change (prioritized) SA episode buffer to return episode lists (instead of batch) from `sample()`. (ray-project#45123)
simonsays1980 authored May 8, 2024
1 parent f2b0d91 commit 7589bd5
Showing 14 changed files with 256 additions and 425 deletions.
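In short: the single-agent (prioritized) episode replay buffer's `sample()` now returns a list of episodes rather than a ready-made train batch, and DQN's training step hands those episodes directly to the `LearnerGroup`. A minimal sketch of the new flow, pieced together from the dqn.py hunk below (here `buffer`, `learner_group`, and `config` stand in for the algorithm's `self.local_replay_buffer`, `self.learner_group`, and `self.config`; this is illustrative only, not the exact method body):

    # The buffer returns a list of SingleAgentEpisode objects instead of a batch dict.
    episodes = buffer.sample(
        num_items=config.train_batch_size,
        n_step=config.n_step,
        gamma=config.gamma,
        beta=config.replay_buffer_config["beta"],
    )
    # No SampleBatch/MultiAgentBatch conversion anymore: the Learner's connector
    # pipeline builds the actual train batch from the episodes.
    results = learner_group.update_from_episodes(episodes=episodes)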
2 changes: 1 addition & 1 deletion rllib/BUILD
@@ -283,7 +283,7 @@ py_test(
# )

py_test(
- name = "learning_tests_carpole_dqn_envrunner",
+ name = "learning_tests_cartpole_dqn_envrunner",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_cartpole", "learning_tests_discrete"],
size = "large",
22 changes: 8 additions & 14 deletions rllib/algorithms/dqn/dqn.py
@@ -24,7 +24,7 @@
from ray.rllib.execution.rollout_ops import (
synchronous_parallel_sample,
)
- from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
+ from ray.rllib.policy.sample_batch import MultiAgentBatch
from ray.rllib.execution.train_ops import (
train_one_step,
multi_gpu_train_one_step,
@@ -656,24 +656,20 @@ def _training_step_new_api_stack(self, *, with_noise_reset) -> ResultDict:
self.learner_group.foreach_learner(lambda lrnr: lrnr._reset_noise())
# Run multiple sample-from-buffer and update iterations.
for _ in range(sample_and_train_weight):
- # Sample training batch from replay_buffer.
- # TODO (simon): Use sample_with_keys() here.
+ # Sample a list of episodes used for learning from the replay buffer.
with self.metrics.log_time((TIMERS, REPLAY_BUFFER_SAMPLE_TIMER)):
- train_dict = self.local_replay_buffer.sample(
+ episodes = self.local_replay_buffer.sample(
num_items=self.config.train_batch_size,
n_step=self.config.n_step,
gamma=self.config.gamma,
beta=self.config.replay_buffer_config["beta"],
)
- train_batch = SampleBatch(train_dict)
- # Convert to multi-agent batch as `LearnerGroup` depends on it.
- # TODO (sven, simon): Remove this conversion once the `LearnerGroup`
- # supports dict.
- train_batch = train_batch.as_multi_agent()

# Perform an update on the buffer-sampled train batch.
with self.metrics.log_time((TIMERS, LEARNER_UPDATE_TIMER)):
- learner_results = self.learner_group.update_from_batch(train_batch)
+ learner_results = self.learner_group.update_from_episodes(
+     episodes=episodes,
+ )
# Isolate TD-errors from result dicts (we should not log these to
# disk or WandB, they might be very large).
td_errors = defaultdict(list)
@@ -713,10 +709,8 @@ def _training_step_new_api_stack(self, *, with_noise_reset) -> ResultDict:
# Update replay buffer priorities.
with self.metrics.log_time((TIMERS, REPLAY_BUFFER_UPDATE_PRIOS_TIMER)):
update_priorities_in_episode_replay_buffer(
- self.local_replay_buffer,
- self.config,
- train_batch,
- td_errors,
+ replay_buffer=self.local_replay_buffer,
+ td_errors=td_errors,
)

# Update the target networks, if necessary.
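The priority update in the hunk above now receives the buffer and the per-module TD-errors as keyword arguments; the train batch and config arguments were dropped. Conceptually, a prioritized update of this kind maps absolute TD-errors to fresh priorities for the last-sampled items. A generic, hedged sketch of that idea (not the RLlib helper itself; `sketch_update_priorities` and `eps` are illustrative):

    import numpy as np

    def sketch_update_priorities(buffer, td_errors, eps=1e-6):
        # `td_errors` maps module IDs to the per-sample TD-errors of the most
        # recent `sample()` call (same order as the sampled items).
        for module_id, errors in td_errors.items():
            new_priorities = np.abs(np.asarray(errors)) + eps
            # A prioritized episode buffer exposes some form of priority update
            # for the last-sampled items; the exact RLlib signature may differ
            # from this sketch.
            buffer.update_priorities(new_priorities)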
23 changes: 22 additions & 1 deletion rllib/algorithms/dqn/dqn_rainbow_learner.py
@@ -3,7 +3,16 @@
from typing import TYPE_CHECKING

from ray.rllib.core.learner.learner import Learner
- from ray.rllib.utils.annotations import override
+ from ray.rllib.connectors.common.add_observations_from_episodes_to_batch import (
+ AddObservationsFromEpisodesToBatch,
+ )
+ from ray.rllib.connectors.learner.add_next_observations_from_episodes_to_train_batch import ( # noqa
+ AddNextObservationsFromEpisodesToTrainBatch,
+ )
+ from ray.rllib.utils.annotations import (
+ override,
+ OverrideToImplementCustomLogic_CallToSuperRecommended,
+ )
from ray.rllib.utils.metrics import LAST_TARGET_UPDATE_TS, NUM_TARGET_UPDATES
from ray.rllib.utils.typing import ModuleID

@@ -28,6 +37,18 @@


class DQNRainbowLearner(Learner):
+ @OverrideToImplementCustomLogic_CallToSuperRecommended
+ @override(Learner)
+ def build(self) -> None:
+ super().build()
+ # Prepend a NEXT_OBS from episodes to train batch connector piece (right
+ # after the observation default piece).
+ if self.config.add_default_connectors_to_learner_pipeline:
+ self._learner_connector.insert_after(
+ AddObservationsFromEpisodesToBatch,
+ AddNextObservationsFromEpisodesToTrainBatch(),
+ )

@override(Learner)
def additional_update_for_module(
self, *, module_id: ModuleID, config: "DQNConfig", timestep: int, **kwargs
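The new `build()` override above shows the general pattern for customizing a Learner's connector pipeline: call `super().build()` first, then insert an extra piece relative to one of the default pieces. A minimal sketch of a user-defined Learner following the same pattern; `MyCustomPiece` and `MyLearner` are illustrative names, not part of RLlib:

    from ray.rllib.connectors.common.add_observations_from_episodes_to_batch import (
        AddObservationsFromEpisodesToBatch,
    )
    from ray.rllib.connectors.connector_v2 import ConnectorV2
    from ray.rllib.core.learner.learner import Learner
    from ray.rllib.utils.annotations import override


    class MyCustomPiece(ConnectorV2):
        def __call__(self, *, rl_module, data, episodes, explore=None,
                     shared_data=None, **kwargs):
            # Inspect or enrich the train batch under construction here and
            # return it (unchanged in this sketch).
            return data


    class MyLearner(Learner):
        @override(Learner)
        def build(self) -> None:
            super().build()
            # Only touch the pipeline if the default pieces were added at all.
            if self.config.add_default_connectors_to_learner_pipeline:
                self._learner_connector.insert_after(
                    AddObservationsFromEpisodesToBatch, MyCustomPiece()
                )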
4 changes: 2 additions & 2 deletions rllib/algorithms/dqn/torch/dqn_rainbow_torch_learner.py
@@ -121,7 +121,7 @@ def compute_loss_for_module(
r_tau = torch.clamp(
batch[Columns.REWARDS].unsqueeze(dim=-1)
+ (
- config.gamma ** batch["n_steps"]
+ config.gamma ** batch["n_step"]
* (1.0 - batch[Columns.TERMINATEDS].float())
).unsqueeze(dim=-1)
* z,
@@ -171,7 +171,7 @@ def compute_loss_for_module(
# backpropagate through the target network when optimizing the Q loss.
q_selected_target = (
batch[Columns.REWARDS]
- + (config.gamma ** batch["n_steps"]) * q_next_best_masked
+ + (config.gamma ** batch["n_step"]) * q_next_best_masked
).detach()

# Choose the requested loss function. Note, in case of the Huber loss
1 change: 1 addition & 0 deletions rllib/algorithms/ppo/ppo_learner.py
@@ -62,6 +62,7 @@ def _update_from_batch_or_episodes(
# episodes).
if self.config.enable_env_runner_and_connector_v2:
batch, episodes = self._compute_gae_from_episodes(episodes=episodes)
+
# Now that GAE (advantages and value targets) have been added to the train
# batch, we can proceed normally (calling super method) with the update step.
return super()._update_from_batch_or_episodes(
2 changes: 1 addition & 1 deletion rllib/algorithms/sac/torch/sac_torch_learner.py
@@ -204,7 +204,7 @@ def compute_loss_for_module(
# Detach this node from the computation graph as we do not want to
# backpropagate through the target network when optimizing the Q loss.
q_selected_target = (
- batch[Columns.REWARDS] + (config.gamma ** batch["n_steps"]) * q_next_masked
+ batch[Columns.REWARDS] + (config.gamma ** batch["n_step"]) * q_next_masked
).detach()

# Calculate the TD-error. Note, this is needed for the priority weights in
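Both the DQN/Rainbow and SAC losses above now read the per-sample step count under the buffer's new "n_step" key (previously "n_steps"). The quantity these lines compute is the usual n-step bootstrap target: assuming, as is typical for n-step replay, that the stored reward is already the discounted sum over the n collected steps, the bootstrap term is discounted by gamma**n_step and masked out on termination. A small NumPy sketch of that arithmetic with made-up values:

    import numpy as np

    gamma = 0.99
    rewards = np.array([1.47, 0.5])     # pre-summed n-step returns r_t + gamma * r_{t+1} + ...
    n_step = np.array([3, 1])           # how many env steps each sample folded in
    terminateds = np.array([0.0, 1.0])  # 1.0 if the episode ended within those steps
    q_next_best = np.array([2.0, 5.0])  # max_a' Q_target(s_{t+n}, a')

    # Mirrors `batch[Columns.REWARDS] + (config.gamma ** batch["n_step"]) * q_next_best_masked`.
    q_next_best_masked = (1.0 - terminateds) * q_next_best
    q_selected_target = rewards + gamma ** n_step * q_next_best_masked
    print(q_selected_target)  # [1.47 + 0.99**3 * 2.0, 0.5] ~= [3.41, 0.5]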
4 changes: 4 additions & 0 deletions rllib/connectors/learner/__init__.py
@@ -10,12 +10,16 @@
from ray.rllib.connectors.learner.add_columns_from_episodes_to_train_batch import (
AddColumnsFromEpisodesToTrainBatch,
)
+ from ray.rllib.connectors.learner.add_next_observations_from_episodes_to_train_batch import ( # noqa
+ AddNextObservationsFromEpisodesToTrainBatch,
+ )
from ray.rllib.connectors.learner.learner_connector_pipeline import (
LearnerConnectorPipeline,
)

__all__ = [
"AddColumnsFromEpisodesToTrainBatch",
+ "AddNextObservationsFromEpisodesToTrainBatch",
"AddObservationsFromEpisodesToBatch",
"AddStatesFromEpisodesToBatch",
"AgentToModuleMapping",
116 changes: 116 additions & 0 deletions rllib/connectors/learner/add_next_observations_from_episodes_to_train_batch.py
@@ -0,0 +1,116 @@
from typing import Any, List, Optional

import gymnasium as gym

from ray.rllib.core.columns import Columns
from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.core.rl_module.rl_module import RLModule
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType


class AddNextObservationsFromEpisodesToTrainBatch(ConnectorV2):
"""Adds the NEXT_OBS column with the correct episode observations to train batch.
- Operates on a list of Episode objects.
- Gets all observation(s) from all the given episodes (except the very first ones)
and adds them to the batch under construction in the NEXT_OBS column (as a list of
individual observations).
- Does NOT alter any observations (or other data) in the given episodes.
- Can be used in Learner connector pipelines.
.. testcode::
import gymnasium as gym
import numpy as np
from ray.rllib.connectors.learner import (
AddNextObservationsFromEpisodesToTrainBatch
)
from ray.rllib.core.columns import Columns
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.utils.test_utils import check
# Create two dummy SingleAgentEpisodes, each containing 3 observations,
# 2 actions and 2 rewards (both episodes are length=2).
obs_space = gym.spaces.Box(-1.0, 1.0, (2,), np.float32)
act_space = gym.spaces.Discrete(2)
episodes = [SingleAgentEpisode(
observations=[obs_space.sample(), obs_space.sample(), obs_space.sample()],
actions=[act_space.sample(), act_space.sample()],
rewards=[1.0, 2.0],
len_lookback_buffer=0,
) for _ in range(2)]
eps_1_next_obses = episodes[0].get_observations([1, 2])
eps_2_next_obses = episodes[1].get_observations([1, 2])
print(f"1st Episode's next obses are {eps_1_next_obses}")
print(f"2nd Episode's next obses are {eps_2_next_obses}")
# Create an instance of this class.
connector = AddNextObservationsFromEpisodesToTrainBatch()
# Call the connector with the two created episodes.
# Note that this particular connector works without an RLModule, so we
# simplify here for the sake of this example.
output_data = connector(
rl_module=None,
data={},
episodes=episodes,
explore=True,
shared_data={},
)
# The output data should now contain the next observations of both episodes,
# in a "per-episode organized" fashion.
check(
output_data,
{
Columns.NEXT_OBS: {
(episodes[0].id_,): eps_1_next_obses,
(episodes[1].id_,): eps_2_next_obses,
},
},
)
"""

def __init__(
self,
input_observation_space: Optional[gym.Space] = None,
input_action_space: Optional[gym.Space] = None,
**kwargs,
):
"""Initializes an AddNextObservationsFromEpisodesToTrainBatch instance."""
super().__init__(
input_observation_space=input_observation_space,
input_action_space=input_action_space,
**kwargs,
)

@override(ConnectorV2)
def __call__(
self,
*,
rl_module: RLModule,
data: Optional[Any],
episodes: List[EpisodeType],
explore: Optional[bool] = None,
shared_data: Optional[dict] = None,
**kwargs,
) -> Any:
# If NEXT_OBS is already in `data`, early out.
if Columns.NEXT_OBS in data:
return data

for sa_episode in self.single_agent_episode_iterator(
# This is a Learner-only connector -> Get all episodes (for train batch).
episodes,
agents_that_stepped_only=False,
):
self.add_n_batch_items(
data,
Columns.NEXT_OBS,
items_to_add=sa_episode.get_observations(slice(1, len(sa_episode) + 1)),
num_items=len(sa_episode),
single_agent_episode=sa_episode,
)
return data
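The key detail in the connector above is the slice `slice(1, len(sa_episode) + 1)`: for an episode with observations o_0 ... o_T, the NEXT_OBS entries for timesteps 0 ... T-1 are o_1 ... o_T, so OBS and NEXT_OBS end up the same length, shifted by one step. A plain-Python illustration of that indexing, independent of the episode API:

    # A length-3 episode stores 4 observations: the reset obs plus one per step.
    observations = ["o0", "o1", "o2", "o3"]
    episode_len = len(observations) - 1

    obs = observations[0:episode_len]           # steps 0..T-1 -> o0, o1, o2
    next_obs = observations[1:episode_len + 1]  # steps 0..T-1 -> o1, o2, o3

    assert len(obs) == len(next_obs) == episode_len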
2 changes: 1 addition & 1 deletion rllib/core/learner/learner.py
@@ -1287,7 +1287,7 @@ def _update_from_batch_or_episodes(
# Call the learner connector pipeline.
batch = self._learner_connector(
rl_module=self.module,
- data=batch,
+ data=batch if batch is not None else {},
episodes=episodes,
shared_data={},
)
@@ -591,7 +591,7 @@ def _sample_independent(
Columns.TERMINATEDS: np.array(is_terminated),
Columns.TRUNCATEDS: np.array(is_truncated),
"weights": np.array(weights),
- "n_steps": np.array(n_steps),
+ "n_step": np.array(n_steps),
}
# Include infos if necessary.
if include_infos:
@@ -804,7 +804,7 @@ def _sample_synchonized(
Columns.TERMINATEDS: np.array(is_terminated[module_id]),
Columns.TRUNCATEDS: np.array(is_truncated[module_id]),
"weights": np.array(weights[module_id]),
- "n_steps": np.array(n_steps[module_id]),
+ "n_step": np.array(n_steps[module_id]),
}
for module_id in observations.keys()
}
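The `beta` argument forwarded by `sample()` (see the dqn.py hunk above) controls the importance-sampling correction that ends up in the "weights" column next to the renamed "n_step" entry. A generic sketch of that standard prioritized-replay computation, not the buffer's exact implementation (`sketch_is_weights` is an illustrative name; `alpha` and `beta` are the usual PER hyperparameters):

    import numpy as np

    def sketch_is_weights(priorities, sampled_idx, alpha=1.0, beta=0.5):
        # Sampling probabilities P(i) = p_i**alpha / sum_k p_k**alpha.
        probs = priorities ** alpha
        probs = probs / probs.sum()
        n = len(priorities)
        # w_i = (N * P(i)) ** -beta, normalized by the largest weight so the
        # correction only ever scales losses down.
        weights = (n * probs[sampled_idx]) ** (-beta)
        return weights / weights.max()

    w = sketch_is_weights(np.array([0.1, 0.5, 1.0, 2.0]), np.array([3, 1]))
    print(w)  # the highest-priority sample gets the smallest weight -> [0.5, 1.0]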