forked from CyniteOfficial/Auto-Filter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinline.py
104 lines (83 loc) · 3.41 KB
/
inline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
from urllib.parse import quote
from pyrogram import Client, emoji, filters
from pyrogram.errors import UserNotParticipant
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultCachedDocument
from utils import get_search_results
from info import MAX_RESULTS, CACHE_TIME, SHARE_BUTTON_TEXT, AUTH_USERS, AUTH_CHANNEL
logger = logging.getLogger(__name__)
@Client.on_inline_query(filters.user(AUTH_USERS) if AUTH_USERS else None)
async def answer(bot, query):
    """Answer an inline query with matching cached documents.

    The query text may be written as ``"<search text> | <file type>"``; only
    the first ``|`` separates the two parts. Without a ``|`` the whole text is
    the search string and no file-type filter is passed on.

    When ``AUTH_CHANNEL`` is set and ``is_subscribed`` reports False, an empty
    answer with a "subscribe" switch-to-PM button is sent instead.
    """
    # Subscription gate: reply with an uncached empty answer and stop.
    if AUTH_CHANNEL and not await is_subscribed(bot, query):
        await query.answer(
            results=[],
            cache_time=0,
            switch_pm_text='You have to subscribe channel',
            switch_pm_parameter="subscribe",
        )
        return

    # partition() splits on the first '|' only; an empty separator means
    # the query carried no file-type part at all.
    text_part, separator, type_part = query.query.partition('|')
    string = text_part.strip()
    file_type = type_part.strip().lower() if separator else None

    # Telegram sends an empty offset for the first page.
    offset = int(query.offset or 0)
    reply_markup = get_reply_markup(bot.username, query=string)
    files, next_offset = await get_search_results(
        string,
        file_type=file_type,
        max_results=MAX_RESULTS,
        offset=offset,
    )

    # Every result shares the same keyboard built above.
    results = [
        InlineQueryResultCachedDocument(
            title=file.file_name,
            file_id=file.file_id,
            caption=file.caption or "",
            description=f'Size: {get_size(file.file_size)}\nType: {file.file_type}',
            reply_markup=reply_markup,
        )
        for file in files
    ]

    if not results:
        header = f'{emoji.CROSS_MARK} No results'
        if string:
            header += f' for "{string}"'
        await query.answer(
            results=[],
            cache_time=CACHE_TIME,
            switch_pm_text=header,
            switch_pm_parameter="okay",
        )
        return

    header = f"{emoji.FILE_FOLDER} Results"
    if string:
        header += f" for {string}"
    await query.answer(
        results=results,
        cache_time=CACHE_TIME,
        switch_pm_text=header,
        switch_pm_parameter="start",
        next_offset=str(next_offset),
    )
def get_reply_markup(username, query):
    """Build the one-row inline keyboard attached to each search result.

    The row holds a 'Search again' button that re-opens the inline query in
    the current chat with ``query`` pre-filled, and a 'Share bot' button whose
    link carries the URL-quoted ``SHARE_BUTTON_TEXT`` formatted with
    ``username``.
    """
    share_text = SHARE_BUTTON_TEXT.format(username=username)
    share_url = 't.me/share/url?url=' + quote(share_text)

    search_again = InlineKeyboardButton(
        'Search again',
        switch_inline_query_current_chat=query,
    )
    share_bot = InlineKeyboardButton('Share bot', url=share_url)

    return InlineKeyboardMarkup([[search_again, share_bot]])
def get_size(size):
    """Return ``size`` (a byte count) as a human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit ("EB") is reached, then formatted with two decimals, e.g.
    ``get_size(1536) == "1.50 KB"``.

    Values too large for the unit table are expressed in "EB" rather than
    raising ``IndexError``.
    """
    units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
    size = float(size)
    i = 0
    # Stop at the last unit: the index must never step past the table.
    while size >= 1024.0 and i < len(units) - 1:
        i += 1
        size /= 1024.0
    return "%.2f %s" % (size, units[i])
async def is_subscribed(bot, query):
    """Report whether the querying user may use the bot via ``AUTH_CHANNEL``.

    Returns True when the chat-member lookup succeeds and the member's status
    is not 'kicked'. Returns False when the user is not a participant, when
    the status is 'kicked', or when the lookup fails for any other reason
    (that failure is logged with its traceback).
    """
    try:
        member = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id)
    except UserNotParticipant:
        return False
    except Exception as err:
        # Best effort: any unexpected failure counts as "not subscribed".
        logger.exception(err)
        return False

    return member.status != 'kicked'