-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathagent.py
208 lines (164 loc) · 7.77 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import random
import math
from environment import Agent, Environment
from planner import RoutePlanner
from simulator import Simulator
class LearningAgent(Agent):
    """An agent that learns to drive in the Smartcab world.

    The agent keeps a Q-table (``self.Q``) that maps each observed state to a
    dictionary of ``{action: Q-value}``. While learning it picks a random
    action with probability ``epsilon`` and otherwise one of the actions with
    the highest Q-value for the current state.
    """

    def __init__(self, env, learning=False, epsilon=1.0, alpha=0.5):
        """Create the agent and its learning parameters.

        env      -- environment the agent is placed in
        learning -- whether the agent is expected to learn
        epsilon  -- random exploration factor
        alpha    -- learning factor
        """
        super(LearningAgent, self).__init__(env)     # Set the agent in the environment
        self.planner = RoutePlanner(self.env, self)  # Create a route planner
        self.valid_actions = self.env.valid_actions  # The set of valid actions

        # Set parameters of the learning agent
        self.learning = learning  # Whether the agent is expected to learn
        self.Q = dict()           # Q-table: state -> {action: Q-value}
        self.epsilon = epsilon    # Random exploration factor
        self.alpha = alpha        # Learning factor
        self.N = 0                # Number of times reset() has been called

    def reset(self, destination=None, testing=False):
        """Prepare the agent for a new trial.

        Called at the beginning of each trial. 'testing' is set to True if
        testing trials are being used once training trials have completed.
        Always returns None.
        """
        # Select the destination as the new location to route to
        self.planner.route_to(destination)

        # Linear decay: epsilon shrinks by a fixed step on every reset.
        self.N += 1
        self.epsilon -= 0.001

        # During testing, disable both exploration and learning.
        if testing:
            self.epsilon = 0
            self.alpha = 0

        return None

    def build_state(self):
        """Build the state tuple used as a key into the Q-table.

        The next waypoint, the intersection inputs, and the deadline are all
        features available to the agent. The returned state is
        (waypoint, light, ('oncoming', oncoming), ('left', left)).
        """
        # Collect data about the environment
        waypoint = self.planner.next_waypoint()  # The next waypoint
        inputs = self.env.sense(self)            # Visual input - intersection light and traffic
        deadline = self.env.get_deadline(self)   # Remaining deadline (queried, not part of the state)

        state = (
            waypoint,
            inputs['light'],
            ('oncoming', inputs['oncoming']),
            ('left', inputs['left']),
        )
        return state

    def get_maxQ(self, state):
        """Return the maximum Q-value over all actions for 'state'.

        Raises KeyError if 'state' has no entry in the Q-table.
        """
        return max(self.Q[state].values())

    def createQ(self, state):
        """Add 'state' to the Q-table if the agent is learning and it is new.

        Every valid action for a new state starts with a Q-value of 0.0.
        """
        if self.learning and state not in self.Q:
            self.Q[state] = {action: 0.0 for action in self.valid_actions}
        return

    def choose_action(self, state):
        """Choose which action to take for 'state' and return it.

        When not learning, the action is chosen at random. When learning, a
        random action is chosen with probability 'epsilon'; otherwise one of
        the actions with the highest Q-value is chosen, with ties broken at
        random.
        """
        # Set the agent state and refresh the next waypoint
        self.state = state
        self.next_waypoint = self.planner.next_waypoint()

        # Not learning, or exploring: pick any valid action at random.
        if not self.learning:
            return random.choice(self.valid_actions)
        if random.random() < self.epsilon:
            return random.choice(self.valid_actions)

        # Exploiting: gather every action that attains the maximum Q-value.
        # The maximum is computed once rather than once per candidate action.
        max_q = self.get_maxQ(state)
        best_actions = []
        for candidate, q_value in self.Q[state].items():
            if q_value == max_q:
                best_actions.append(candidate)
                # Single-argument print(...) emits the same text on
                # Python 2 and Python 3.
                print("Appending key={}, value={}".format(candidate, q_value))

        action = random.choice(best_actions)
        print("action is={}".format(action))
        return action

    def learn(self, state, action, reward):
        """Update the Q-value for (state, action) from the received reward.

        Called after the agent completes an action and receives a reward.
        Future rewards are not considered: only the learning rate 'alpha' is
        used (no discount factor), i.e. Q <- Q + alpha * (reward - Q).
        Does nothing when the agent is not learning.
        """
        if self.learning:
            self.Q[state][action] += self.alpha * (reward - self.Q[state][action])
        return

    def update(self):
        """Run one time step of the agent.

        Called when a time step is completed in the environment for a given
        trial. Builds the agent state, chooses an action, receives a reward,
        and learns if enabled.
        """
        state = self.build_state()           # Get current state
        self.createQ(state)                  # Create 'state' in Q-table
        action = self.choose_action(state)   # Choose an action
        reward = self.env.act(self, action)  # Receive a reward
        self.learn(state, action, reward)    # Q-learn
        return
def run():
    """Set up the environment, agent and simulator, then run the simulation.

    Press ESC to close the simulation, or [SPACE] to pause the simulation.
    """
    # --- Environment ---
    # Available flags:
    #   verbose     - set to True to display additional output from the simulation
    #   num_dummies - discrete number of dummy agents in the environment, default is 100
    #   grid_size   - discrete number of intersections (columns, rows), default is (8, 6)
    world = Environment()

    # --- Driving agent ---
    # Available flags:
    #   learning - set to True to force the driving agent to use Q-learning
    #   epsilon  - continuous value for the exploration factor, default is 1
    #   alpha    - continuous value for the learning rate, default is 0.5
    driver = world.create_agent(LearningAgent, learning=True, alpha=0.5)

    # --- Follow the driving agent ---
    # Available flags:
    #   enforce_deadline - set to True to enforce a deadline metric
    world.set_primary_agent(driver, enforce_deadline=True)

    # --- Simulation ---
    # Available flags:
    #   update_delay - continuous time (in seconds) between actions, default is 2.0 seconds
    #   display      - set to False to disable the GUI if PyGame is enabled
    #   log_metrics  - set to True to log trial and simulation results to /logs
    #   optimized    - set to True to change the default log file name
    simulation = Simulator(
        world,
        update_delay=0.01,
        log_metrics=True,
        optimized=True
    )

    # --- Run the simulator ---
    # Available flags:
    #   tolerance - epsilon tolerance before beginning testing, default is 0.05
    #   n_test    - discrete number of testing trials to perform, default is 0
    simulation.run(tolerance=0.01, n_test=100)
# Start the simulation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run()