-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcem.py
127 lines (116 loc) · 3.86 KB
/
cem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import copy
import numpy as np
import ray
import gym
@ray.remote
def rollout(env, dist, args):
    """Sample an action sequence and evaluate it on a copy of the environment.

    Runs as a Ray remote task. Draws actions from the given distribution,
    steps the environment through them, and accumulates the reward until
    the episode terminates or the sequence is exhausted.

    Args:
        env: a gym environment instance (deep-copied by the caller, so
            stepping it here does not disturb the caller's env state).
        dist: name of the sampling distribution; only "Bernoulli" is supported.
        args: keyword arguments forwarded to ``np.random.binomial``.

    Returns:
        (actions, total_reward): the sampled action array and the reward
        it earned.

    Raises:
        ValueError: if ``dist`` is not a recognized distribution name.
    """
    if dist != "Bernoulli":
        raise ValueError("Unknown distribution")
    actions = np.random.binomial(**args)
    total_reward = 0
    for action in actions:
        _, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            break
    return actions, total_reward
class CEM:
    """Cross-entropy-method planner for a gym environment.

    Repeatedly samples action sequences in parallel (via ray), keeps the
    elite fraction by reward, and refits the sampling distribution to the
    elites. Supports executing the planned sequence open-loop (run the
    whole sequence) or closed-loop (replan after every step).
    """

    def __init__(
        self,
        env_name,
        optimizer,
        look_ahead,
        num_parallel,
        elite_frac,
        num_ep,
        opt_iters,
        dist,
        control,
    ):
        """Create a planner.

        Args:
            env_name: gym environment id passed to ``gym.make``.
            optimizer: planning algorithm; only "cem" is supported.
            look_ahead: planning horizon (length of each action sequence).
            num_parallel: number of rollouts sampled per CEM iteration.
            elite_frac: fraction of rollouts kept as elites (0, 1].
            num_ep: number of episodes to run in ``optimize``.
            opt_iters: CEM refinement iterations per plan.
            dist: action-sampling distribution; only "Bernoulli" is supported.
            control: "open-loop" or "closed-loop" execution of the plan.
        """
        self.env = gym.make(env_name)
        self.optimizer = optimizer
        self.look_ahead = look_ahead
        self.num_parallel = num_parallel
        self.elite_frac = elite_frac
        self.num_ep = num_ep
        self.opt_iters = opt_iters
        self.dist = dist
        self.control = control
        self.reset()

    def optimize(self):
        """Run ``num_ep`` episodes, planning with the configured optimizer.

        Accumulates per-episode rewards in ``self.episode_rewards``.

        Raises:
            ValueError: on an unknown optimizer or control type.
        """
        for i in range(self.num_ep):
            self.env.reset()
            ep_reward = 0
            done = False
            while not done:
                if self.optimizer == "cem":
                    actions = self.cross_ent_optimizer()
                else:
                    raise ValueError("Unknown Optimizer")
                if self.control == "open-loop":
                    # Execute the entire planned sequence without replanning.
                    for a in actions:
                        obs, reward, done, info = self.env.step(a)
                        ep_reward += reward
                        if done:
                            break
                elif self.control == "closed-loop":
                    # Execute only the first planned action, then replan.
                    obs, reward, done, info = self.env.step(actions[0])
                    ep_reward += reward
                else:
                    raise ValueError("Unknown control type.")
                if done:
                    self.episode_rewards.append(ep_reward)
                    self.num_episodes += 1
                    print(f"Episode {i}, reward: {ep_reward}")

    def cross_ent_optimizer(self):
        """Plan an action sequence with the cross-entropy method.

        Samples ``num_parallel`` rollouts on deep copies of the current
        environment, keeps the top ``elite_frac`` by reward, and refits the
        per-step Bernoulli probabilities to the elite actions.

        Returns:
            An action sequence of length ``look_ahead`` sampled from the
            final fitted distribution.

        Raises:
            ValueError: if ``self.dist`` is not a supported distribution.
        """
        n_elites = int(np.ceil(self.num_parallel * self.elite_frac))
        if self.dist != "Bernoulli":
            raise ValueError("Unknown distribution")
        p = [0.5] * self.look_ahead
        for _ in range(self.opt_iters):
            # Sampling args are loop-invariant apart from p, which is
            # rebound each iteration; rebuild the dict once per iteration.
            args = {"n": 1, "p": p, "size": self.look_ahead}
            futures = [
                rollout.remote(copy.deepcopy(self.env), self.dist, args)
                for _ in range(self.num_parallel)
            ]
            # One batched get: waits on all tasks at once instead of a
            # serial ray.get per future (also avoids shadowing builtin `id`).
            results = ray.get(futures)
            sampled_rewards = [r for _, r in results]
            elite_ix = np.argsort(sampled_rewards)[-n_elites:]
            elite_actions = np.array([a for a, _ in results])[elite_ix]
            # Refit: mean of elite actions is the MLE of each step's
            # Bernoulli success probability.
            p = np.mean(elite_actions, axis=0)
            actions = np.random.binomial(n=1, p=p, size=self.look_ahead)
        return actions

    def reset(self):
        """Clear the recorded episode history."""
        self.episode_rewards = []
        self.num_episodes = 0

    @property
    def avg_reward(self):
        """Mean reward over recorded episodes; NaN if none have run yet."""
        # Guard the empty case explicitly: np.mean([]) returns nan but
        # also emits a RuntimeWarning.
        if not self.episode_rewards:
            return float("nan")
        return np.mean(self.episode_rewards)
if __name__ == "__main__":
    # Fix the numpy seed for reproducible action sampling, then run one
    # closed-loop CEM-planned episode on CartPole.
    np.random.seed(0)
    ray.init()
    cem = CEM(
        env_name="CartPole-v0",
        optimizer="cem",
        look_ahead=10,
        num_parallel=50,
        elite_frac=0.1,
        num_ep=1,
        opt_iters=5,
        dist="Bernoulli",
        control="closed-loop",
    )
    cem.optimize()
    # Fixed typo in user-facing output: "epsiode" -> "episode".
    print(f"Average episode reward in {cem.num_episodes} is {cem.avg_reward}")