forked from LonamiWebs/Telethon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
statecache.py
164 lines (142 loc) · 5.57 KB
/
statecache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import inspect
from .tl import types
# Constructor IDs of the update types whose ``__init__`` takes an
# ``int``-annotated ``channel_id`` parameter. Filled in once by `_fill`.
_has_channel_id = []


# TODO EntityCache does the same. Reuse?
def _fill():
    """
    Populate `_has_channel_id` by inspecting everything exposed by `types`.

    Every attribute of `types` whose ``SUBCLASS_OF_ID`` equals ``0x9f89304e``
    is considered; its ``CONSTRUCTOR_ID`` is recorded when the signature of
    its ``__init__`` has a ``channel_id`` parameter annotated as `int`.

    Raises `RuntimeError` if nothing was recorded, since that means the
    detection above no longer matches the generated types.
    """
    for attr_name in dir(types):
        candidate = getattr(types, attr_name)
        if getattr(candidate, 'SUBCLASS_OF_ID', None) != 0x9f89304e:
            continue

        constructor_id = candidate.CONSTRUCTOR_ID
        init_params = inspect.signature(candidate.__init__).parameters
        # Parameter names are unique, so a direct lookup by name is
        # equivalent to scanning every parameter for ``channel_id``.
        channel_param = init_params.get('channel_id')
        if channel_param is not None and channel_param.annotation == int:
            _has_channel_id.append(constructor_id)

    if not _has_channel_id:
        raise RuntimeError('FIXME: Did the init signature or updates change?')


