Skip to content

Commit

Permalink
Test cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
dabeaz committed Mar 11, 2017
1 parent cad9d4e commit bc5346a
Showing 1 changed file with 63 additions and 171 deletions.
234 changes: 63 additions & 171 deletions tests/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,335 +5,227 @@
from curio import *
kernel_clock = clock


def test_hello(kernel):
results = []
async def hello():
results.append('hello')

kernel.run(hello)
assert results == ['hello']


def test_return(kernel):

async def hello():
return 'hello'

result = kernel.run(hello)
assert result == 'hello'


def test_raise(kernel):

class Error(Exception):
pass

async def boom():
raise Error
raise Error()

try:
kernel.run(boom)
assert False, 'boom() did not raise'
except TaskError as exc:
assert isinstance(exc.__cause__, Error)


def test_sleep(kernel):
results = []
start = end = 0
async def main():
results.append('start')
nonlocal start, end
start = time.time()
await sleep(0.5)
results.append('end')
end = time.time()

start = time.time()
kernel.run(main)
end = time.time()
assert results == [
'start',
'end',
]
elapsed = end - start
assert elapsed > 0.5


def test_wakeat(kernel):
results = []
async def main():
clock = await kernel_clock() + 0.5
results.append('start')
newclock = await wake_at(clock)
results.append(round(newclock - clock, 1))
clock = await kernel_clock()
newclock = await wake_at(clock+0.5)
assert (newclock - clock) >= 0.5

start = time.time()
kernel.run(main)
end = time.time()
assert results == [
'start',
0.0,
]
elapsed = end - start
assert elapsed > 0.5
assert (end - start) > 0.5


def test_sleep_cancel(kernel):
results = []
cancelled = False

async def sleeper():
results.append('start')
nonlocal cancelled
try:
await sleep(1)
results.append('not here')
assert False
except CancelledError:
results.append('cancelled')
cancelled = True

async def main():
task = await spawn(sleeper)
await sleep(0.5)
await sleep(0.1)
await task.cancel()

kernel.run(main)
assert results == [
'start',
'cancelled',
]

assert cancelled

def test_sleep_timeout(kernel):
results = []
cancelled = True

async def sleeper():
results.append('start')
nonlocal cancelled
try:
await timeout_after(0.5, sleep, 1)
results.append('not here')
await timeout_after(0.1, sleep, 1)
assert False
except TaskTimeout:
results.append('timeout')
cancelled = True

async def main():
task = await spawn(sleeper)
await task.join()

kernel.run(main)
assert results == [
'start',
'timeout',
]

assert cancelled

def test_sleep_timeout_absolute(kernel):
results = []
cancelled = False

async def sleeper():
results.append('start')
nonlocal cancelled
try:
await timeout_at((await kernel_clock()) + 0.5, sleep, 1)
results.append('not here')
await timeout_at((await kernel_clock()) + 0.1, sleep, 1)
assert False
except TaskTimeout:
results.append('timeout')
cancelled = True

async def main():
task = await spawn(sleeper)
await task.join()

kernel.run(main)
assert results == [
'start',
'timeout',
]

assert cancelled

def test_sleep_ignore_timeout(kernel):
results = []

async def sleeper():
results.append('start')
if await ignore_after(0.5, sleep(1)) is None:
results.append('timeout')
cancelled = False

async with ignore_after(0.5) as s:
await sleep(1)
if await ignore_after(0.1, sleep(1)) is None:
cancelled = True
assert cancelled

cancelled = False
async with ignore_after(0.1) as s:
await sleep(1)
if s.result is None:
results.append('timeout2')
cancelled = True

assert cancelled

async def main():
task = await spawn(sleeper)
await task.join()

kernel.run(main)
assert results == [
'start',
'timeout',
'timeout2',
]


def test_sleep_ignore_timeout_absolute(kernel):
results = []

async def sleeper():
results.append('start')
if await ignore_at((await kernel_clock()) + 0.5, sleep(1)) is None:
results.append('timeout')
cancelled = False
if await ignore_at((await kernel_clock()) + 0.1, sleep(1)) is None:
cancelled = True

async with ignore_at((await kernel_clock()) + 0.5) as s:
assert cancelled

cancelled = False
async with ignore_at((await kernel_clock()) + 0.1) as s:
await sleep(1)

if s.result is None:
results.append('timeout2')
cancelled = True
assert cancelled

async def main():
task = await spawn(sleeper)
await task.join()

kernel.run(main)
assert results == [
'start',
'timeout',
'timeout2',
]


def test_sleep_notimeout(kernel):
results = []

async def sleeper():
results.append('start')
try:
await timeout_after(1.5, sleep(1))
results.append('here')
await timeout_after(0.5, sleep(0.1))
assert True
except TaskTimeout:
results.append('not here')

await sleep(1)
results.append('here2')
assert False
await sleep(0.5)
assert True

async def main():
task = await spawn(sleeper)
await task.join()

kernel.run(main)
assert results == [
'start',
'here',
'here2'
]


def test_task_join(kernel):
results = []

async def child():
results.append('start')
await sleep(0.5)
results.append('end')
return 37

async def main():
task = await spawn(child)
await sleep(0.1)
results.append('joining')
r = await task.join()
results.append(r)
assert r == 37

kernel.run(main)
assert results == [
'start',
'joining',
'end',
37
]


def test_task_join_error(kernel):
results = []

async def child():
results.append('start')
int('bad')

async def main():
task = await spawn(child)
await sleep(0.1)
results.append('joining')
try:
r = await task.join()
results.append(r)
assert False
except TaskError as e:
results.append('task fail')
results.append(type(e))
results.append(type(e.__cause__))
assert isinstance(e.__cause__, ValueError)

kernel.run(main)
assert results == [
'start',
'joining',
'task fail',
TaskError,
ValueError,
]


def test_task_cancel(kernel):
results = []

cancelled = False
async def child():
results.append('start')
nonlocal cancelled
try:
await sleep(0.5)
results.append('end')
assert False
except CancelledError:
results.append('cancelled')
cancelled = True

async def main():
task = await spawn(child)
results.append('cancel start')
await sleep(0.1)
results.append('cancelling')
await task.cancel()
results.append('done')
assert cancelled

kernel.run(main)
assert results == [
'cancel start',
'start',
'cancelling',
'cancelled',
'done',
]


def test_task_cancel_poll(kernel):
results = []

async def child():
async with disable_cancellation():
myself = await current_task()
results.append('start')
await sleep(0.5)
await sleep(0.1)
results.append('success')
if await check_cancellation():
results.append('cancelled')
return
else:
results.append('end')
assert False

async def main():
task = await spawn(child)
results.append('cancel start')
await sleep(0.1)
results.append('cancelling')
await task.cancel()
results.append('done')

kernel.run(main)
assert results == [
'cancel start',
'start',
'cancelling',
'cancelled',
'done',
]
assert results == ['success', 'cancelled', 'done']

def test_task_cancel_not_blocking(kernel):
async def child(e1, e2):
Expand Down

0 comments on commit bc5346a

Please sign in to comment.