forked from SpEcHiDe/UniBorg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
_core.py
90 lines (79 loc) · 3.41 KB
/
_core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import asyncio
import traceback
import os
from datetime import datetime
from uniborg import util
# Delay, in seconds, between showing a status message and deleting it again;
# the handlers in this file pass it to asyncio.sleep() before calling delete().
DELETE_TIMEOUT = 5
@borg.on(util.admin_cmd(pattern=r"load (?P<shortname>\w+)$"))  # pylint:disable=E0602
async def load_reload(event):
    """Load a plugin by short name, unloading it first if it is already loaded.

    The command message itself is deleted immediately. On success a
    confirmation is posted and deleted again after ``DELETE_TIMEOUT`` seconds.
    On any failure the full traceback is logged and the exception text is
    posted to the chat (that message is left in place).

    Args:
        event: The matched command event; the plugin name is read from
            ``event.pattern_match["shortname"]``.
    """
    await event.delete()
    shortname = event.pattern_match["shortname"]
    try:
        # Drop the currently loaded copy so the load below acts as a reload.
        if shortname in borg._plugins:  # pylint:disable=E0602
            borg.remove_plugin(shortname)  # pylint:disable=E0602
        borg.load_plugin(shortname)  # pylint:disable=E0602
        msg = await event.respond(f"Successfully (re)loaded plugin {shortname}")
        await asyncio.sleep(DELETE_TIMEOUT)
        await msg.delete()
    except Exception as e:  # pylint:disable=C0103,W0703
        trace_back = traceback.format_exc()
        # NOTE(review): `logger` is not defined in this file; it is assumed to
        # be a standard logging.Logger provided by the plugin loader. `warn`
        # is a deprecated alias, so the supported `warning` method is used.
        # pylint:disable=E0602
        logger.warning(f"Failed to (re)load plugin {shortname}: {trace_back}")
        await event.respond(f"Failed to (re)load plugin {shortname}: {e}")
@borg.on(util.admin_cmd(pattern=r"(?:unload|remove) (?P<shortname>\w+)$"))  # pylint:disable=E0602
async def remove(event):
    """Unload a plugin by short name and report the outcome.

    The command message is deleted immediately. Exactly one status message is
    posted and then deleted after ``DELETE_TIMEOUT`` seconds. The ``_core``
    plugin is never removed.

    Args:
        event: The matched command event; the plugin name is read from
            ``event.pattern_match["shortname"]``.
    """
    await event.delete()
    shortname = event.pattern_match["shortname"]

    if shortname == "_core":
        # Refuse to unload the plugin that provides these commands.
        status = f"Not removing {shortname}"
    elif shortname in borg._plugins:  # pylint:disable=E0602
        borg.remove_plugin(shortname)  # pylint:disable=E0602
        status = f"Removed plugin {shortname}"
    else:
        status = f"Plugin {shortname} is not loaded"

    msg = await event.respond(status)
    await asyncio.sleep(DELETE_TIMEOUT)
    await msg.delete()
@borg.on(util.admin_cmd(pattern=r"send plugin (?P<shortname>\w+)$"))  # pylint:disable=E0602
async def send_plug_in(event):
    """Upload a plugin's source file from ``./stdplugins`` as a document.

    Forwarded messages are ignored. The file is sent as a reply to the command
    message, the command message is edited to report the elapsed whole
    seconds, and after ``DELETE_TIMEOUT`` seconds the command message is
    deleted. If the plugin file does not exist, the command message is edited
    to say so instead of attempting the upload.

    Args:
        event: The matched command event; the plugin name is read from
            ``event.pattern_match["shortname"]``.
    """
    if event.fwd_from:
        return
    message_id = event.message.id
    input_str = event.pattern_match["shortname"]
    the_plugin_file = "./stdplugins/{}.py".format(input_str)

    if not os.path.isfile(the_plugin_file):
        # Report a missing file to the user rather than letting the upload raise.
        await event.edit("Plugin {} not found".format(input_str))
    else:
        start = datetime.now()
        await borg.send_file(  # pylint:disable=E0602
            event.chat_id,
            the_plugin_file,
            force_document=True,
            allow_cache=False,
            reply_to=message_id
        )
        # total_seconds() covers the whole duration; truncated to whole
        # seconds to keep the message format unchanged.
        elapsed_seconds = int((datetime.now() - start).total_seconds())
        await event.edit(
            "Uploaded {} in {} seconds".format(input_str, elapsed_seconds)
        )

    await asyncio.sleep(DELETE_TIMEOUT)
    await event.delete()
@borg.on(util.admin_cmd(pattern="install plugin"))  # pylint:disable=E0602
async def install_plug_in(event):
    """Install a plugin from the file attached to the replied-to message.

    Forwarded messages are ignored. When the command is a reply, the replied
    message's media is downloaded into ``borg._plugin_path`` and loaded with
    ``borg.load_plugin_from_file``. A download that yields no file name, or a
    name containing ``"("``, is rejected. On rejection or on any exception the
    downloaded file (if one exists) is removed, and the command message is
    edited to show the outcome. Finally the command message is deleted after
    ``DELETE_TIMEOUT`` seconds.

    Args:
        event: The matched command event.
    """
    if event.fwd_from:
        return
    if event.reply_to_msg_id:
        # Bound before the try so cleanup never touches an undefined name
        # when the download itself fails.
        downloaded_file_name = None
        installed = False
        try:
            downloaded_file_name = await borg.download_media(  # pylint:disable=E0602
                await event.get_reply_message(),
                borg._plugin_path  # pylint:disable=E0602
            )
            # NOTE(review): "(" presumably marks a de-duplicated name such as
            # "name (1).py", i.e. a plugin file with that name already exists
            # - confirm against the download behaviour.
            if downloaded_file_name and "(" not in downloaded_file_name:
                borg.load_plugin_from_file(downloaded_file_name)  # pylint:disable=E0602
                await event.edit(
                    "Installed Plugin `{}`".format(
                        os.path.basename(downloaded_file_name)
                    )
                )
                installed = True
            else:
                await event.edit("Errors! Cannot install this plugin.")
        except Exception as e:  # pylint:disable=C0103,W0703
            await event.edit(str(e))
        finally:
            # Remove the download unless the install completed; guarded so a
            # missing name or an already-deleted file cannot raise here and
            # hide the original error.
            if (
                not installed
                and downloaded_file_name
                and os.path.exists(downloaded_file_name)
            ):
                os.remove(downloaded_file_name)
    await asyncio.sleep(DELETE_TIMEOUT)
    await event.delete()