Skip to content

Commit

Permalink
Avoid deadlock on shutdown when a task is shielded from cancelation (h…
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Sep 21, 2021
1 parent d494b35 commit 9831ff0
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 1 deletion.
70 changes: 69 additions & 1 deletion homeassistant/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
# use case.
#
MAX_EXECUTOR_WORKERS = 64
# Seconds to wait at shutdown for cancelled tasks to finish; passed as the
# timeout to _cancel_all_tasks_with_timeout by run().
TASK_CANCELATION_TIMEOUT = 5

# Module-level logger; used to warn about tasks still running after shutdown.
_LOGGER = logging.getLogger(__name__)


@dataclasses.dataclass
Expand Down Expand Up @@ -105,4 +108,69 @@ async def setup_and_run_hass(runtime_config: RuntimeConfig) -> int:
def run(runtime_config: RuntimeConfig) -> int:
    """Run Home Assistant.

    Installs the Home Assistant event loop policy, creates a fresh event
    loop, and runs ``setup_and_run_hass`` on it until it completes.

    This intentionally does not use ``asyncio.run``: the loop is torn down
    by hand so that task cancellation at shutdown is bounded by
    ``TASK_CANCELATION_TIMEOUT`` instead of waiting indefinitely on tasks
    that do not finish after being cancelled.

    Args:
        runtime_config: The runtime configuration to start with.

    Returns:
        The value returned by ``setup_and_run_hass``.

    Raises:
        Any exception raised by ``setup_and_run_hass`` or by the shutdown
        steps (for example an executor shutdown failure) propagates.
    """
    asyncio.set_event_loop_policy(HassEventLoopPolicy(runtime_config.debug))
    # Backport of cpython 3.9 asyncio.run with a _cancel_all_tasks that times out
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(setup_and_run_hass(runtime_config))
    finally:
        try:
            _cancel_all_tasks_with_timeout(loop, TASK_CANCELATION_TIMEOUT)
            loop.run_until_complete(loop.shutdown_asyncgens())
            # Once cpython 3.8 is no longer supported we can use
            # the built-in loop.shutdown_default_executor
            loop.run_until_complete(_shutdown_default_executor(loop))
        finally:
            # Always detach and close the loop, even if a shutdown step failed.
            asyncio.set_event_loop(None)
            loop.close()


def _cancel_all_tasks_with_timeout(
loop: asyncio.AbstractEventLoop, timeout: int
) -> None:
"""Adapted _cancel_all_tasks from python 3.9 with a timeout."""
to_cancel = asyncio.all_tasks(loop)
if not to_cancel:
return

for task in to_cancel:
task.cancel()

loop.run_until_complete(asyncio.wait(to_cancel, timeout=timeout))

for task in to_cancel:
if task.cancelled():
continue
if not task.done():
_LOGGER.warning(
"Task could not be canceled and was still running after shutdown: %s",
task,
)
continue
if task.exception() is not None:
loop.call_exception_handler(
{
"message": "unhandled exception during shutdown",
"exception": task.exception(),
"task": task,
}
)


async def _shutdown_default_executor(loop: asyncio.AbstractEventLoop) -> None:
"""Backport of cpython 3.9 schedule the shutdown of the default executor."""
future = loop.create_future()

def _do_shutdown() -> None:
try:
loop._default_executor.shutdown(wait=True) # type: ignore # pylint: disable=protected-access
loop.call_soon_threadsafe(future.set_result, None)
except Exception as ex: # pylint: disable=broad-except
loop.call_soon_threadsafe(future.set_exception, ex)

thread = threading.Thread(target=_do_shutdown)
thread.start()
try:
await future
finally:
thread.join()
80 changes: 80 additions & 0 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Test the runner."""

import asyncio
import threading
from unittest.mock import patch

import pytest

from homeassistant import core, runner
from homeassistant.util import executor, thread

Expand Down Expand Up @@ -37,3 +40,80 @@ async def test_setup_and_run_hass(hass, tmpdir):
assert threading._shutdown == thread.deadlock_safe_shutdown

assert mock_run.called


def test_run(hass, tmpdir):
    """Check that runner.run completes and invokes HomeAssistant.async_run."""
    config_dir = tmpdir.mkdir("config")
    runtime_config = runner.RuntimeConfig(config_dir)

    # Shorten the shutdown wait so the test cannot stall for long.
    timeout_patch = patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1)
    # Hand back the fixture instead of performing a real setup.
    setup_patch = patch("homeassistant.bootstrap.async_setup_hass", return_value=hass)
    # Restore threading._shutdown once the test is over.
    thread_shutdown_patch = patch("threading._shutdown")
    async_run_patch = patch("homeassistant.core.HomeAssistant.async_run")

    with timeout_patch, setup_patch, thread_shutdown_patch, async_run_patch as mock_run:
        runner.run(runtime_config)

    assert mock_run.called


def test_run_executor_shutdown_throws(hass, tmpdir):
    """Check that run still shuts down, and raises, when the executor shutdown fails."""
    config_dir = tmpdir.mkdir("config")
    runtime_config = runner.RuntimeConfig(config_dir)

    # Shorten the shutdown wait so the test cannot stall for long.
    timeout_patch = patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1)
    # Hand back the fixture instead of performing a real setup.
    setup_patch = patch("homeassistant.bootstrap.async_setup_hass", return_value=hass)
    # Restore threading._shutdown once the test is over.
    thread_shutdown_patch = patch("threading._shutdown")
    # Make the executor shutdown itself blow up.
    executor_shutdown_patch = patch(
        "homeassistant.runner.InterruptibleThreadPoolExecutor.shutdown",
        side_effect=RuntimeError,
    )
    async_run_patch = patch("homeassistant.core.HomeAssistant.async_run")

    # The RuntimeError from the executor shutdown is expected to escape run().
    with timeout_patch, pytest.raises(RuntimeError):
        with setup_patch, thread_shutdown_patch:
            with executor_shutdown_patch as mock_shutdown, async_run_patch as mock_run:
                runner.run(runtime_config)

    assert mock_shutdown.called
    assert mock_run.called


def test_run_does_not_block_forever_with_shielded_task(hass, tmpdir, caplog):
    """Check that shutdown finishes even if a task keeps running after cancel."""
    config_dir = tmpdir.mkdir("config")
    runtime_config = runner.RuntimeConfig(config_dir)
    created_tasks = False

    async def _async_create_tasks(*_):
        """Stand in for HomeAssistant.async_run and leave tasks behind."""
        nonlocal created_tasks

        async def async_raise(*_):
            # Turns the cancellation into a different exception.
            try:
                await asyncio.sleep(2)
            except asyncio.CancelledError:
                raise Exception

        async def async_shielded(*_):
            # Swallows the cancellation and keeps sleeping for longer than
            # the 1 second timeout patched in below.
            try:
                await asyncio.sleep(2)
            except asyncio.CancelledError:
                await asyncio.sleep(2)

        asyncio.ensure_future(asyncio.shield(async_shielded()))
        # A plain task with no special handling of cancellation.
        asyncio.ensure_future(asyncio.sleep(2))
        asyncio.ensure_future(async_raise())
        # Let the tasks above start before returning.
        await asyncio.sleep(0.1)
        created_tasks = True
        return 0

    timeout_patch = patch.object(runner, "TASK_CANCELATION_TIMEOUT", 1)
    setup_patch = patch("homeassistant.bootstrap.async_setup_hass", return_value=hass)
    thread_shutdown_patch = patch("threading._shutdown")
    async_run_patch = patch(
        "homeassistant.core.HomeAssistant.async_run", _async_create_tasks
    )

    with timeout_patch, setup_patch, thread_shutdown_patch, async_run_patch:
        runner.run(runtime_config)

    assert created_tasks is True
    expected_warning = "Task could not be canceled and was still running after shutdown"
    assert expected_warning in caplog.text

0 comments on commit 9831ff0

Please sign in to comment.