forked from home-assistant/core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_runner.py
189 lines (151 loc) · 6 KB
/
test_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Test the runner."""
import asyncio
from collections.abc import Iterator
import subprocess
import threading
from unittest.mock import patch
import packaging.tags
import py
import pytest
from homeassistant import core, runner
from homeassistant.core import HomeAssistant
from homeassistant.util import executor, thread
# Hard timeout mirrored from the supervisor source linked below; presumably
# in seconds like the shutdown timeouts it is compared against -- re-check the
# value against that file if the supervisor changes.
# https://github.com/home-assistant/supervisor/blob/main/supervisor/docker/homeassistant.py
SUPERVISOR_HARD_TIMEOUT = 240
# Extra headroom added to the summed shutdown timeouts before they are
# compared with SUPERVISOR_HARD_TIMEOUT.
TIMEOUT_SAFETY_MARGIN = 10
async def test_cumulative_shutdown_timeout_less_than_supervisor() -> None:
    """Check the summed shutdown timeouts fit within the supervisor hard timeout.

    Every core shutdown stage timeout, the executor and threading shutdown
    timeouts, plus TIMEOUT_SAFETY_MARGIN must together not exceed
    SUPERVISOR_HARD_TIMEOUT.
    """
    shutdown_timeouts = (
        core.STOPPING_STAGE_SHUTDOWN_TIMEOUT,
        core.STOP_STAGE_SHUTDOWN_TIMEOUT,
        core.FINAL_WRITE_STAGE_SHUTDOWN_TIMEOUT,
        core.CLOSE_STAGE_SHUTDOWN_TIMEOUT,
        executor.EXECUTOR_SHUTDOWN_TIMEOUT,
        thread.THREADING_SHUTDOWN_TIMEOUT,
    )
    worst_case_shutdown = sum(shutdown_timeouts) + TIMEOUT_SAFETY_MARGIN

    assert worst_case_shutdown <= SUPERVISOR_HARD_TIMEOUT
async def test_setup_and_run_hass(hass: HomeAssistant, tmpdir: py.path.local) -> None:
    """Run setup_and_run_hass against a temporary config directory.

    Bootstrap is patched to return the ``hass`` fixture and ``async_run`` is
    mocked; the test checks that the thread shutdown hook was swapped and that
    ``async_run`` was invoked.
    """
    runtime_config = runner.RuntimeConfig(tmpdir.mkdir("config"))

    with (
        patch("homeassistant.bootstrap.async_setup_hass", return_value=hass),
        patch("threading._shutdown"),
        patch("homeassistant.core.HomeAssistant.async_run") as async_run_mock,
    ):
        await runner.setup_and_run_hass(runtime_config)
        # Inspect the hook while the patch is still active: leaving the
        # ``with`` block restores the original ``threading._shutdown``.
        installed_shutdown = threading._shutdown
        assert installed_shutdown == thread.deadlock_safe_shutdown

    assert async_run_mock.called
def test_run(hass: HomeAssistant, tmpdir: py.path.local) -> None:
    """Call runner.run with a temporary config and check async_run was invoked.

    The task cancelation timeout is shortened to 1 and bootstrap is patched to
    return the ``hass`` fixture.
    """
    runtime_config = runner.RuntimeConfig(tmpdir.mkdir("config"))

    with (
        patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1),
        patch("homeassistant.bootstrap.async_setup_hass", return_value=hass),
        patch("threading._shutdown"),
        patch("homeassistant.core.HomeAssistant.async_run") as async_run_mock,
    ):
        runner.run(runtime_config)

    assert async_run_mock.called
def test_run_executor_shutdown_throws(
    hass: HomeAssistant, tmpdir: py.path.local
) -> None:
    """Check runner.run still reaches shutdown when the executor shutdown raises.

    ``InterruptibleThreadPoolExecutor.shutdown`` is patched to raise
    RuntimeError; the error is expected to propagate out of ``runner.run``
    after both ``async_run`` and the executor shutdown have been called.
    """
    runtime_config = runner.RuntimeConfig(tmpdir.mkdir("config"))
    executor_shutdown_target = (
        "homeassistant.runner.InterruptibleThreadPoolExecutor.shutdown"
    )

    with (
        patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1),
        pytest.raises(RuntimeError),
        patch("homeassistant.bootstrap.async_setup_hass", return_value=hass),
        patch("threading._shutdown"),
        patch(executor_shutdown_target, side_effect=RuntimeError) as shutdown_mock,
        patch("homeassistant.core.HomeAssistant.async_run") as async_run_mock,
    ):
        runner.run(runtime_config)

    # pytest.raises swallowed the RuntimeError above, so execution resumes here.
    assert shutdown_mock.called
    assert async_run_mock.called
def test_run_does_not_block_forever_with_shielded_task(
    hass: HomeAssistant, tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
) -> None:
    """Check runner.run returns even when a task keeps running after cancelation.

    ``async_run`` is replaced by a coroutine that leaves three pending tasks
    behind: a shielded one that sleeps again when canceled, a plain sleep, and
    one that raises when canceled. The runner must log that a task could not be
    canceled instead of hanging.
    """
    runtime_config = runner.RuntimeConfig(tmpdir.mkdir("config"))
    tasks = []

    async def _async_create_tasks(*_):
        async def async_raise(*_):
            # Turns a cancelation into an unrelated exception.
            try:
                await asyncio.sleep(2)
            except asyncio.CancelledError:
                raise Exception  # noqa: TRY002

        async def async_shielded(*_):
            # Keeps sleeping after being canceled.
            try:
                await asyncio.sleep(2)
            except asyncio.CancelledError:
                await asyncio.sleep(2)

        # The tuple is evaluated left to right, so the tasks are scheduled in
        # the order shielded, sleep, raise.
        tasks.extend(
            (
                asyncio.ensure_future(asyncio.shield(async_shielded())),
                asyncio.ensure_future(asyncio.sleep(2)),
                asyncio.ensure_future(async_raise()),
            )
        )
        # Yield once so the new tasks get a chance to start.
        await asyncio.sleep(0)
        return 0

    with (
        patch.object(runner, "TASK_CANCELATION_TIMEOUT", 0.1),
        patch("homeassistant.bootstrap.async_setup_hass", return_value=hass),
        patch("threading._shutdown"),
        patch("homeassistant.core.HomeAssistant.async_run", _async_create_tasks),
    ):
        runner.run(runtime_config)

    assert len(tasks) == 3
    expected_warning = "Task could not be canceled and was still running after shutdown"
    assert expected_warning in caplog.text
async def test_unhandled_exception_traceback(
    hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
    """Check the log output for a task whose exception is never retrieved.

    With loop debug mode enabled, the log must contain the asyncio warning, the
    exception message, the coroutine name and the task name.
    """
    exception_raised = asyncio.Event()

    async def _unhandled_exception():
        exception_raised.set()
        raise Exception("This is unhandled")  # noqa: TRY002

    try:
        hass.loop.set_debug(True)
        orphan_task = asyncio.create_task(_unhandled_exception(), name="name_of_task")
        await exception_raised.wait()
        # Drop the task without ever reading its result so the exception is
        # reported as unhandled.
        del orphan_task
    finally:
        hass.loop.set_debug(False)

    expected_fragments = (
        "Task exception was never retrieved",
        "This is unhandled",
        "_unhandled_exception",
        "name_of_task",
    )
    for fragment in expected_fragments:
        assert fragment in caplog.text
def test_enable_posix_spawn() -> None:
    """Check _enable_posix_spawn sets the flag only for musllinux platform tags.

    Starting from ``subprocess._USE_POSIX_SPAWN`` patched to False, the flag
    must become True when the reported tags are musllinux and stay False for a
    generic ``any`` tag.
    """

    def _mock_sys_tags_any() -> Iterator[packaging.tags.Tag]:
        yield from packaging.tags.parse_tag("py3-none-any")

    def _mock_sys_tags_musl() -> Iterator[packaging.tags.Tag]:
        yield from packaging.tags.parse_tag("cp311-cp311-musllinux_1_1_x86_64")

    # (tag source, expected flag value), musllinux case first.
    scenarios = (
        (_mock_sys_tags_musl, True),
        (_mock_sys_tags_any, False),
    )

    for sys_tags_source, expected_flag in scenarios:
        with (
            patch.object(subprocess, "_USE_POSIX_SPAWN", False),
            patch(
                "homeassistant.runner.packaging.tags.sys_tags",
                side_effect=sys_tags_source,
            ),
        ):
            runner._enable_posix_spawn()
            # Checked inside the block: the patch resets the flag on exit.
            assert subprocess._USE_POSIX_SPAWN is expected_flag