Skip to content

Commit

Permalink
[WIP] use gym Env in VectorEnv (facebookresearch#892)
Browse files Browse the repository at this point in the history
* [WIP] use gym Env in VectorEnv

* make it working for cartpole

* making cartpole work

* fixing pikle5 issue

* fixing one of the tests

* fixing filename typo

* changing a test

* removing the FIX INFO DICT option

* removing debug print statement I forgot to remove

* removing dead code

* using a generator instead of list comprehension

* removing redundant .cpu() call

* flatten the dict

* changing action space to space.Discrete

* adding some training tests

* adding GYM.CLASS_NAME to more configs

* deleting ENV_NAME

* fixing tests

* using Rearrange in Test

* using the dataset better

* deleting last_obs

* removing a test to check cuda error

* run the baseline training tests separately

* separating the tests more explicitely

* removing the fuse keys that correspond to sensors

* better data_profile path

Co-authored-by: vincentpierre <[email protected]>
  • Loading branch information
vincentpierre and vincentpierre authored Jul 12, 2022
1 parent 5b74829 commit 710beab
Show file tree
Hide file tree
Showing 63 changed files with 564 additions and 773 deletions.
15 changes: 14 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,20 @@ jobs:
. activate habitat; cd habitat-lab
python setup.py develop --all
export PYTHONPATH=.:$PYTHONPATH
python setup.py test --addopts "--cov-report=xml --cov=./"
python setup.py test --addopts "--cov-report=xml --cov-report term --cov=./"
bash <(curl -s https://codecov.io/bash) -f coverage.xml
- run:
name: Run baseline training tests
no_output_timeout: 30m
command: |
export PATH=$HOME/miniconda/bin:/usr/local/cuda/bin:$PATH
. activate habitat; cd habitat-lab
python setup.py develop --all
export PYTHONPATH=.:$PYTHONPATH
# This is a flag that enables test_test_baseline_training to work
export TEST_BASELINE_SMALL=1
python setup.py test --addopts "test/test_baseline_training.py"
- run:
name: Run Hab2.0 benchmark
no_output_timeout: 30m
Expand Down Expand Up @@ -282,6 +293,8 @@ jobs:
conda install -y jinja2 pygments docutils
mkdir -p ../build/docs
./build-public.sh
- store_artifacts:
path: habitat-lab/data/profile # This is the benchmark profile


workflows:
Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/imagenav.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 1000

Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/imagenav_gibson.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 1000
ITERATOR_OPTIONS:
Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/objectnav_hm3d.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 500

Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/objectnav_mp3d.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 500

Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/pointnav.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 500
SIMULATOR:
Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/pointnav_gibson.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 500
SIMULATOR:
Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/pointnav_hm3d.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 500
SIMULATOR:
Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/pointnav_mp3d.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 500
SIMULATOR:
Expand Down
2 changes: 2 additions & 0 deletions configs/tasks/pointnav_rgbd.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 500
SIMULATOR:
Expand Down
2 changes: 2 additions & 0 deletions configs/test/habitat_all_sensors_test.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: RearrangeRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 10
ITERATOR_OPTIONS:
Expand Down
2 changes: 2 additions & 0 deletions configs/test/habitat_mp3d_object_nav_test.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
GYM:
CLASS_NAME: NavRLEnv
ENVIRONMENT:
MAX_EPISODE_STEPS: 500
SIMULATOR:
Expand Down
3 changes: 1 addition & 2 deletions habitat/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,12 +867,11 @@ def __init__(self, *args, **kwargs):
# -----------------------------------------------------------------------------
_C.GYM = CN()
_C.GYM.AUTO_NAME = ""
_C.GYM.CLASS_NAME = "RearrangeRLEnv"
_C.GYM.CLASS_NAME = None
_C.GYM.OBS_KEYS = None
_C.GYM.ACTION_KEYS = None
_C.GYM.ACHIEVED_GOAL_KEYS = []
_C.GYM.DESIRED_GOAL_KEYS = []
_C.GYM.FIX_INFO_DICT = True

# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
Expand Down
50 changes: 50 additions & 0 deletions habitat/core/gym_env_episode_count_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Tuple, Union

from gym import Env, Wrapper, spaces
from gym.core import ActType, ObsType

from habitat.core.dataset import Episode


class EnvCountEpisodeWrapper(Wrapper):
OBSERVATION_KEY = "obs"
observation_space: spaces.Space

def __init__(self, env: Env):
"""
A helper wrapper to count the number of episodes available
"""
super().__init__(env)
self._has_number_episode = hasattr(env, "number_of_episodes")
self._current_episode = 0

@property
def number_of_episodes(self):
if self._has_number_episode:
return self.env.number_of_episodes
else:
return -1

@property
def current_episode(self) -> Episode:
if self._has_number_episode:
return self.env.current_episode
else:
return Episode(
episode_id=str(self._current_episode),
scene_id="default",
start_position=[],
start_rotation=[],
)

def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]:
"""Steps through the environment with action."""
o, r, done, i = self.env.step(action)
if done:
self._current_episode += 1
return o, r, done, i

def reset(self, **kwargs) -> Union[ObsType, Tuple[ObsType, dict]]:
"""Resets the environment with kwargs."""
self._current_episode += 1
return self.env.reset(**kwargs)
39 changes: 39 additions & 0 deletions habitat/core/gym_env_obs_dict_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Tuple, Union

from gym import Env, Wrapper, spaces
from gym.core import ActType, ObsType


class EnvObsDictWrapper(Wrapper):
OBSERVATION_KEY = "obs"
observation_space: spaces.Space

def __init__(self, env: Env):
"""
Wraps a VectorEnv environment and makes sure its obervation space is a
Dictionary (If it is a Box, it will be wrapped into a dictionary)
"""
super().__init__(env)
self._requires_dict = False
if isinstance(self.observation_space, spaces.Box):
self._requires_dict = True
self.observation_space = spaces.Dict(
{self.OBSERVATION_KEY: self.observation_space}
)

def step(self, action: ActType) -> Tuple[ObsType, float, bool, dict]:
obs, reward, done, info = self.env.step(action)
if self._requires_dict:
obs = {self.OBSERVATION_KEY: obs}
return obs, reward, done, info

def reset(self, **kwargs) -> Union[ObsType, Tuple[ObsType, dict]]:
if not self._requires_dict:
return self.env.reset(**kwargs)
reset_output = self.env.reset(**kwargs)
if isinstance(reset_output, tuple):
obs, info = self.env.reset(**kwargs)
return {self.OBSERVATION_KEY: obs}, info
else:
obs = self.env.reset(**kwargs)
return {self.OBSERVATION_KEY: obs}
88 changes: 36 additions & 52 deletions habitat/core/vector_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@
import habitat
from habitat.config import Config
from habitat.core.env import Env, RLEnv
from habitat.core.gym_env_episode_count_wrapper import EnvCountEpisodeWrapper
from habitat.core.gym_env_obs_dict_wrapper import EnvObsDictWrapper
from habitat.core.logging import logger
from habitat.core.utils import tile_images
from habitat.utils import profiling_wrapper
from habitat.utils.pickle5_multiprocessing import ConnectionWrapper
from habitat.utils.pickle5_multiprocessing import (
CloudpickleWrapper,
ConnectionWrapper,
)

try:
# Use torch.multiprocessing if we can.
Expand Down Expand Up @@ -140,8 +145,8 @@ class VectorEnv:

def __init__(
self,
make_env_fn: Callable[..., Union[Env, RLEnv]] = _make_env_fn,
env_fn_args: Sequence[Tuple] = None,
make_env_fn: Callable[..., gym.Env],
env_fn_args: Sequence[Tuple],
auto_reset_done: bool = True,
multiprocessing_start_method: str = "forkserver",
workers_ignore_signals: bool = False,
Expand All @@ -151,7 +156,7 @@ def __init__(
:param make_env_fn: function which creates a single environment. An
environment can be of type :ref:`env.Env` or :ref:`env.RLEnv`
:param env_fn_args: tuple of tuple of args to pass to the
:ref:`_make_env_fn`.
:ref:`make_gym_from_config`.
:param auto_reset_done: automatically reset the environment when
done. This functionality is provided for seamless training
of vectorized environments.
Expand Down Expand Up @@ -230,33 +235,20 @@ def _worker_env(
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
signal.signal(signal.SIGUSR2, signal.SIG_IGN)

env = env_fn(*env_fn_args)
env = EnvCountEpisodeWrapper(EnvObsDictWrapper(env_fn(*env_fn_args)))
if parent_pipe is not None:
parent_pipe.close()
try:
command, data = connection_read_fn()
while command != CLOSE_COMMAND:
if command == STEP_COMMAND:
# different step methods for habitat.RLEnv and habitat.Env
if isinstance(env, (habitat.RLEnv, gym.Env)):
# habitat.RLEnv
observations, reward, done, info = env.step(**data)
if auto_reset_done and done:
observations = env.reset()
with profiling_wrapper.RangeContext(
"worker write after step"
):
connection_write_fn(
(observations, reward, done, info)
)
elif isinstance(env, habitat.Env): # type: ignore
# habitat.Env
observations = env.step(**data)
if auto_reset_done and env.episode_over:
observations = env.reset()
connection_write_fn(observations)
else:
raise NotImplementedError
observations, reward, done, info = env.step(data)
if auto_reset_done and done:
observations = env.reset()
with profiling_wrapper.RangeContext(
"worker write after step"
):
connection_write_fn((observations, reward, done, info))

elif command == RESET_COMMAND:
observations = env.reset()
Expand Down Expand Up @@ -316,7 +308,7 @@ def _spawn_workers(
args=(
worker_conn.recv,
worker_conn.send,
make_env_fn,
CloudpickleWrapper(make_env_fn),
env_args,
self._auto_reset_done,
workers_ignore_signals,
Expand Down Expand Up @@ -395,20 +387,16 @@ def reset_at(self, index_env: int):
return results

def async_step_at(
self, index_env: int, action: Union[int, str, Dict[str, Any]]
self, index_env: int, action: Union[int, np.ndarray]
) -> None:
# Backward compatibility
if isinstance(action, (int, np.integer, str)):
action = {"action": {"action": action}}

self._warn_cuda_tensors(action)
self._connection_write_fns[index_env]((STEP_COMMAND, action))

@profiling_wrapper.RangeContext("wait_step_at")
def wait_step_at(self, index_env: int) -> Any:
return self._connection_read_fns[index_env]()

def step_at(self, index_env: int, action: Union[int, str, Dict[str, Any]]):
def step_at(self, index_env: int, action: Union[int, np.ndarray]):
r"""Step in the index_env environment in the vector.
:param index_env: index of the environment to be stepped into
Expand All @@ -418,14 +406,12 @@ def step_at(self, index_env: int, action: Union[int, str, Dict[str, Any]]):
self.async_step_at(index_env, action)
return self.wait_step_at(index_env)

def async_step(
self, data: Sequence[Union[int, str, Dict[str, Any]]]
) -> None:
def async_step(self, data: Sequence[Union[int, np.ndarray]]) -> None:
r"""Asynchronously step in the environments.
:param data: list of size _num_envs containing keyword arguments to
pass to :ref:`step` method for each Environment. For example,
:py:`[{"action": "TURN_LEFT", "action_args": {...}}, ...]`.
:py:`[1, 3 ,5 , ...]`.
"""

for index_env, act in enumerate(data):
Expand All @@ -438,14 +424,12 @@ def wait_step(self) -> List[Any]:
self.wait_step_at(index_env) for index_env in range(self.num_envs)
]

def step(
self, data: Sequence[Union[int, str, Dict[str, Any]]]
) -> List[Any]:
def step(self, data: Sequence[Union[int, np.ndarray]]) -> List[Any]:
r"""Perform actions in the vectorized environments.
:param data: list of size _num_envs containing keyword arguments to
pass to :ref:`step` method for each Environment. For example,
:py:`[{"action": "TURN_LEFT", "action_args": {...}}, ...]`.
:py:`[1, 3 ,5 , ...]`.
:return: list of outputs from the step method of envs.
"""
self.async_step(data)
Expand Down Expand Up @@ -550,7 +534,7 @@ def render(
) -> Optional[np.ndarray]:
r"""Render observations from all environments in a tiled image."""
for write_fn in self._connection_write_fns:
write_fn((RENDER_COMMAND, (args, {"mode": "rgb", **kwargs})))
write_fn((RENDER_COMMAND, (args, {"mode": "rgb_array", **kwargs})))
images = [read_fn() for read_fn in self._connection_read_fns]
tile = tile_images(images)
if mode == "human":
Expand All @@ -571,22 +555,22 @@ def _valid_start_methods(self) -> Set[str]:
return {"forkserver", "spawn", "fork"}

def _warn_cuda_tensors(
self, action: Dict[str, Any], prefix: Optional[str] = None
self,
action: Union[int, np.ndarray, Dict[str, Any]],
prefix: Optional[str] = None,
):
if torch is None:
return

for k, v in action.items():
if isinstance(v, dict):
if isinstance(action, dict):
for k, v in action.items():
subk = f"{prefix}.{k}" if prefix is not None else k
self._warn_cuda_tensors(v, prefix=subk)
elif torch.is_tensor(v) and v.device.type == "cuda":
subk = f"{prefix}.{k}" if prefix is not None else k
warnings.warn(
"Action with key {} is a CUDA tensor."
" This will result in a CUDA context in the subproccess worker."
" Using CPU tensors instead is recommended.".format(subk)
)
elif isinstance(action, torch.Tensor) and action.device.type == "cuda":
warnings.warn(
f"Action with key {subk} is a CUDA tensor."
" This will result in a CUDA context in the subproccess worker."
" Using CPU tensors instead is recommended."
)

def __del__(self):
self.close()
Expand Down
Loading

0 comments on commit 710beab

Please sign in to comment.