# debounce.py (forked from home-assistant/core)
"""Debounce helper."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from logging import Logger
from typing import Any
from homeassistant.core import HassJob, HomeAssistant, callback
class Debouncer:
    """Class to rate limit calls to a specific command.

    At most one execution of the wrapped function is started per cooldown
    window. Calls made while a cooldown timer is pending are collapsed into
    a single execution that runs when the timer finishes.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        logger: Logger,
        *,
        cooldown: float,
        immediate: bool,
        function: Callable[..., Awaitable[Any]] | None = None,
    ) -> None:
        """Initialize debounce.

        hass: instance used to run the job and to reach the event loop.
        logger: receives the traceback when a timer-triggered run fails.
        cooldown: delay passed to ``loop.call_later`` between executions.
        immediate: indicate if the function needs to be called right away and
                   wait <cooldown> until executing next invocation.
        function: optional and can be instantiated later.
        """
        self.hass = hass
        self.logger = logger
        self._function = function
        self.cooldown = cooldown
        self.immediate = immediate
        # Handle of the pending cooldown timer; None when no timer is armed.
        self._timer_task: asyncio.TimerHandle | None = None
        # Set when a call arrived during the cooldown and must run at its end.
        self._execute_at_end_of_timer: bool = False
        # Held for the duration of one execution of the wrapped function.
        self._execute_lock = asyncio.Lock()
        self._job: HassJob | None = None if function is None else HassJob(function)

    @property
    def function(self) -> Callable[..., Awaitable[Any]] | None:
        """Return the function being wrapped by the Debouncer."""
        return self._function

    @function.setter
    def function(self, function: Callable[..., Awaitable[Any]]) -> None:
        """Update the function being wrapped by the Debouncer."""
        self._function = function
        # Only build a new job when the target actually changed.
        if self._job is None or function != self._job.target:
            self._job = HassJob(function)

    async def async_call(self) -> None:
        """Call the function.

        Depending on the current state the call is executed right away,
        deferred until the pending timer finishes, or dropped because an
        execution is already in progress. An exception raised by the wrapped
        function on the immediate path propagates to the caller; the cooldown
        timer is armed regardless.
        """
        assert self._job is not None

        if self._timer_task:
            # Cooldown is running: remember that one more run is wanted.
            if not self._execute_at_end_of_timer:
                self._execute_at_end_of_timer = True

            return

        # Locked means a call is in progress. Any call is good, so abort.
        if self._execute_lock.locked():
            return

        if not self.immediate:
            # Deferred mode: run once the cooldown has elapsed.
            self._execute_at_end_of_timer = True
            self._schedule_timer()
            return

        async with self._execute_lock:
            # Abort if timer got set while we're waiting for the lock.
            if self._timer_task:
                return

            try:
                task = self.hass.async_run_hass_job(self._job)
                if task:
                    await task
            finally:
                # Always start the cooldown, even when the function raised;
                # otherwise a failing function would bypass rate limiting.
                self._schedule_timer()

    async def _handle_timer_finish(self) -> None:
        """Handle a finished timer.

        Runs the wrapped function if a call was requested during the
        cooldown, then arms a new cooldown timer. Exceptions from the wrapped
        function are logged instead of raised.
        """
        assert self._job is not None

        self._timer_task = None

        if not self._execute_at_end_of_timer:
            return

        self._execute_at_end_of_timer = False

        # Locked means a call is in progress. Any call is good, so abort.
        if self._execute_lock.locked():
            return

        async with self._execute_lock:
            # Abort if timer got set while we're waiting for the lock.
            if self._timer_task:
                return  # type: ignore[unreachable]

            try:
                task = self.hass.async_run_hass_job(self._job)
                if task:
                    await task
            except Exception:  # pylint: disable=broad-except
                self.logger.exception("Unexpected exception from %s", self.function)
            finally:
                # Re-arm the cooldown on every exit path so new calls stay
                # rate limited, matching the behavior of async_call.
                self._schedule_timer()

    @callback
    def async_cancel(self) -> None:
        """Cancel any scheduled call."""
        if self._timer_task:
            self._timer_task.cancel()
            self._timer_task = None

        self._execute_at_end_of_timer = False

    @callback
    def _schedule_timer(self) -> None:
        """Schedule a timer."""
        # call_later needs a plain callable, so the coroutine is wrapped in a
        # task that is created when the timer fires.
        self._timer_task = self.hass.loop.call_later(
            self.cooldown,
            lambda: self.hass.async_create_task(self._handle_timer_finish()),
        )