forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpettingzoo_env.py
214 lines (163 loc) · 5.85 KB
/
pettingzoo_env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
from typing import Optional
import gymnasium as gym
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.utils.annotations import PublicAPI
@PublicAPI
class PettingZooEnv(MultiAgentEnv):
    """An interface to the PettingZoo MARL environment library.

    See: https://github.com/Farama-Foundation/PettingZoo

    Inherits from MultiAgentEnv and exposes a given AEC
    (agent-environment-cycle) game from the PettingZoo project via the
    MultiAgentEnv public API.

    Note that the wrapper has the following important limitation:

    Environments are positive sum games (-> Agents are expected to cooperate
    to maximize reward). This isn't a hard restriction, it's just that
    standard algorithms aren't expected to work well in highly competitive
    games.

    Also note that the earlier existing restriction of all agents having the same
    observation- and action spaces has been lifted. Different agents can now have
    different spaces and the entire environment's e.g. `self.action_space` is a Dict
    mapping agent IDs to individual agents' spaces. Same for `self.observation_space`.

    .. testcode::
        :skipif: True

        from pettingzoo.butterfly import prison_v3
        from ray.rllib.env.wrappers.pettingzoo_env import PettingZooEnv
        env = PettingZooEnv(prison_v3.env())
        obs, infos = env.reset()
        # only returns the observation for the agent which should be stepping
        print(obs)

    .. testoutput::

        {
            'prisoner_0': array([[[0, 0, 0],
                [0, 0, 0],
                [0, 0, 0],
                ...,
                [0, 0, 0],
                [0, 0, 0],
                [0, 0, 0]]], dtype=uint8)
        }

    .. testcode::
        :skipif: True

        obs, rewards, terminateds, truncateds, infos = env.step({
            "prisoner_0": 1
        })
        # only returns the observation, reward, info, etc, for
        # the agent whose turn is next.
        print(obs)

    .. testoutput::

        {
            'prisoner_1': array([[[0, 0, 0],
                [0, 0, 0],
                [0, 0, 0],
                ...,
                [0, 0, 0],
                [0, 0, 0],
                [0, 0, 0]]], dtype=uint8)
        }

    .. testcode::
        :skipif: True

        print(rewards)

    .. testoutput::

        {
            'prisoner_1': 0
        }

    .. testcode::
        :skipif: True

        print(terminateds)

    .. testoutput::

        {
            'prisoner_1': False, '__all__': False
        }

    .. testcode::
        :skipif: True

        print(truncateds)

    .. testoutput::

        {
            'prisoner_1': False, '__all__': False
        }

    .. testcode::
        :skipif: True

        print(infos)

    .. testoutput::

        {
            'prisoner_1': {'map_tuple': (1, 0)}
        }
    """

    def __init__(self, env):
        """Wrap the given PettingZoo AEC environment.

        Args:
            env: The PettingZoo AEC environment instance to wrap.
        """
        super().__init__()
        self.env = env
        # Reset once up front so that `env.agents` is populated before the
        # agent IDs and per-agent spaces are read from it.
        self.env.reset()

        self._agent_ids = set(self.env.agents)

        # The env-level spaces are Dicts keyed by agent ID.
        self.observation_space = gym.spaces.Dict(
            {aid: self.env.observation_space(aid) for aid in self._agent_ids}
        )
        self.action_space = gym.spaces.Dict(
            {aid: self.env.action_space(aid) for aid in self._agent_ids}
        )

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """Reset the wrapped env and return the first acting agent's observation.

        Args:
            seed: Forwarded to the wrapped env's `reset()`.
            options: Forwarded to the wrapped env's `reset()`.

        Returns:
            A tuple `(obs, infos)`. `obs` maps the currently selected agent's
            ID to its observation. `infos` is whatever the wrapped env's
            `reset()` returned, or an empty dict if that value was falsy
            (e.g. `None`).
        """
        info = self.env.reset(seed=seed, options=options)
        current_agent = self.env.agent_selection
        return {current_agent: self.env.observe(current_agent)}, info or {}

    def step(self, action):
        """Apply the selected agent's action and advance to the next live agent.

        Args:
            action: Dict that must contain an entry for the agent currently
                selected by the wrapped env (`env.agent_selection`).

        Returns:
            A tuple `(obs, rewards, terminateds, truncateds, infos)` of dicts
            keyed by agent ID. They contain one entry for every agent visited
            during this call: all agents found terminated/truncated along the
            way plus, if any agent is left, the agent that acts next.
            `terminateds` and `truncateds` also carry an `"__all__"` key.
        """
        self.env.step(action[self.env.agent_selection])

        obs_d = {}
        rew_d = {}
        terminated_d = {}
        truncated_d = {}
        info_d = {}

        while self.env.agents:
            obs, rew, terminated, truncated, info = self.env.last()
            agent_id = self.env.agent_selection
            obs_d[agent_id] = obs
            rew_d[agent_id] = rew
            terminated_d[agent_id] = terminated
            truncated_d[agent_id] = truncated
            info_d[agent_id] = info

            if self.env.terminations[agent_id] or self.env.truncations[agent_id]:
                # A finished agent is stepped with `None` so the wrapped env
                # can move on to the next agent; keep collecting.
                self.env.step(None)
            else:
                # Found an agent that still has to act: stop here.
                break

        # "__all__" can only be True once no agents remain in the wrapped env.
        # The values are evaluated before the "__all__" key itself is inserted.
        all_gone = not self.env.agents
        terminated_d["__all__"] = all_gone and all(terminated_d.values())
        truncated_d["__all__"] = all_gone and all(truncated_d.values())

        return obs_d, rew_d, terminated_d, truncated_d, info_d

    def close(self):
        """Close the wrapped environment."""
        self.env.close()

    def render(self):
        """Render the wrapped environment and return its render result.

        The wrapped env's `render()` is called without arguments. This class
        otherwise relies on the current PettingZoo API (`reset(seed=...,
        options=...)`, five-tuple `last()`), in which `render()` accepts no
        arguments, so forwarding `self.render_mode` positionally would raise
        a `TypeError`.
        """
        return self.env.render()

    @property
    def get_sub_environments(self):
        """The wrapped env's `unwrapped` attribute (exposed as a property)."""
        return self.env.unwrapped
