forked from SilentDemonSD/WZML-X
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmediainfo.py
96 lines (87 loc) · 4.24 KB
/
mediainfo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python3
import aiohttp
from re import search as re_search
from shlex import split as ssplit
from aiofiles import open as aiopen
from aiofiles.os import remove as aioremove, path as aiopath, mkdir
from os import path as ospath, getcwd
from pyrogram.handlers import MessageHandler
from pyrogram.filters import command
from bot import LOGGER, bot, config_dict
from bot.helper.telegram_helper.filters import CustomFilters
from bot.helper.telegram_helper.bot_commands import BotCommands
from bot.helper.telegram_helper.message_utils import editMessage, sendMessage
from bot.helper.ext_utils.bot_utils import cmd_exec
from bot.helper.ext_utils.telegraph_helper import telegraph
async def gen_mediainfo(message, link=None, media=None, mmsg=None):
temp_send = await sendMessage(message, '<i>Generating MediaInfo...</i>')
try:
path = "Mediainfo/"
if not await aiopath.isdir(path):
await mkdir(path)
if link:
filename = re_search(".+/(.+)", link).group(1)
des_path = ospath.join(path, filename)
headers = {"user-agent":"Mozilla/5.0 (Linux; Android 12; 2201116PI) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Mobile Safari/537.36"}
async with aiohttp.ClientSession() as session:
async with session.get(link, headers=headers) as response:
async with aiopen(des_path, "wb") as f:
async for chunk in response.content.iter_chunked(10000000):
await f.write(chunk)
break
elif media:
des_path = ospath.join(path, media.file_name)
if media.file_size <= 50000000:
await mmsg.download(ospath.join(getcwd(), des_path))
else:
async for chunk in bot.stream_media(media, limit=5):
async with aiopen(des_path, "ab") as f:
await f.write(chunk)
stdout, _, _ = await cmd_exec(ssplit(f'mediainfo "{des_path}"'))
tc = f"<h4>📌 {ospath.basename(des_path)}</h4><br><br>"
if len(stdout) != 0:
tc += parseinfo(stdout)
except Exception as e:
LOGGER.error(e)
await editMessage(temp_send, f"MediaInfo Stopped due to {str(e)}")
finally:
await aioremove(des_path)
link_id = (await telegraph.create_page(title='MediaInfo X', content=tc))["path"]
await temp_send.edit(f"<b>MediaInfo:</b>\n\n➲ <b>Link :</b> https://graph.org/{link_id}", disable_web_page_preview=False)
section_dict = {'General': '🗒', 'Video': '🎞', 'Audio': '🔊', 'Text': '🔠', 'Menu': '🗃'}
def parseinfo(out):
tc = ''
trigger = False
for line in out.split('\n'):
for section, emoji in section_dict.items():
if line.startswith(section):
trigger = True
if not line.startswith('General'):
tc += '</pre><br>'
tc += f"<h4>{emoji} {line.replace('Text', 'Subtitle')}</h4>"
break
if trigger:
tc += '<br><pre>'
trigger = False
else:
tc += line + '\n'
tc += '</pre><br>'
return tc
async def mediainfo(_, message):
rply = message.reply_to_message
help_msg = "<b>By replying to media:</b>"
help_msg += f"\n<code>/{BotCommands.MediaInfoCommand[0]} or /{BotCommands.MediaInfoCommand[1]}" + " {media}" + "</code>"
help_msg += "\n\n<b>By reply/sending download link:</b>"
help_msg += f"\n<code>/{BotCommands.MediaInfoCommand[0]} or /{BotCommands.MediaInfoCommand[1]}" + " {link}" + "</code>"
if len(message.command) > 1 or rply and rply.text:
link = rply.text if rply else message.command[1]
return await gen_mediainfo(message, link)
elif rply:
file = next((i for i in [rply.document, rply.video, rply.audio, rply.voice,
rply.animation, rply.video_note] if i is not None), None)
if not file:
return await sendMessage(message, help_msg)
return await gen_mediainfo(message, None, file, rply)
else:
return await sendMessage(message, help_msg)
bot.add_handler(MessageHandler(mediainfo, filters=command(BotCommands.MediaInfoCommand) & CustomFilters.authorized & ~CustomFilters.blacklisted))