Skip to content

Commit

Permalink
add flood control and wildcard support to upload and closes 237
Browse files Browse the repository at this point in the history
  • Loading branch information
rking32 committed Jan 14, 2021
1 parent 0ad9034 commit b6d4878
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 77 deletions.
26 changes: 16 additions & 10 deletions userge/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from userge.plugins import get_all_plugins
from .methods import Methods
from .ext import RawClient, pool
from .database import _close_db

_LOG = logging.getLogger(__name__)
_LOG_STR = "<<<! ##### %s ##### !>>>"
Expand Down Expand Up @@ -103,7 +104,7 @@ async def reload_plugins(self) -> int:
return len(reloaded)


class _UsergeBot(_AbstractUserge):
class UsergeBot(_AbstractUserge):
""" UsergeBot, the bot """
def __init__(self, **kwargs) -> None:
_LOG.info(_LOG_STR, "Setting UsergeBot Configs")
Expand Down Expand Up @@ -131,12 +132,13 @@ def __init__(self, **kwargs) -> None:
kwargs['bot_token'] = Config.BOT_TOKEN
if Config.HU_STRING_SESSION and Config.BOT_TOKEN:
RawClient.DUAL_MODE = True
kwargs['bot'] = _UsergeBot(bot=self, **kwargs)
kwargs['bot'] = UsergeBot(bot=self, **kwargs)
kwargs['session_name'] = Config.HU_STRING_SESSION or ":memory:"
super().__init__(**kwargs)
self.executor.shutdown()

@property
def bot(self) -> Union['_UsergeBot', 'Userge']:
def bot(self) -> Union['UsergeBot', 'Userge']:
""" returns usergebot """
if self._bot is None:
if Config.BOT_TOKEN:
Expand All @@ -146,7 +148,7 @@ def bot(self) -> Union['_UsergeBot', 'Userge']:

async def start(self) -> None:
""" start client and bot """
pool._start() # pylint: disable=protected-access
self.executor = pool._get() # pylint: disable=protected-access
_LOG.info(_LOG_STR, "Starting Userge")
await super().start()
if self._bot is not None:
Expand All @@ -161,7 +163,8 @@ async def stop(self) -> None: # pylint: disable=arguments-differ
await self._bot.stop()
_LOG.info(_LOG_STR, "Stopping Userge")
await super().stop()
await pool._stop() # pylint: disable=protected-access
_close_db()
pool._stop() # pylint: disable=protected-access

def begin(self, coro: Optional[Awaitable[Any]] = None) -> None:
""" start userge """
Expand All @@ -174,11 +177,14 @@ async def _finalize() -> None:
task.cancel()
if self.is_initialized:
await self.stop()
# pylint: disable=expression-not-assigned
[t.cancel() for t in asyncio.all_tasks() if t is not asyncio.current_task()]
await self.loop.shutdown_asyncgens()
self.loop.stop()
_LOG.info(_LOG_STR, "Loop Stopped !")
else:
_close_db()
pool._stop() # pylint: disable=protected-access
# pylint: disable=expression-not-assigned
[t.cancel() for t in asyncio.all_tasks() if t is not asyncio.current_task()]
await self.loop.shutdown_asyncgens()
self.loop.stop()
_LOG.info(_LOG_STR, "Loop Stopped !")

async def _shutdown(sig: signal.Signals) -> None:
_LOG.info(_LOG_STR, f"Received Stop Signal [{sig.name}], Exiting Userge ...")
Expand Down
4 changes: 4 additions & 0 deletions userge/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ def get_collection(name: str) -> AgnosticCollection:
return _DATABASE[name]


def _close_db() -> None:
_MGCLIENT.close()


logbot.del_last_msg()
54 changes: 16 additions & 38 deletions userge/core/ext/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,66 +8,44 @@
#
# All rights reserved.

__all__ = ['submit_task', 'submit_thread', 'run_in_thread']
__all__ = ['submit_thread', 'run_in_thread']

import asyncio
from typing import Any, Callable, List
from typing import Any, Callable
from concurrent.futures import ThreadPoolExecutor, Future
from functools import wraps, partial