@PublicAPI
class ParallelPettingZooEnv(MultiAgentEnv):
    """Expose a PettingZoo parallel-API environment via the MultiAgentEnv API.

    All actions are handed to the wrapped env's `step()` in a single call.
    The env-level `observation_space` and `action_space` are Dict spaces
    mapping agent IDs to the individual agents' spaces.
    """

    def __init__(self, env):
        """Wrap the given PettingZoo parallel environment.

        Args:
            env: The PettingZoo parallel environment instance to wrap.
        """
        super().__init__()
        self.par_env = env
        # Reset once up front so that `par_env.agents` is populated before
        # the agent IDs and per-agent spaces are read from it.
        self.par_env.reset()

        self._agent_ids = set(self.par_env.agents)

        self.observation_space = gym.spaces.Dict(
            {aid: self.par_env.observation_space(aid) for aid in self._agent_ids}
        )
        self.action_space = gym.spaces.Dict(
            {aid: self.par_env.action_space(aid) for aid in self._agent_ids}
        )

    def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
        """Reset the wrapped env.

        Args:
            seed: Forwarded to the wrapped env's `reset()`.
            options: Forwarded to the wrapped env's `reset()`.

        Returns:
            A tuple `(obs, infos)` as returned by the wrapped env; a falsy
            `infos` (e.g. `None`) is replaced by an empty dict.
        """
        obs, info = self.par_env.reset(seed=seed, options=options)
        return obs, info or {}

    def step(self, action_dict):
        """Step the wrapped env with the given per-agent actions.

        Args:
            action_dict: Passed unchanged to the wrapped env's `step()`.

        Returns:
            A tuple `(obs, rewards, terminateds, truncateds, infos)`.
            `terminateds` and `truncateds` are copies of the wrapped env's
            dicts with an added `"__all__"` key that is True iff every
            per-agent flag is True (also True when there are no entries).
        """
        obss, rews, terminateds, truncateds, infos = self.par_env.step(action_dict)

        # Work on shallow copies so the "__all__" key is never written into
        # dicts that the wrapped env may still hold a reference to.
        terminateds = dict(terminateds)
        truncateds = dict(truncateds)
        terminateds["__all__"] = all(terminateds.values())
        truncateds["__all__"] = all(truncateds.values())

        return obss, rews, terminateds, truncateds, infos

    def close(self):
        """Close the wrapped environment."""
        self.par_env.close()

    def render(self):
        """Render the wrapped environment and return its render result.

        The wrapped env's `render()` is called without arguments. This class
        otherwise relies on the current PettingZoo API (`reset(seed=...,
        options=...)` returning `(obs, infos)`, five-tuple `step()`), in
        which `render()` accepts no arguments, so forwarding
        `self.render_mode` positionally would raise a `TypeError`.
        """
        return self.par_env.render()

    @property
    def get_sub_environments(self):
        """The wrapped env's `unwrapped` attribute (exposed as a property)."""
        return self.par_env.unwrapped