approx_semigradient_sarsa_control.py
# https://deeplearningcourses.com/c/artificial-intelligence-reinforcement-learning-in-python
# https://www.udemy.com/artificial-intelligence-reinforcement-learning-in-python
from __future__ import print_function, division
from builtins import range
# Note: you may need to update your version of future
# sudo pip install -U future
import numpy as np
import matplotlib.pyplot as plt
from grid_world import standard_grid, negative_grid
from iterative_policy_evaluation import print_values, print_policy
from monte_carlo_es import max_dict
from sarsa import random_action, GAMMA, ALPHA, ALL_POSSIBLE_ACTIONS
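
# NOTE: grid_world, iterative_policy_evaluation, monte_carlo_es, and sarsa are
# companion modules from the same course repo; this script assumes they live in
# the same directory (or are otherwise importable)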
SA2IDX = {}
IDX = 0


class Model:
  def __init__(self):
    self.theta = np.random.randn(25) / np.sqrt(25)
    # if we use SA2IDX, a one-hot encoding for every (s,a) pair
    # in reality we wouldn't want to do this b/c we have just
    # as many params as before
    # print "D:", IDX
    # self.theta = np.random.randn(IDX) / np.sqrt(IDX)

  def sa2x(self, s, a):
    # NOTE: using just (r, c, r*c, u, d, l, r, 1) is not expressive enough
    return np.array([
      s[0] - 1              if a == 'U' else 0,
      s[1] - 1.5            if a == 'U' else 0,
      (s[0]*s[1] - 3)/3     if a == 'U' else 0,
      (s[0]*s[0] - 2)/2     if a == 'U' else 0,
      (s[1]*s[1] - 4.5)/4.5 if a == 'U' else 0,
      1                     if a == 'U' else 0,
      s[0] - 1              if a == 'D' else 0,
      s[1] - 1.5            if a == 'D' else 0,
      (s[0]*s[1] - 3)/3     if a == 'D' else 0,
      (s[0]*s[0] - 2)/2     if a == 'D' else 0,
      (s[1]*s[1] - 4.5)/4.5 if a == 'D' else 0,
      1                     if a == 'D' else 0,
      s[0] - 1              if a == 'L' else 0,
      s[1] - 1.5            if a == 'L' else 0,
      (s[0]*s[1] - 3)/3     if a == 'L' else 0,
      (s[0]*s[0] - 2)/2     if a == 'L' else 0,
      (s[1]*s[1] - 4.5)/4.5 if a == 'L' else 0,
      1                     if a == 'L' else 0,
      s[0] - 1              if a == 'R' else 0,
      s[1] - 1.5            if a == 'R' else 0,
      (s[0]*s[1] - 3)/3     if a == 'R' else 0,
      (s[0]*s[0] - 2)/2     if a == 'R' else 0,
      (s[1]*s[1] - 4.5)/4.5 if a == 'R' else 0,
      1                     if a == 'R' else 0,
      1
    ])
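    # 25 features in total: 6 per action (row, col, row*col, row^2, col^2, and a
    # per-action bias), each zeroed unless it matches the chosen action, plus one
    # global bias; this matches the 25-dimensional theta in __init__. The constants
    # (1, 1.5, 3, 2, 4.5) appear to roughly center/scale each feature for the 3x4 grid.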

    # if we use SA2IDX, a one-hot encoding for every (s,a) pair
    # in reality we wouldn't want to do this b/c we have just
    # as many params as before
    # x = np.zeros(len(self.theta))
    # idx = SA2IDX[s][a]
    # x[idx] = 1
    # return x

  def predict(self, s, a):
    x = self.sa2x(s, a)
    return self.theta.dot(x)

  def grad(self, s, a):
    return self.sa2x(s, a)
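
  # note: since Q(s,a) = theta.dot(x(s,a)) is linear in theta, the gradient of
  # Q with respect to theta is just the feature vector x(s,a), which is why
  # grad() simply returns sa2x(s, a)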


def getQs(model, s):
  # we need Q(s,a) to choose an action
  # i.e. a = argmax[a]{ Q(s,a) }
  Qs = {}
  for a in ALL_POSSIBLE_ACTIONS:
    q_sa = model.predict(s, a)
    Qs[a] = q_sa
  return Qs
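
# illustrative usage (hypothetical numbers): getQs(model, (2, 0)) might return
# something like {'U': 0.1, 'D': -0.3, 'L': 0.0, 'R': 0.2}, from which max_dict
# picks the greedy action and random_action adds epsilon-greedy exploration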


if __name__ == '__main__':
  # NOTE: if we use the standard grid, there's a good chance we will end up with
  # suboptimal policies
  # e.g.
  # ---------------------------
  #  R  |  R  |  R  |     |
  # ---------------------------
  #  R* |     |  U  |     |
  # ---------------------------
  #  U  |  R  |  U  |  L  |
  # since going R at (1,0) (shown with a *) incurs no cost, it's OK to keep doing that.
  # we'll either end up staying in the same spot, or back at the start (2,0), at which
  # point we would then just go back up, or at (0,0), at which point we can continue
  # on right.
  # instead, let's penalize each movement so the agent will find a shorter route.
  #
  # grid = standard_grid()
  grid = negative_grid(step_cost=-0.1)

  # print rewards
  print("rewards:")
  print_values(grid.rewards, grid)

  # no policy initialization, we will derive our policy from the most recent Q

  # enumerate all (s,a) pairs, each will have its own weight in our "dumb" model
  # essentially each weight will be a measure of Q(s,a) itself
  states = grid.all_states()
  for s in states:
    SA2IDX[s] = {}
    for a in ALL_POSSIBLE_ACTIONS:
      SA2IDX[s][a] = IDX
      IDX += 1

  # initialize model
  model = Model()

  # repeat until convergence
  t = 1.0
  t2 = 1.0
  deltas = []
  for it in range(20000):
    if it % 100 == 0:
      t += 0.01
      t2 += 0.01
    if it % 1000 == 0:
      print("it:", it)
    alpha = ALPHA / t2
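    # t drives the epsilon decay (eps = 0.5/t below) and t2 drives the learning-rate
    # decay; both grow by 0.01 every 100 episodes, so exploration and the step size
    # shrink slowly over the 20000 episodes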

    # instead of 'generating' an episode, we will PLAY
    # an episode within this loop
    s = (2, 0) # start state
    grid.set_state(s)

    # get Q(s) so we can choose the first action
    Qs = getQs(model, s)

    # the first (s, r) tuple is the state we start in and 0
    # (since we don't get a reward) for simply starting the game
    # the last (s, r) tuple is the terminal state and the final reward
    # the value for the terminal state is by definition 0, so we don't
    # care about updating it.
    a = max_dict(Qs)[0]
    a = random_action(a, eps=0.5/t) # epsilon-greedy
    biggest_change = 0
    while not grid.game_over():
      r = grid.move(a)
      s2 = grid.current_state()

      # we need the next action as well since Q(s,a) depends on Q(s',a')
      # if s2 not in policy then it's a terminal state, all Q are 0
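      # semi-gradient SARSA update for a linear approximator:
      #   theta <- theta + alpha * [r + GAMMA*Q(s2,a2) - Q(s,a)] * grad_Q(s,a)
      # where grad_Q(s,a) is just the feature vector x(s,a); if s2 is terminal,
      # the target reduces to r because Q at a terminal state is 0 by definition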
      old_theta = model.theta.copy()
      if grid.is_terminal(s2):
        model.theta += alpha*(r - model.predict(s, a))*model.grad(s, a)
      else:
        # not terminal
        Qs2 = getQs(model, s2)
        a2 = max_dict(Qs2)[0]
        a2 = random_action(a2, eps=0.5/t) # epsilon-greedy

        # we will update Q(s,a) AS we experience the episode
        model.theta += alpha*(r + GAMMA*model.predict(s2, a2) - model.predict(s, a))*model.grad(s, a)

        # next state becomes current state
        s = s2
        a = a2

      biggest_change = max(biggest_change, np.abs(model.theta - old_theta).sum())
    deltas.append(biggest_change)

  plt.plot(deltas)
  plt.show()

  # determine the policy from Q*
  # find V* from Q*
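  # i.e. policy[s] = argmax_a Q(s,a) and V[s] = max_a Q(s,a), which is exactly
  # what max_dict returns for each state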
  policy = {}
  V = {}
  Q = {}
  for s in grid.actions.keys():
    Qs = getQs(model, s)
    Q[s] = Qs
    a, max_q = max_dict(Qs)
    policy[s] = a
    V[s] = max_q
print("values:")
print_values(V, grid)
print("policy:")
print_policy(policy, grid)