forked from Rishikesh-Sharma09/Auto-Filter-Bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathia_filterdb.py
141 lines (125 loc) · 4.15 KB
/
ia_filterdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import logging
from struct import pack
import re
import base64
from pyrogram.file_id import FileId
from pymongo.errors import DuplicateKeyError
from umongo import Instance, Document, fields
from motor.motor_asyncio import AsyncIOMotorClient
from marshmallow.exceptions import ValidationError
from info import DATABASE_URL, DATABASE_NAME, COLLECTION_NAME, MAX_BTN
# Async MongoDB client built from the configured connection URL.
client = AsyncIOMotorClient(DATABASE_URL)
# Handle to the configured database on that client.
db = client[DATABASE_NAME]
# umongo instance bound to the database; document classes register on it.
instance = Instance.from_db(db)
@instance.register
class Media(Document):
    """Document describing one indexed media file."""

    # Stored under MongoDB's ``_id`` key, so the same file_id cannot be
    # inserted twice (save_file relies on the resulting DuplicateKeyError).
    file_id = fields.StrField(attribute='_id')
    file_name = fields.StrField(required=True)
    file_size = fields.IntField(required=True)
    # Optional: None is an accepted value.
    caption = fields.StrField(allow_none=True)

    class Meta:
        # NOTE(review): the '$' prefix is umongo's shorthand for a text
        # index on the field - confirm against the umongo version in use.
        indexes = ('$file_name', )
        collection_name = COLLECTION_NAME
async def save_file(media):
    """Save a media file's metadata in the database.

    The file name and caption are normalised before storing: every
    ``@mention`` and every ``_``, ``-``, ``.`` or ``+`` character is replaced
    by a single space.

    Args:
        media: Object exposing ``file_id``, ``file_name``, ``file_size`` and
            ``caption`` attributes.

    Returns:
        ``'suc'`` when the document was committed, ``'dup'`` when the commit
        raised ``DuplicateKeyError``, ``'err'`` when building the document
        raised ``ValidationError``.
    """
    # TODO: Find better way to get same file_id for same media to avoid duplicates
    file_id = unpack_new_file_id(media.file_id)
    file_name = re.sub(r"@\w+|(_|\-|\.|\+)", " ", str(media.file_name))
    # A missing caption stays None (the schema allows it) instead of being
    # stringified and stored as the literal text "None".
    if media.caption is None:
        file_caption = None
    else:
        file_caption = re.sub(r"@\w+|(_|\-|\.|\+)", " ", str(media.caption))

    try:
        file = Media(
            file_id=file_id,
            file_name=file_name,
            file_size=media.file_size,
            caption=file_caption
        )
    except ValidationError:
        print(f'Saving Error - {file_name}')
        return 'err'

    try:
        await file.commit()
    except DuplicateKeyError:
        print(f'Already Saved - {file_name}')
        return 'dup'

    print(f'Saved - {file_name}')
    return 'suc'
