forked from Tasshack/dreame-vacuum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcamera.py
352 lines (306 loc) · 12.6 KB
/
camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
from __future__ import annotations
import collections
import time
import asyncio
from typing import Any, Dict
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from aiohttp import web
from homeassistant.components.camera import Camera, CameraEntityDescription
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_UNAVAILABLE, CONTENT_TYPE_MULTIPART
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers import entity_registry
from .const import DOMAIN, CONF_COLOR_SCHEME, CONF_ICON_SET, CONF_MAP_OBJECTS, MAP_OBJECTS, ATTR_CALIBRATION, CONTENT_TYPE, LOGGER
from .coordinator import DreameVacuumDataUpdateCoordinator
from .entity import DreameVacuumEntity, DreameVacuumEntityDescription
from .dreame.map import DreameVacuumMapRenderer, DreameVacuumMapDataRenderer
@dataclass
class DreameVacuumCameraEntityDescription(
DreameVacuumEntityDescription, CameraEntityDescription
):
"""Describes Dreame Vacuum Camera entity."""
map_data_json: bool = False
CAMERAS: tuple[CameraEntityDescription, ...] = (
DreameVacuumCameraEntityDescription(
key="map", icon="mdi:map"
),
DreameVacuumCameraEntityDescription(
key="map_data",
icon="mdi:map",
entity_category=EntityCategory.CONFIG,
map_data_json=True,
entity_registry_enabled_default=False,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Dreame Vacuum Camera based on a config entry."""
coordinator: DreameVacuumDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]
color_scheme = entry.options.get(CONF_COLOR_SCHEME)
icon_set = entry.options.get(CONF_ICON_SET)
map_objects = entry.options.get(CONF_MAP_OBJECTS, MAP_OBJECTS.keys())
if coordinator.device.status.map_available:
async_add_entities(
DreameVacuumCameraEntity(coordinator, description, color_scheme, icon_set, map_objects)
for description in CAMERAS
)
update_map_cameras = partial(
async_update_map_cameras, coordinator, {}, async_add_entities, color_scheme, icon_set, map_objects
)
coordinator.async_add_listener(update_map_cameras)
update_map_cameras()
@callback
def async_update_map_cameras(
coordinator: DreameVacuumDataUpdateCoordinator,
current: dict[str, list[DreameVacuumCameraEntity]],
async_add_entities,
color_scheme: str,
icon_set: str,
map_objects: list[str],
) -> None:
new_indexes = set(
[k for k in range(1, len(coordinator.device.status.map_list) + 1)])
current_ids = set(current)
new_entities = []
for map_index in current_ids - new_indexes:
async_remove_map_cameras(map_index, coordinator, current)
for map_index in new_indexes - current_ids:
current[map_index] = [
DreameVacuumCameraEntity(
coordinator,
DreameVacuumCameraEntityDescription(
entity_category=EntityCategory.CONFIG,
icon="mdi:map-search",
),
color_scheme,
icon_set,
map_objects,
map_index,
)
]
new_entities = new_entities + current[map_index]
if new_entities:
async_add_entities(new_entities)
def async_remove_map_cameras(
map_index: str,
coordinator: DreameVacuumDataUpdateCoordinator,
current: dict[str, DreameVacuumCameraEntity],
) -> None:
registry = entity_registry.async_get(coordinator.hass)
entities = current[map_index]
for entity in entities:
if entity.entity_id in registry.entities:
registry.async_remove(entity.entity_id)
del current[map_index]
class DreameVacuumCameraEntity(DreameVacuumEntity, Camera):
"""Defines a Dreame Vacuum Camera entity."""
def __init__(
self,
coordinator: DreameVacuumDataUpdateCoordinator,
description: DreameVacuumCameraEntityDescription,
color_scheme: str = None,
icon_set: str = None,
map_objects: list[str] = None,
map_index: int = 0,
) -> None:
"""Initialize a Dreame Vacuum Camera entity."""
super().__init__(coordinator, description)
self.content_type = CONTENT_TYPE
self.stream = None
self.access_tokens = collections.deque([], 2)
self.async_update_token()
self._rtsp_to_webrtc = False
self._should_poll = True
self._last_updated = -1
self._frame_id = -1
self._last_map_request = 0
self._attr_is_streaming = True
self._calibration_points = None
self._available = self.device.device_connected and self.device.cloud_connected
if description.map_data_json:
self._renderer = DreameVacuumMapDataRenderer()
else:
self._renderer = DreameVacuumMapRenderer(color_scheme, icon_set, map_objects, self.device.status.robot_shape)
self._image = self._renderer.default_map_image
self._default_map = True
self.map_index = map_index
self._state = STATE_UNAVAILABLE
map_data = self._map_data
if map_data:
self._map_id = map_data.map_id
if self.map_index:
if map_data:
self._map_name = map_data.custom_name
else:
self._map_name = None
self._set_map_name()
self._attr_unique_id = f"{self.device.mac}_map_{self.map_index}"
self.entity_id = f"camera.{self.device.name.lower()}_map_{self.map_index}"
else:
self._attr_name = f"{self.device.name} Current {description.name}"
self._attr_unique_id = f"{self.device.mac}_map_{description.key}"
self.entity_id = (
f"camera.{self.device.name.lower()}_{description.key.lower()}"
)
self.update()
def _set_map_name(self) -> None:
name = (
f"{self.map_index}"
if self._map_name is None
else f"{self._map_name.replace('_', ' ').replace('-', ' ').title()}"
)
self._attr_name = f"{self.device.name} Saved Map {name}"
@callback
def _handle_coordinator_update(self) -> None:
"""Fetch state from the device."""
self._available = self.device.cloud_connected
self._last_map_request = 0
map_data = self._map_data
if (
map_data
and self.available
and (self.map_index > 0 or self.device.status.located)
):
if self.map_index > 0:
if self._map_name != map_data.custom_name:
self._map_name = map_data.custom_name
self._set_map_name()
if self._map_id != map_data.map_id:
self._map_id = map_data.map_id
self._frame_id = None
self._last_updated = None
if (
self._default_map == True or
self._frame_id != map_data.frame_id
):
self._frame_id = map_data.frame_id
if not self.device.status.active:
self.update()
else:
self.update()
self.async_write_ha_state()
async def async_camera_image(
self, width: int | None = None, height: int | None = None
) -> bytes | None:
if self._should_poll is True:
self._should_poll = False
now = time.time()
if now - self._last_map_request >= self.frame_interval:
self._last_map_request = now
if self.map_index == 0:
self.device.update_map()
self.update()
self._should_poll = True
return self._image
async def handle_async_still_stream(
self, request: web.Request, interval: float
) -> web.StreamResponse:
"""Generate an HTTP MJPEG stream from camera images."""
response = web.StreamResponse()
response.content_type = CONTENT_TYPE_MULTIPART.format(
"--frameboundary")
await response.prepare(request)
last_image = None
while True:
img_bytes = await self.async_camera_image()
if not img_bytes:
img_bytes = self._default_map_image
if img_bytes != last_image:
# Always write twice, otherwise chrome ignores last frame and displays previous frame after second one
for k in range(2):
await response.write(
bytes(
"--frameboundary\r\n"
"Content-Type: {}\r\n"
"Content-Length: {}\r\n\r\n".format(
self.content_type, len(img_bytes)),
"utf-8",
)
+ img_bytes
+ b"\r\n"
)
last_image = img_bytes
await asyncio.sleep(interval)
return response
def update(self) -> None:
map_data = self._map_data
if (
map_data
and self.available
and (self.map_index > 0 or self.device.status.located)
):
if self.map_index == 0 and not self.entity_description.map_data_json and map_data.last_updated != self._last_updated and not self._renderer.render_complete:
LOGGER.warning("Waiting render complete")
if (
self._renderer.render_complete
and map_data.last_updated != self._last_updated
):
if self.map_index == 0 and not self.entity_description.map_data_json:
LOGGER.debug("Update map")
self._last_updated = map_data.last_updated
self._frame_id = map_data.frame_id
self._default_map = False
if map_data.timestamp_ms and not map_data.saved_map:
self._state = datetime.fromtimestamp(
int(map_data.timestamp_ms / 1000)
)
elif map_data.last_updated:
self._state = datetime.fromtimestamp(
int(map_data.last_updated))
self.coordinator.hass.async_create_task(self._update_image(
self.device.get_map_for_render(self.map_index), self.device.status.robot_status))
elif not self._default_map:
self._image = self._default_map_image
self._default_map = True
self._frame_id = -1
self._last_updated = -1
self._state = STATE_UNAVAILABLE
async def _update_image(self, map_data, robot_status) -> None:
self._image = self._renderer.render_map(map_data, robot_status)
if not self.entity_description.map_data_json and self._calibration_points != self._renderer.calibration_points:
self._calibration_points = self._renderer.calibration_points
self.coordinator.async_set_updated_data()
@property
def _map_data(self) -> Any:
return self.device.get_map(self.map_index)
@property
def _default_map_image(self) -> Any:
if self._image and (not self.device.device_connected or not self.device.cloud_connected):
return self._renderer.disconnected_map_image
return self._renderer.default_map_image
@property
def frame_interval(self) -> float:
return 0.25
@property
def supported_features(self) -> int:
return 0
@property
def state(self) -> str:
"""Return the status of the map."""
return self._state
@property
def available(self) -> bool:
"""Return True if entity is available."""
return self._available
@property
def extra_state_attributes(self) -> Dict[str, Any]:
if not self.entity_description.map_data_json:
map_data = self._map_data
if (
map_data
and not map_data.empty_map
and (self.map_index > 0 or self.device.status.located)
):
attributes = map_data.as_dict()
if attributes:
attributes[ATTR_CALIBRATION] = self._calibration_points if self._calibration_points else self._renderer.calibration_points
return attributes
elif self.available:
return {ATTR_CALIBRATION: self._renderer.default_calibration_points}