from userge import logging, Config
from motor.frameworks.asyncio import _EXECUTOR # pylint: disable=protected-access

from userge import logging

_WORKERS = Config.WORKERS
_THREAD_POOL: ThreadPoolExecutor
_ASYNC_QUEUE = asyncio.Queue()
_TASKS: List[asyncio.Task] = []
_LOG = logging.getLogger(__name__)
_LOG_STR = "<<<! |||| %s |||| !>>>"


def submit_task(task: asyncio.coroutines.CoroWrapper) -> None:
""" submit task to task pool """
_ASYNC_QUEUE.put_nowait(task)


def submit_thread(func: Callable[[Any], Any], *args: Any, **kwargs: Any) -> Future:
""" submit thread to thread pool """
return _THREAD_POOL.submit(func, *args, **kwargs)
return _EXECUTOR.submit(func, *args, **kwargs)


def run_in_thread(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
""" run in a thread """
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(_THREAD_POOL, partial(func, *args, **kwargs))
return await loop.run_in_executor(_EXECUTOR, partial(func, *args, **kwargs))
return wrapper


def _start():
global _THREAD_POOL # pylint: disable=global-statement
_THREAD_POOL = ThreadPoolExecutor(_WORKERS)
def _get() -> ThreadPoolExecutor:
return _EXECUTOR


async def _task_worker():
while True:
coro = await _ASYNC_QUEUE.get()
if coro is None:
break
await coro
loop = asyncio.get_event_loop()
for _ in range(_WORKERS):
_TASKS.append(loop.create_task(_task_worker()))
_LOG.info(_LOG_STR, f"Started Pool : {_WORKERS} Workers")
def _stop():
_EXECUTOR.shutdown()
# pylint: disable=protected-access
_LOG.info(_LOG_STR, f"Stopped Pool : {_EXECUTOR._max_workers} Workers")


async def _stop():
_THREAD_POOL.shutdown()
for _ in range(_WORKERS):
_ASYNC_QUEUE.put_nowait(None)
for task in _TASKS:
try:
await asyncio.wait_for(task, timeout=0.3)
except asyncio.TimeoutError:
task.cancel()
_TASKS.clear()
_LOG.info(_LOG_STR, f"Stopped Pool : {_WORKERS} Workers")
# pylint: disable=protected-access
_LOG.info(_LOG_STR, f"Started Pool : {_EXECUTOR._max_workers} Workers")
88 changes: 86 additions & 2 deletions userge/core/ext/raw_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,105 @@

__all__ = ['RawClient']

import asyncio
import time
from typing import Optional
from typing import Optional, Dict

import pyrogram.raw.functions as funcs
import pyrogram.raw.types as types
from pyrogram import Client
from pyrogram.session import Session
from pyrogram.raw.core import TLObject

import userge # pylint: disable=unused-import

_LOG = userge.logging.getLogger(__name__)
_LOG_STR = "<<<! { (FLOOD CONTROL) sleeping %.2fs in %d } !>>>"


class RawClient(Client):
""" userge raw client """
DUAL_MODE = False
LAST_OUTGOING_TIME = time.time()

def __init__(self, bot: Optional['userge.core.client._UsergeBot'] = None, **kwargs) -> None:
REQ_LOGS: Dict[int, 'ChatReq'] = {}
DELAY_BET_MSG_REQ = 1
MSG_REQ_PER_MIN = 20
REQ_LOCK = asyncio.Lock()

def __init__(self, bot: Optional['userge.core.client.UsergeBot'] = None, **kwargs) -> None:
self._bot = bot
super().__init__(**kwargs)
self._channel = userge.core.types.new.ChannelLogger(self, "CORE")
userge.core.types.new.Conversation.init(self)

async def send(self, data: TLObject, retries: int = Session.MAX_RETRIES,
timeout: float = Session.WAIT_TIMEOUT, sleep_threshold: float = None):
key = 0
if isinstance(data, (funcs.messages.SendMessage,
funcs.messages.EditMessage,
funcs.messages.ForwardMessages)):
if isinstance(data, funcs.messages.ForwardMessages):
tmp = data.to_peer
else:
tmp = data.peer
if isinstance(tmp, (types.InputPeerChannel, types.InputPeerChannelFromMessage)):
key = int(tmp.channel_id)
elif isinstance(tmp, types.InputPeerChat):
key = int(tmp.chat_id)
elif isinstance(tmp, (types.InputPeerUser, types.InputPeerUserFromMessage)):
key = int(tmp.user_id)
elif isinstance(data, funcs.channels.DeleteMessages):
if isinstance(data.channel, (types.InputChannel, types.InputChannelFromMessage)):
key = int(data.channel.channel_id)
if key:
async def slp(to_sl: float) -> None:
if to_sl > 0.1:
if to_sl > 1:
_LOG.info(_LOG_STR, to_sl, key)
else:
_LOG.debug(_LOG_STR, to_sl, key)
await asyncio.sleep(to_sl)
async with self.REQ_LOCK:
if key in self.REQ_LOGS:
chat_req = self.REQ_LOGS[key]
else:
chat_req = self.REQ_LOGS[key] = ChatReq()
diff = chat_req.small_diff
if 0 < diff < self.DELAY_BET_MSG_REQ:
await slp(1 - diff)
diff = chat_req.big_diff
if diff >= 60:
chat_req.reset()
elif chat_req.count > self.MSG_REQ_PER_MIN:
await slp(60 - diff)
chat_req.reset()
else:
chat_req.update()
return await super().send(data, retries, timeout, sleep_threshold)


class ChatReq:
def __init__(self) -> None:
self._first = self._last = time.time()
self._count = 0

@property
def small_diff(self) -> float:
return time.time() - self._last

@property
def big_diff(self) -> float:
return time.time() - self._first

@property
def count(self) -> float:
return self._count

def reset(self) -> None:
self._first = self._last = time.time()
self._count = 1

def update(self) -> None:
self._last = time.time()
self._count += 1
18 changes: 9 additions & 9 deletions userge/core/methods/decorators/raw_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def _clear_cht() -> None:
_TASK_1_START_TO = time.time()


async def _init(r_c: Union['_client.Userge', '_client._UsergeBot'],
async def _init(r_c: Union['_client.Userge', '_client.UsergeBot'],
r_m: RawMessage) -> None:
global _U_ID, _B_ID # pylint: disable=global-statement
if r_m.from_user and (
Expand All @@ -110,7 +110,7 @@ async def _init(r_c: Union['_client.Userge', '_client._UsergeBot'],
_U_ID = (await r_c.ubot.get_me()).id


async def _raise_func(r_c: Union['_client.Userge', '_client._UsergeBot'],
async def _raise_func(r_c: Union['_client.Userge', '_client.UsergeBot'],
chat_id: int, message_id: int, text: str) -> None:
try:
_sent = await r_c.send_message(
Expand All @@ -123,7 +123,7 @@ async def _raise_func(r_c: Union['_client.Userge', '_client._UsergeBot'],
pass


async def _is_admin(r_c: Union['_client.Userge', '_client._UsergeBot'],
async def _is_admin(r_c: Union['_client.Userge', '_client.UsergeBot'],
r_m: RawMessage) -> bool:
if r_m.chat.type in ("private", "bot"):
return False
Expand All @@ -136,7 +136,7 @@ async def _is_admin(r_c: Union['_client.Userge', '_client._UsergeBot'],
return r_m.chat.id in _B_AD_CHT


def _get_chat_member(r_c: Union['_client.Userge', '_client._UsergeBot'],
def _get_chat_member(r_c: Union['_client.Userge', '_client.UsergeBot'],
r_m: RawMessage) -> Optional[ChatMember]:
if r_m.chat.type in ("private", "bot"):
return None
Expand All @@ -156,7 +156,7 @@ async def _get_lock(key: str) -> asyncio.Lock:
return _CH_LKS[key]


async def _bot_is_present(r_c: Union['_client.Userge', '_client._UsergeBot'],
async def _bot_is_present(r_c: Union['_client.Userge', '_client.UsergeBot'],
r_m: RawMessage) -> bool:
global _TASK_2_START_TO # pylint: disable=global-statement
if isinstance(r_c, _client.Userge):
Expand All @@ -175,15 +175,15 @@ async def _bot_is_present(r_c: Union['_client.Userge', '_client._UsergeBot'],
return r_m.chat.id in _B_CMN_CHT


async def _both_are_admins(r_c: Union['_client.Userge', '_client._UsergeBot'],
async def _both_are_admins(r_c: Union['_client.Userge', '_client.UsergeBot'],
r_m: RawMessage) -> bool:
if not await _bot_is_present(r_c, r_m):
return False
return r_m.chat.id in _B_AD_CHT and r_m.chat.id in _U_AD_CHT


async def _both_have_perm(flt: Union['types.raw.Command', 'types.raw.Filter'],
r_c: Union['_client.Userge', '_client._UsergeBot'],
r_c: Union['_client.Userge', '_client.UsergeBot'],
r_m: RawMessage) -> bool:
if not await _bot_is_present(r_c, r_m):
return False
Expand Down Expand Up @@ -233,7 +233,7 @@ def _build_decorator(self,
flt: Union['types.raw.Command', 'types.raw.Filter'],
**kwargs: Union[str, bool]) -> 'RawDecorator._PYRORETTYPE':
def decorator(func: _PYROFUNC) -> _PYROFUNC:
async def template(r_c: Union['_client.Userge', '_client._UsergeBot'],
async def template(r_c: Union['_client.Userge', '_client.UsergeBot'],
r_m: RawMessage) -> None:
if Config.DISABLED_ALL and r_m.chat.id != Config.LOG_CHANNEL_ID:
return
Expand Down Expand Up @@ -303,7 +303,7 @@ async def template(r_c: Union['_client.Userge', '_client._UsergeBot'],
if cond:
if Config.USE_USER_FOR_CLIENT_CHECKS:
# pylint: disable=protected-access
if isinstance(r_c, _client._UsergeBot):
if isinstance(r_c, _client.UsergeBot):
return
elif await _bot_is_present(r_c, r_m):
if isinstance(r_c, _client.Userge):
Expand Down
6 changes: 3 additions & 3 deletions userge/core/types/bound/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
class Message(RawMessage):
""" Modded Message Class For Userge """
def __init__(self,
client: Union['_client.Userge', '_client._UsergeBot'],
client: Union['_client.Userge', '_client.UsergeBot'],
mvars: Dict[str, object], module: str, **kwargs: Union[str, bool]) -> None:
self._filtered = False
self._filtered_input_str = ''
Expand All @@ -44,7 +44,7 @@ def __init__(self,
super().__init__(client=client, **mvars)

@classmethod
def parse(cls, client: Union['_client.Userge', '_client._UsergeBot'],
def parse(cls, client: Union['_client.Userge', '_client.UsergeBot'],
message: RawMessage, **kwargs: Union[str, bool]) -> 'Message':
""" parse message """
mvars = vars(message)
Expand All @@ -57,7 +57,7 @@ def parse(cls, client: Union['_client.Userge', '_client._UsergeBot'],
return cls(client, mvars, **kwargs)

@property
def client(self) -> Union['_client.Userge', '_client._UsergeBot']:
def client(self) -> Union['_client.Userge', '_client.UsergeBot']:
""" returns client """
return self._client

Expand Down
4 changes: 2 additions & 2 deletions userge/core/types/new/channel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def _gen_string(name: str) -> str:

class ChannelLogger:
""" Channel logger for Userge """
def __init__(self, client: Union['_client.Userge', '_client._UsergeBot'], name: str) -> None:
def __init__(self, client: Union['_client.Userge', '_client.UsergeBot'], name: str) -> None:
self._id = Config.LOG_CHANNEL_ID
self._client = client
self._string = _gen_string(name)
Expand Down Expand Up @@ -148,7 +148,7 @@ async def store(self,
return message_id

async def forward_stored(self,
client: Union['_client.Userge', '_client._UsergeBot'],
client: Union['_client.Userge', '_client.UsergeBot'],
message_id: int,
chat_id: int,
user_id: int,
Expand Down
Loading

0 comments on commit b6d4878

Please sign in to comment.