forked from AbirHasan2005/PDisk-Videos-Search
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
97 lines (83 loc) · 2.99 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# (c) @AbirHasan2005
import os
import asyncio
import traceback
from dotenv import (
load_dotenv
)
from pyrogram import (
Client,
filters,
idle
)
from pyrogram.types import (
Message,
InlineKeyboardMarkup,
InlineKeyboardButton
)
from pyrogram.errors import (
MessageNotModified
)
from core.search_video import search_pdisk_videos
if os.path.exists("configs.env"):
load_dotenv("configs.env")
class Configs(object):
API_ID = int(os.environ.get("API_ID", 0))
API_HASH = os.environ.get("API_HASH", "")
BOT_TOKEN = os.environ.get("BOT_TOKEN", "")
PDISK_USERNAME = os.environ.get("PDISK_USERNAME", "")
PDISK_PASSWORD = os.environ.get("PDISK_PASSWORD", "")
MAX_RESULTS = int(os.environ.get("MAX_RESULTS", 5))
# Which PDisk Domain?
PDISK_DOMAINS = [
"https://www.cofilink.com/",
"https://www.pdisk1.net/",
"https://www.pdisk.net/"
]
PDISK_DOMAIN = os.environ.get("PDISK_DOMAIN", PDISK_DOMAINS[2])
PDiskBot = Client(
session_name=":memory:",
api_id=Configs.API_ID,
api_hash=Configs.API_HASH,
bot_token=Configs.BOT_TOKEN
)
@PDiskBot.on_message(filters.command("start") & ~filters.edited)
async def start_handler(_, m: Message):
await m.reply_text("Hi, I am Alive!\n\nSearch using /request command.", quote=True)
@PDiskBot.on_message(filters.command("request", prefixes=["#", "/"]) & ~filters.edited, group=-1)
async def text_handler(_, m: Message):
if len(m.command) < 2:
return await m.reply_text("Search Query Missing!")
editable = await m.reply_text("Please Wait ...", quote=True)
response = await search_pdisk_videos(m.text.split(" ", 1)[-1], Configs.PDISK_USERNAME, Configs.PDISK_PASSWORD)
if isinstance(response, Exception):
traceback.print_exc()
try: await editable.edit("Failed to search!",
reply_markup=InlineKeyboardMarkup([
[InlineKeyboardButton("Sumpot Group", url="https://t.me/JoinOT")]
]))
except MessageNotModified: pass
elif not response["data"]["list"]:
try: await editable.edit("Not Found!")
except MessageNotModified: pass
else:
data = response["data"]["list"]
text = ""
count = 0
for i in range(len(data)):
if count > Configs.MAX_RESULTS:
break
count += 1
text += f"**Title:** `{data[i]['title']}`\n" \
f"**Description:** `{data[i]['description']}`\n" \
f"**PDisk Link:** {Configs.PDISK_DOMAIN + 'share-video?videoid=' + data[i]['share_link'].split('=', 1)[-1]}\n\n"
try: await editable.edit(text, disable_web_page_preview=True)
except MessageNotModified: pass
async def run():
await PDiskBot.start()
print("\n\nBot Started!\n\n")
await idle()
await PDiskBot.stop()
print("\n\nBot Stopped!\n\n")
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(run())