async def get_search_results(query, max_results=MAX_BTN, offset=0, lang=None):
    """Search stored files by name and return one page of matches.

    The query is turned into a case-insensitive regular expression:
    an empty query matches any name, a single word must be delimited by a
    word boundary or one of ``. + - _``, and each space in a multi-word
    query may span arbitrary text ending in whitespace or ``. + - _``.
    The query text is inserted into the pattern unescaped.

    Args:
        query: Search text.
        max_results: Maximum number of files to return.
        offset: Number of leading matches to skip.
        lang: Optional substring; when given, only files whose lower-cased
            name contains it are kept (filtered in Python after the query).

    Returns:
        ``(files, next_offset, total_results)``. ``next_offset`` is
        ``offset + max_results``, or ``''`` when that reaches or passes
        ``total_results``.
    """
    query = query.strip()
    if not query:
        raw_pattern = '.'
    elif ' ' not in query:
        raw_pattern = r'(\b|[\.\+\-_])' + query + r'(\b|[\.\+\-_])'
    else:
        raw_pattern = query.replace(' ', r'.*[\s\.\+\-_]')

    try:
        regex = re.compile(raw_pattern, flags=re.IGNORECASE)
    except Exception:
        # User-supplied text can make re.compile raise more than re.error
        # (e.g. OverflowError for an oversized repeat count), so keep the
        # best-effort fallback broad - but no longer swallow
        # KeyboardInterrupt/SystemExit as the bare ``except:`` did.
        regex = query

    name_filter = {'file_name': regex}
    cursor = Media.find(name_filter)
    # Sort by recent
    cursor.sort('$natural', -1)

    if lang:
        # The language filter is applied client-side, so every match is
        # loaded before the page is sliced out.
        lang_files = [file async for file in cursor if lang in file.file_name.lower()]
        files = lang_files[offset:][:max_results]
        total_results = len(lang_files)
    else:
        # Slice files according to offset and max results
        cursor.skip(offset).limit(max_results)
        # Get list of files
        files = await cursor.to_list(length=max_results)
        total_results = await Media.count_documents(name_filter)

    next_offset = offset + max_results
    if next_offset >= total_results:
        next_offset = ''
    return files, next_offset, total_results
async def delete_files(query):
    """Find the files whose name matches *query*, for deletion by the caller.

    The query is converted to a case-insensitive regular expression the same
    way the search does: empty matches any name, a single word must be
    delimited by a word boundary or one of ``. + - _``, and each space in a
    multi-word query may span arbitrary text ending in whitespace or
    ``. + - _``. Nothing is deleted here.

    Args:
        query: Search text.

    Returns:
        ``(total, files)`` - the number of matching documents and the
        un-awaited cursor over them.
    """
    query = query.strip()
    if not query:
        raw_pattern = '.'
    elif ' ' not in query:
        raw_pattern = r'(\b|[\.\+\-_])' + query + r'(\b|[\.\+\-_])'
    else:
        raw_pattern = query.replace(' ', r'.*[\s\.\+\-_]')

    try:
        regex = re.compile(raw_pattern, flags=re.IGNORECASE)
    except Exception:
        # Keep the best-effort fallback for any pattern re.compile rejects,
        # without swallowing KeyboardInterrupt/SystemExit like a bare
        # ``except:`` would.
        regex = query

    name_filter = {'file_name': regex}
    total = await Media.count_documents(name_filter)
    files = Media.find(name_filter)
    return total, files
async def get_file_details(query):
    """Look up a file by its ``file_id``.

    Returns a list holding at most one matching document (empty when there
    is no match).
    """
    matches = Media.find({'file_id': query})
    return await matches.to_list(length=1)
def encode_file_id(s: bytes) -> str:
    """Encode raw file-id bytes as an unpadded URL-safe base64 string.

    The bytes ``22`` and ``4`` are appended to *s*, each run of zero bytes is
    collapsed into ``0x00`` followed by the run length, and the result is
    base64-encoded with trailing ``=`` padding removed.
    """
    # NOTE(review): 22 and 4 presumably mirror Telegram's file-id trailer -
    # confirm against the pyrogram FileId implementation.
    payload = s + bytes([22]) + bytes([4])

    packed = bytearray()
    zero_run = 0
    for byte in payload:
        if byte == 0:
            zero_run += 1
            continue
        if zero_run:
            # Flush the pending run of zeros as a (0, count) pair.
            packed += b"\x00" + bytes([zero_run])
            zero_run = 0
        packed.append(byte)

    encoded = base64.urlsafe_b64encode(packed)
    return encoded.decode().rstrip("=")
def unpack_new_file_id(new_file_id):
    """Derive the string stored as a file's database id from *new_file_id*.

    The id is decoded with ``FileId.decode``; its file type, DC id, media id
    and access hash are packed little-endian as two 32-bit and two 64-bit
    integers and passed through ``encode_file_id``.
    """
    info = FileId.decode(new_file_id)
    raw = pack(
        "<iiqq",
        int(info.file_type),
        info.dc_id,
        info.media_id,
        info.access_hash,
    )
    return encode_file_id(raw)