a3c_display.py
# -*- coding: utf-8 -*-
import tensorflow as tf
import numpy as np
import random
from game_state import GameState
from game_ac_network import GameACFFNetwork, GameACLSTMNetwork, GameACFCNetwork
from a3c_training_thread import A3CTrainingThread
from rmsprop_applier import RMSPropApplier
from constants import ACTION_SIZE
from constants import PARALLEL_SIZE
from constants import CHECKPOINT_DIR
from constants import RMSP_EPSILON
from constants import RMSP_ALPHA
from constants import GRAD_NORM_CLIP
from constants import USE_GPU
from constants import USE_LSTM
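# choose_action samples an action index in proportion to the policy's
# output probabilities (roulette-wheel selection over the cumulative sum).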
def choose_action(pi_values):
  cumulative = []
  total = 0.0
  for rate in pi_values:
    total += rate
    cumulative.append(total)
  r = random.random() * total
  for i in range(len(cumulative)):
    if cumulative[i] >= r:
      return i
  # fail safe
  return len(cumulative) - 1
# use CPU for display tool
device = "/cpu:0"
"""
if USE_LSTM:
global_network = GameACLSTMNetwork(ACTION_SIZE, -1, device)
else:
global_network = GameACFFNetwork(ACTION_SIZE, device)
"""
global_network = GameACFCNetwork(ACTION_SIZE, device)
learning_rate_input = tf.placeholder("float")
grad_applier = RMSPropApplier(learning_rate = learning_rate_input,
                              decay = RMSP_ALPHA,
                              momentum = 0.0,
                              epsilon = RMSP_EPSILON,
                              clip_norm = GRAD_NORM_CLIP,
                              device = device)
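# The RMSProp applier is only needed to construct the (commented-out)
# training threads below; the display loop itself never applies gradients.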
# training_threads = []
# for i in range(PARALLEL_SIZE):
#   training_thread = A3CTrainingThread(i, global_network, 1.0,
#                                       learning_rate_input,
#                                       grad_applier,
#                                       8000000,
#                                       device = device)
#   training_threads.append(training_thread)
sess = tf.Session()
init = tf.initialize_all_variables()
sess.run(init)
saver = tf.train.Saver()
checkpoint = tf.train.get_checkpoint_state(CHECKPOINT_DIR)
if checkpoint and checkpoint.model_checkpoint_path:
  saver.restore(sess, checkpoint.model_checkpoint_path)
  print("checkpoint loaded:", checkpoint.model_checkpoint_path)
else:
  print("Could not find old checkpoint")
#game_state = GameState(0, display=True, no_op_max=0)
import gym_ple
def register_game(game, display=False):
  from gym.envs.registration import register
  from gym_ple.ple_env import PLEEnv
  register(
    id='PLE-{}-v0'.format(game),
    entry_point='gym_ple:PLEEnv',
    kwargs={'game_name': game, 'display_screen': display},
    timestep_limit=10000)
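# Register PLE's Catcher game with the display screen enabled.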
register_game('Catcher', True)
def get_game(thread_index):
  from game_state_env import GameStateGymEnv
  import gym
  env = gym.make('PLE-Catcher-v0')
  return GameStateGymEnv(env)
game_state = get_game(0)
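# Display loop: run the policy on the current observation, sample an action,
# step the environment, and reset whenever a terminal state is reached.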
while True:
  pi_values = global_network.run_policy(sess, game_state.s_t)
  action = choose_action(pi_values)
  game_state.process(action)
  game_state.update()
  if game_state.terminal:
    game_state.reset()