forked from Snapmali/discord-monitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDiscordMonitor.py
453 lines (409 loc) · 18.6 KB
/
DiscordMonitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import datetime
import os
import platform
import traceback
import discord
from aiohttp import ClientConnectorError, ClientProxyConnectionError, InvalidURL
from plyer import notification
from pytz import timezone as tz
from Config import config
from Log import add_log
from PushTextProcessor import PushTextProcessor
from QQPush import QQPush
# Log file path
log_path = 'discord_monitor.log'
# Timezone
timezone = tz('Asia/Shanghai')
img_MIME = ["image/png", "image/jpeg", "image/gif"]
class DiscordMonitor(discord.Client):
def __init__(self, **kwargs):
if config.proxy:
discord.Client.__init__(self, proxy=config.proxy, **kwargs)
else:
discord.Client.__init__(self, **kwargs)
self.message_user = config.message_monitor.users
self.message_channel = config.message_monitor.channel_ids
self.message_channel_name = config.message_monitor.channel_names
self.user_dynamic_user = config.user_dynamic_monitor.users
self.user_dynamic_server = config.user_dynamic_monitor.servers
self.qq_push = QQPush()
self.push_text_processor = PushTextProcessor()
self.event_set = set()
self.status_dict = {'online': '在线', 'offline': '离线', 'idle': '闲置', 'dnd': '请勿打扰'}
self.username_dict = {}
if platform.system() == 'Windows' and platform.release() == '10' and config.toast:
self.do_toast = True
else:
self.do_toast = False
self.message_monitoring = True
self.user_monitoring = True
if 0 in self.message_channel and len(self.message_channel_name) == 0:
self.message_monitoring = False
if 0 in self.user_dynamic_server or len(self.user_dynamic_user) == 0:
self.user_monitoring = False
def is_monitored_object(self, user, channel, server, user_dynamic=False):
"""
判断事件是否由被检测对象发出
:param channel: 动态来源Channel
:param user_dynamic:是否为用户动态
:param user:动态来源用户
:param server:动态来源Server
:return:
"""
# 用户动态
if user_dynamic:
# 被检测用户列表为空
if len(self.user_dynamic_user) == 0:
return False
# 用户id在列表中 且 server在列表中或列表为空
elif str(user.id) in self.user_dynamic_user and \
(server.id in self.user_dynamic_server or len(self.user_dynamic_server) == 0):
return True
# 消息动态
else:
# 被检测用户列表为空 或 用户id在列表中
if len(self.message_user) == 0 or str(user.id) in self.message_user:
# 被检测频道列表为空 或 频道在列表中 或 频道名称在列表中
if (len(self.message_channel) == 0 or channel.id in self.message_channel or
(server.name in self.message_channel_name and channel.name in self.message_channel_name[server.name])):
return True
return False
async def process_message(self, message: discord.Message, status):
"""
处理消息动态,并生成推送消息文本及log
:param message: Message
:param status: 消息动态
:return:
"""
content_cat = self.push_text_processor.get_content_cat(message.content)
if not content_cat and content_cat != "":
return
attachment_urls = list()
image_cqcodes = list()
for attachment in message.attachments:
attachment_urls.append(attachment.url)
if attachment.content_type in img_MIME:
# 尝试利用discord.py加载图片为base64,使用代理情况下会无法连接
#image = await attachment.read(use_cached=False)
#image_base64 = base64.b64encode(image).decode("utf8")
#image_cqcodes.append(f"[CQ:image,file=base64://{image_base64}==,timeout=5]")
image_cqcodes.append(f"[CQ:image,file={attachment.url},timeout=5]")
for embed in message.embeds:
if embed.image.proxy_url:
image_cqcodes.append(f"[CQ:image,file={embed.image.proxy_url},timeout=5]")
attachment_urls.append(embed.image.proxy_url)
attachment_str = ' ; '.join(attachment_urls)
image_str = "".join(image_cqcodes)
content = self.push_text_processor.sub(message.content)
if self.do_toast:
if status == '标注消息':
toast_title = '%s #%s %s' % (message.guild.name, message.channel.name, status)
elif len(self.message_user) != 0:
toast_title = '%s %s' % (self.message_user[str(message.author.id)], status)
else:
toast_title = '%s %s' % (message.author.name, status)
if len(content) >= 240:
toast_text = content[:240] + "..." if len(message.attachments) == 0 else content + "..." + "[附件]"
else:
toast_text = content if len(message.attachments) == 0 else content + "[附件]"
notification.notify(toast_title, toast_text, app_icon='icon.ico', app_name='Discord Monitor')
if len(attachment_str) > 0:
attachment_log = '. Attachment: ' + attachment_str
else:
attachment_log = ''
if status == '发送消息':
t = message.created_at.replace(tzinfo=datetime.timezone.utc).astimezone(timezone).strftime(
'%Y/%m/%d %H:%M:%S')
else:
t = datetime.datetime.now(tz=timezone).strftime('%Y/%m/%d %H:%M:%S')
log_text = '%s: ID: %d. Username: %s. Server: %s. Channel: %s. Content: %s%s' % \
(status, message.author.id,
message.author.name + '#' + message.author.discriminator,
message.guild.name, message.channel.name, message.content, attachment_log)
add_log(0, 'Discord', log_text)
keywords = {"type": status,
"user_id": str(message.author.id),
"user_name": message.author.name,
"user_discriminator": message.author.discriminator,
"channel_id:": str(message.channel.id),
"channel_name": message.channel.name,
"server_id": str(message.guild.id),
"server_name": message.guild.name,
"content": self.push_text_processor.escape_cqcode(content),
"content_cat": content_cat,
"attachment": attachment_str,
"image": image_str,
"time": t,
"timezone": timezone.zone}
if len(self.message_user) != 0:
keywords["user_display_name"] = self.message_user[str(message.author.id)]
else:
keywords["user_display_name"] = message.author.name + '#' + message.author.discriminator
push_text = self.push_text_processor.push_text_process(keywords, is_user_dynamic=False)
asyncio.create_task(self.qq_push.push_message(push_text, 1))
async def process_user_update(self, before, after, user: discord.Member, status):
"""
处理用户动态,并生成推送消息文本及log
未指定被检测用户时应无法进入此方法
:param before:
:param after:
:param user: Member或User
:param status: 事件类型
:return:
"""
if self.do_toast:
toast_title = '%s %s' % (self.user_dynamic_user[str(user.id)], status)
toast_text = '变更后:%s' % after
notification.notify(toast_title, toast_text[:250], app_icon='icon.ico', app_name='Discord Monitor')
t = datetime.datetime.now(tz=timezone).strftime('%Y/%m/%d %H:%M:%S')
log_text = '%s: ID: %d. Username: %s. Server: %s. Before: %s. After: %s.' % \
(status, user.id,
user.name + '#' + user.discriminator,
user.guild.name, before, after)
add_log(0, 'Discord', log_text)
keywords = {"type": status,
"user_id": user.id,
"user_name": user.name,
"user_discriminator": user.discriminator,
"user_display_name": self.user_dynamic_user[str(user.id)],
"server_id": str(user.guild.id),
"server_name": user.guild.name,
"before": before,
"after": after,
"time": t,
"timezone": timezone.zone}
push_text = self.push_text_processor.push_text_process(keywords, is_user_dynamic=True)
asyncio.create_task(self.qq_push.push_message(push_text, 2))
async def on_ready(self, *args, **kwargs):
"""
完全准备好时触发,暂时用于处理大型服务器中无法接收消息的问题,随时可能被依赖库修复
:param args:
:param kwargs:
:return:
"""
if not self.user.bot:
for guild in self.guilds:
payload = {
"op": 14,
"d": {
"guild_id": str(guild.id),
"typing": True,
"threads": False,
"activities": True,
"members": [],
"channels": {
str(guild.channels[0].id): [
[
0,
99
]
]
}
}
}
asyncio.ensure_future(self.ws.send_as_json(payload), loop=self.loop)
async def on_connect(self):
"""
监听连接事件,每次连接会刷新所监视用户的用户名列表。
重写自discord.Client
***眼来了***
:return:
"""
log_text = 'Logged in as %s, ID: %d.' % (self.user.name + '#' + self.user.discriminator, self.user.id)
print(log_text + '\n')
add_log(0, 'Discord', log_text)
if self.user_monitoring:
for uid in self.user_dynamic_user:
uid = int(uid)
user = None
for guild in self.guilds:
try:
user = await guild.fetch_member(uid)
except:
continue
if user:
self.username_dict[uid] = [user.name, user.discriminator]
else:
log_text = 'Fetching ID %s\'s username failed.' % uid
add_log(2, 'Discord', log_text)
async def on_disconnect(self):
"""
监听断开连接事件,重写自discord.Client
:return:
"""
log_text = 'Disconnected...'
add_log(1, 'Discord', log_text)
print()
async def on_message(self, message):
"""
监听消息发送事件,重写自discord.Client
:param message: Message
:return:
"""
if not self.message_monitoring:
return
# 消息标注事件亦会被捕获,同时其content及attachments为空,需特判排除
if self.is_monitored_object(message.author, message.channel, message.guild) and (message.content != '' or len(message.attachments) > 0):
await self.process_message(message, '发送消息')
async def on_message_delete(self, message):
"""
监听消息删除事件,重写自discord.Client
:param message: Message
:return:
"""
if not self.message_monitoring:
return
if self.is_monitored_object(message.author, message.channel, message.guild):
await self.process_message(message, '删除消息')
async def on_message_edit(self, before, after):
"""
监听消息编辑事件,重写自discord.Client
:param before: Message
:param after: Message
:return:
"""
if not self.message_monitoring:
return
if self.is_monitored_object(after.author, after.channel, after.guild) and before.content != after.content:
await self.process_message(after, '编辑消息')
async def on_guild_channel_pins_update(self, channel, last_pin):
"""
监听频道内标注消息更新事件,重写自discord.Client
:param channel: 频道
:param last_pin: datetime.datetime 最新标注消息的发送时间
:return:
"""
if not self.message_monitoring:
return
if channel.id in self.message_channel or len(self.message_channel) == 0 or \
(channel.guild.name in self.message_channel_name and channel.name in self.message_channel_name[channel.guild.name]):
pins = await channel.pins()
if len(pins) > 0:
await self.process_message(pins[0], '标注消息')
async def on_member_update(self, before, after):
"""
监听用户状态更新事件,重写自discord.Client
:param before: Member
:param after: Member
:return:
"""
if not self.user_monitoring:
return
if self.is_monitored_object(before, None, before.guild, user_dynamic=True):
# 昵称变更
if before.nick != after.nick:
event = str(before.nick) + str(after.nick)
if self.check_event(event):
await self.process_user_update(before.nick, after.nick, before, '昵称更新')
self.delete_event(event)
# 在线状态变更
if before.status != after.status:
event = str(before.id) + str(before.status) + str(after.status)
if self.check_event(event):
await self.process_user_update(self.get_status(before.status), self.get_status(after.status),
before, '状态更新')
self.delete_event(event)
# 用户名或Tag变更
try:
self.username_dict[before.id]
except KeyError:
self.username_dict[before.id] = [after.name, after.discriminator]
if self.username_dict[before.id][0] != after.name or self.username_dict[before.id][1] != after.discriminator:
before_screenname = self.username_dict[before.id][0] + '#' + self.username_dict[before.id][1]
after_screenname = after.name + '#' + after.discriminator
self.username_dict[before.id][0] = after.name
self.username_dict[before.id][1] = after.discriminator
event = before_screenname + after_screenname
if self.check_event(event):
await self.process_user_update(before_screenname, after_screenname, before, '用户名更新')
self.delete_event(event)
# 用户活动变更
if before.activity != after.activity:
if not before.activity:
event = after.activity.name
if self.check_event(event):
await self.process_user_update(None, after.activity.name, before, '活动更新')
self.delete_event(event)
elif not after.activity:
event = before.activity.name
if self.check_event(event):
await self.process_user_update(before.activity.name, None, before, '活动更新')
self.delete_event(event)
elif before.activity.name != after.activity.name:
event = before.activity.name + after.activity.name
if self.check_event(event):
await self.process_user_update(before.activity.name, after.activity.name, before, '活动更新')
self.delete_event(event)
def get_status(self, status):
"""
将api的用户在线状态转换为中文
:param status: api中的用户在线状态
:return: 中文在线状态
"""
status = str(status)
if status in self.status_dict:
return self.status_dict[status]
return status
def check_event(self, event):
"""
检查该事件是否已在用户动态事件set中,不在则将事件加入set,防止眼和监测用户同在多个Server中时重复推送用户动态
:param event: event
:return: True if yes, otherwise False
"""
if event in self.event_set:
return False
else:
self.event_set.add(event)
return True
def delete_event(self, event):
"""
延时删除set中的用户动态事件
:param event: event
:return:
"""
asyncio.get_event_loop().call_later(5, self.event_set.remove, event)
async def close(self):
"""
关闭至discord的连接,以及QQPush模块的连接
:return:
"""
await asyncio.gather(
super(DiscordMonitor, self).close(),
self.qq_push.close()
)
def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if config.bot:
intents = discord.Intents.default()
dc = DiscordMonitor(loop=loop, intents=intents)
else:
dc = DiscordMonitor(loop=loop)
try:
print('Logging in...')
loop.run_until_complete(dc.start(config.token))
except (ClientProxyConnectionError, InvalidURL):
print('代理错误,请检查代理设置')
except (TimeoutError, ClientConnectorError):
print('连接超时,请检查连接状态及代理设置')
except discord.errors.LoginFailure:
print('登录失败,请检查Token及bot设置是否正确,或更新Token,或检查是否使用了正确的discord.py依赖库')
except KeyboardInterrupt:
print("用户退出")
except Exception:
print('登录失败,请检查配置文件中各参数是否正确')
traceback.print_exc()
finally:
loop.run_until_complete(dc.close())
# 2022.5.8:
# Windows环境下aiohttp似乎会在程序退出释放内存时自动调用方法关闭事件循环导致报错,在此对平台进行特判
# 暂未测试在Linux下的表现
if platform.system() != 'Windows':
loop.close()
if __name__ == '__main__':
main()
if platform.system() == 'Windows':
os.system('pause')