# debounce.py - forked from home-assistant/core.
"""Debounce helper."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from logging import Logger
from homeassistant.core import HassJob, HomeAssistant, callback
class Debouncer[_R_co]:
"""Class to rate limit calls to a specific command."""
def __init__(
self,
hass: HomeAssistant,
logger: Logger,
*,
cooldown: float,
immediate: bool,
function: Callable[[], _R_co] | None = None,
background: bool = False,
) -> None:
"""Initialize debounce.
immediate: indicate if the function needs to be called right away and
wait <cooldown> until executing next invocation.
function: optional and can be instantiated later.
"""
self.hass = hass
self.logger = logger
self._function = function
self.cooldown = cooldown
self.immediate = immediate
self._timer_task: asyncio.TimerHandle | None = None
self._execute_at_end_of_timer: bool = False
self._execute_lock = asyncio.Lock()
self._background = background
self._job: HassJob[[], _R_co] | None = (
None
if function is None
else HassJob(
function, f"debouncer cooldown={cooldown}, immediate={immediate}"
)
)
self._shutdown_requested = False
@property
def function(self) -> Callable[[], _R_co] | None:
"""Return the function being wrapped by the Debouncer."""
return self._function
@function.setter
def function(self, function: Callable[[], _R_co]) -> None:
"""Update the function being wrapped by the Debouncer."""
self._function = function
if self._job is None or function != self._job.target:
self._job = HassJob(
function,
f"debouncer cooldown={self.cooldown}, immediate={self.immediate}",
)
@callback
def async_schedule_call(self) -> None:
"""Schedule a call to the function."""
if self._async_schedule_or_call_now():
self._execute_at_end_of_timer = True
self._on_debounce()
def _async_schedule_or_call_now(self) -> bool:
"""Check if a call should be scheduled.
Returns True if the function should be called immediately.
Returns False if there is nothing to do.
"""
if self._shutdown_requested:
self.logger.debug("Debouncer call ignored as shutdown has been requested.")
return False
if self._timer_task:
if not self._execute_at_end_of_timer:
self._execute_at_end_of_timer = True
return False
# Locked means a call is in progress. Any call is good, so abort.
if self._execute_lock.locked():
return False
if not self.immediate:
self._execute_at_end_of_timer = True
self._schedule_timer()
return False
return True
async def async_call(self) -> None:
"""Call the function."""
if not self._async_schedule_or_call_now():
return
async with self._execute_lock:
# Abort if timer got set while we're waiting for the lock.
if self._timer_task:
return
assert self._job is not None
try:
if task := self.hass.async_run_hass_job(
self._job, background=self._background
):
await task
finally:
self._schedule_timer()
async def _handle_timer_finish(self) -> None:
"""Handle a finished timer."""
assert self._job is not None
self._execute_at_end_of_timer = False
# Locked means a call is in progress. Any call is good, so abort.
if self._execute_lock.locked():
return
async with self._execute_lock:
# Abort if timer got set while we're waiting for the lock.
if self._timer_task:
return
try:
if task := self.hass.async_run_hass_job(
self._job, background=self._background
):
await task
except Exception:
self.logger.exception("Unexpected exception from %s", self.function)
finally:
# Schedule a new timer to prevent new runs during cooldown
self._schedule_timer()
@callback
def async_shutdown(self) -> None:
"""Cancel any scheduled call, and prevent new runs."""
self._shutdown_requested = True
self.async_cancel()
@callback
def async_cancel(self) -> None:
"""Cancel any scheduled call."""
if self._timer_task:
self._timer_task.cancel()
self._timer_task = None
self._execute_at_end_of_timer = False
@callback
def _on_debounce(self) -> None:
"""Create job task, but only if pending."""
self._timer_task = None
if not self._execute_at_end_of_timer:
return
self._execute_at_end_of_timer = False
name = f"debouncer {self._job} finish cooldown={self.cooldown}, immediate={self.immediate}"
if not self._background:
self.hass.async_create_task(
self._handle_timer_finish(), name, eager_start=True
)
return
self.hass.async_create_background_task(
self._handle_timer_finish(), name, eager_start=True
)
@callback
def _schedule_timer(self) -> None:
"""Schedule a timer."""
if not self._shutdown_requested:
self._timer_task = self.hass.loop.call_later(
self.cooldown, self._on_debounce
)