This repository was archived by the owner on May 13, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathcompute_page_test.py
228 lines (176 loc) · 7.97 KB
/
compute_page_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# Copyright 2015 Google Inc. All Rights Reserved.
import unittest
import webtest
import apprtc
import compute_page
from test_util import CapturingFunction
from test_util import ReplaceFunction
from google.appengine.ext import testbed
class FakeComputeService(object):
"""Handles Compute Engine service calls to the Google API client."""
def __init__(self):
# This is a bit hacky but the Google API is lots of function
# chaining. This at least makes things easier to reason about as
# opposed to making a new variable for each level.
self.instances = CapturingFunction(lambda: self.instances)
self.instances.start = CapturingFunction(lambda: self.instances.start)
self.instances.start.execute = CapturingFunction()
self.instances.stop = CapturingFunction(lambda: self.instances.stop)
self.instances.stop.execute = CapturingFunction()
# Change the status of an instance by setting the return value.
self.get_return_value = None
self.instances.get = CapturingFunction(lambda: self.instances.get)
self.instances.get.execute = CapturingFunction(
lambda: self.get_return_value)
class ComputePageHandlerTest(unittest.TestCase):
def setUp(self):
# First, create an instance of the Testbed class.
self.testbed = testbed.Testbed()
# Then activate the testbed, which prepares the service stubs for use.
self.testbed.activate()
# Next, declare which service stubs you want to use.
self.testbed.init_taskqueue_stub()
self.test_app = webtest.TestApp(apprtc.app)
# Fake out the compute service.
self.build_compute_service_replacement = ReplaceFunction(
compute_page.ComputePage,
'_build_compute_service',
self.fake_build_compute_service)
# Note that '_build_compute_service' is called at request
# time. Tests need to be able to setup test data in the fake
# before ComputePage is initialize. Thus compute_service is
# created here and it is assumed that _build_compute_service is
# only called once.
self.compute_service = FakeComputeService()
# Default testbed app name.
self.app_id = 'testbed-test'
def fake_build_compute_service(self, *args):
return self.compute_service
def tearDown(self):
self.testbed.deactivate()
del self.build_compute_service_replacement
# TODO(decurtis): Extract these functions to our own test base class.
def makePostRequest(self, path, body=''):
return self.test_app.post(path, body, headers={'User-Agent': 'Safari'})
def makeGetRequest(self, path):
# PhantomJS uses WebKit, so Safari is closest to the thruth.
return self.test_app.get(path, headers={'User-Agent': 'Safari'})
def testGetStatus(self):
instance = 'test-instance'
zone = 'test-zone'
# Fake instance has status of RUNNING
instance_status = compute_page.COMPUTE_STATUS_RUNNING
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: instance_status
}
response = self.makeGetRequest('/compute/%s/%s/%s' %
(compute_page.ACTION_STATUS, instance, zone))
get_dict = {'project': self.app_id,
'instance': instance,
'zone': zone}
actual_get_dict = self.compute_service.instances.get.last_kwargs
self.assertEqual(get_dict, actual_get_dict)
self.assertEqual(instance_status, response.body)
def testPostStartWhenTerminated(self):
"""Test start action with instance in the TERMINATED state."""
instance = 'test-instance'
zone = 'test-zone'
start_url = '/compute/%s/%s/%s' % (compute_page.ACTION_START,
instance, zone)
# Suppose that instance is TERMINATED.
instance_status = compute_page.COMPUTE_STATUS_TERMINATED
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: instance_status
}
# makePostRequest will check for 200 success.
self.makePostRequest(start_url)
# If the state is TERMINATED then we should have a new start task.
tasks = self.testbed.get_stub('taskqueue').GetTasks('default')
self.assertEqual(1, len(tasks))
# Verify start() API called only once.
self.assertEqual(1, self.compute_service.instances.start.num_calls)
# Check start() API called when in the TERMINATED state.
get_dict = {'project': self.app_id,
'instance': instance,
'zone': zone}
actual_get_dict = self.compute_service.instances.start.last_kwargs
self.assertEqual(get_dict, actual_get_dict)
def testPostStartWhenRunning(self):
"""Test start action when instance in the RUNNING state."""
instance = 'test-instance'
zone = 'test-zone'
start_url = '/compute/%s/%s/%s' % (compute_page.ACTION_START,
instance, zone)
instance_status = compute_page.COMPUTE_STATUS_RUNNING
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: instance_status
}
self.makePostRequest(start_url)
# No further tasks needed.
tasks = self.testbed.get_stub('taskqueue').GetTasks('default')
self.assertEqual(0, len(tasks))
# Verify start() API not called.
self.assertEqual(0, self.compute_service.instances.start.num_calls)
def testPostStartNotRunningOrTerminated(self):
"""Test start action in an intermediate state."""
compute_instances = self.compute_service.instances
taskqueue = self.testbed.get_stub('taskqueue')
instance = 'test-instance'
zone = 'test-zone'
start_url = '/compute/%s/%s/%s' % (compute_page.ACTION_START,
instance, zone)
# Check all intermediate states.
for instance_status in ['PROVISIONING', 'STAGING', 'STOPPING']:
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: instance_status
}
self.makePostRequest(start_url)
# Since if the instance neither RUNNING nor TERMINATED then a
# new task should be queued.
tasks = taskqueue.GetTasks('default')
self.assertEqual(1, len(tasks))
task = tasks[-1]
self.assertEqual(start_url, task['url'])
# Verify start() API not called.
self.assertEqual(0, self.compute_service.instances.start.num_calls)
# Simulate the start task running AGAIN but instance is RUNNING.
taskqueue.FlushQueue('default')
def testPostRestartWhenRunning(self):
"""Test restart action in a running state."""
compute_instances = self.compute_service.instances
taskqueue = self.testbed.get_stub('taskqueue')
instance = 'test-instance'
zone = 'test-zone'
restart_url = '/compute/%s/%s/%s' % (compute_page.ACTION_RESTART,
instance, zone)
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: 'RUNNING'
}
self.makePostRequest(restart_url)
# A new task should be queued.
tasks = taskqueue.GetTasks('default')
self.assertEqual(1, len(tasks))
task = tasks[-1]
start_url = '/compute/%s/%s/%s' % (compute_page.ACTION_START,
instance, zone)
self.assertEqual(start_url, task['url'])
# Verify stop() API called.
self.assertEqual(1, self.compute_service.instances.stop.num_calls)
def testPostRestartWhenNotRunning(self):
"""Test restart action in a non-running state."""
compute_instances = self.compute_service.instances
taskqueue = self.testbed.get_stub('taskqueue')
instance = 'test-instance'
zone = 'test-zone'
restart_url = '/compute/%s/%s/%s' % (compute_page.ACTION_RESTART,
instance, zone)
for status in ['PROVISIONING', 'STAGING', 'STOPPING', 'TERMINATED']:
self.compute_service.get_return_value = {
compute_page.COMPUTE_STATUS: status
}
self.makePostRequest(restart_url)
# No task should be queued.
tasks = taskqueue.GetTasks('default')
self.assertEqual(0, len(tasks))
# Verify stop() API not called.
self.assertEqual(0, self.compute_service.instances.stop.num_calls)