# The work is wrapped in a function so that its loop variables
# do not end up cluttering the module globals.
_fill()
class StateCache:
    """
    In-memory update state cache, defaultdict-like behaviour.

    The common ``(pts, date)`` pair is kept in ``_pts_date``. The ``pts``
    of each channel is stored directly in the instance ``__dict__``, keyed
    by the **unmarked** channel ID, which is why `__getitem__` can simply
    return `None` for channels that have not been seen yet.
    """
    def __init__(self, initial, loggers):
        """
        Create the cache.

        `initial` is an object with ``pts`` and ``date`` attributes used to
        seed the common state; any falsy value starts with ``(None, None)``.
        `loggers` is a mapping from which this module's logger is looked up
        by ``__name__``.
        """
        # We only care about the pts and the date. By using a tuple which
        # is lightweight and immutable we can easily copy them around to
        # each update in case they need to fetch missing entities.
        self._logger = loggers[__name__]
        if initial:
            self._pts_date = initial.pts, initial.date
        else:
            self._pts_date = None, None

    def reset(self):
        """
        Forget all known state, both the common one and the per-channel one.

        The logger survives the reset. It lives in the same ``__dict__`` as
        the per-channel entries, so clearing the dictionary without saving
        it first would make later logging calls in `update` and
        `get_channel_id` fail with `AttributeError`.
        """
        logger = self.__dict__.get('_logger')
        self.__dict__.clear()
        if logger is not None:
            self._logger = logger
        self._pts_date = None, None

    # TODO Call this when receiving responses too...?
    def update(
            self,
            update,
            *,
            channel_id=None,
            has_pts=frozenset(x.CONSTRUCTOR_ID for x in (
                types.UpdateNewMessage,
                types.UpdateDeleteMessages,
                types.UpdateReadHistoryInbox,
                types.UpdateReadHistoryOutbox,
                types.UpdateWebPage,
                types.UpdateReadMessagesContents,
                types.UpdateEditMessage,
                types.updates.State,
                types.updates.DifferenceTooLong,
                types.UpdateShortMessage,
                types.UpdateShortChatMessage,
                types.UpdateShortSentMessage
            )),
            has_date=frozenset(x.CONSTRUCTOR_ID for x in (
                types.UpdateUserPhoto,
                types.UpdateEncryption,
                types.UpdateEncryptedMessagesRead,
                types.UpdateChatParticipantAdd,
                types.updates.DifferenceEmpty,
                types.UpdateShortMessage,
                types.UpdateShortChatMessage,
                types.UpdateShort,
                types.UpdatesCombined,
                types.Updates,
                types.UpdateShortSentMessage,
            )),
            has_channel_pts=frozenset(x.CONSTRUCTOR_ID for x in (
                types.UpdateChannelTooLong,
                types.UpdateNewChannelMessage,
                types.UpdateDeleteChannelMessages,
                types.UpdateEditChannelMessage,
                types.UpdateChannelWebPage,
                types.updates.ChannelDifferenceEmpty,
                types.updates.ChannelDifferenceTooLong,
                types.updates.ChannelDifference
            )),
            check_only=False
    ):
        """
        Update the state with the given update.

        The ``has_*`` keyword arguments are sets of constructor IDs built
        once, when the method is defined; they tell which fields are read
        from `update`.

        If `check_only` is true, nothing is modified and a `bool` is
        returned telling whether `update` belongs to any of those sets.
        Otherwise the method returns `None` after:

        * replacing the common ``pts`` and/or ``date`` with the ones from
          `update`, keeping the previous value of whichever one this kind
          of update does not carry;
        * storing ``update.pts`` for the channel when the update carries a
          channel ``pts``. The channel is `channel_id` if given, or else
          whatever `get_channel_id` returns. If neither yields an ID, the
          failure is logged and the update is otherwise ignored.
        """
        cid = update.CONSTRUCTOR_ID
        if check_only:
            return cid in has_pts or cid in has_date or cid in has_channel_pts

        if cid in has_pts:
            if cid in has_date:
                self._pts_date = update.pts, update.date
            else:
                self._pts_date = update.pts, self._pts_date[1]
        elif cid in has_date:
            self._pts_date = self._pts_date[0], update.date

        if cid in has_channel_pts:
            if channel_id is None:
                channel_id = self.get_channel_id(update)

            if channel_id is None:
                self._logger.info(
                    'Failed to retrieve channel_id from %s', update)
            else:
                self.__dict__[channel_id] = update.pts

    def get_channel_id(
            self,
            update,
            has_channel_id=frozenset(_has_channel_id),
            # Hardcoded because only some with message are for channels
            has_message=frozenset(x.CONSTRUCTOR_ID for x in (
                types.UpdateNewChannelMessage,
                types.UpdateEditChannelMessage
            ))
    ):
        """
        Gets the **unmarked** channel ID from this update, if it has any.

        Fails for ``*difference`` updates, where ``channel_id``
        is supposedly already known from the outside.

        Returns `None` when the update has no channel ID that can be read,
        which includes message updates whose ``peer_id`` is `None`.
        """
        cid = update.CONSTRUCTOR_ID
        if cid in has_channel_id:
            return update.channel_id
        elif cid in has_message:
            if update.message.peer_id is None:
                # Telegram sometimes sends empty messages to give a newer pts:
                # UpdateNewChannelMessage(message=MessageEmpty(id), pts=pts, pts_count=1)
                # Not sure why, but it's safe to ignore them.
                self._logger.debug('Update has None peer_id %s', update)
            else:
                return update.message.peer_id.channel_id

        return None

    def __getitem__(self, item):
        """
        If `item` is `None`, returns the default ``(pts, date)``.

        If it's an **unmarked** channel ID, returns its ``pts``.

        If no information is known, ``pts`` will be `None`.
        """
        if item is None:
            return self._pts_date
        else:
            return self.__dict__.get(item)

    def __setitem__(self, where, value):
        """
        If `where` is `None`, replaces the default ``(pts, date)`` with
        `value`. Otherwise stores `value` as the ``pts`` of the channel
        whose **unmarked** ID is `where`.
        """
        if where is None:
            self._pts_date = value
        else:
            self.__dict__[where] = value