Skip to content

Commit

Permalink
minor fixes in default template
Browse files Browse the repository at this point in the history
  • Loading branch information
rking32 committed Aug 6, 2020
1 parent 0b2cd3f commit 75661b5
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 49 deletions.
163 changes: 115 additions & 48 deletions userge/core/methods/decorators/raw_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,47 @@
_U_NM_CHT: Dict[int, ChatMember] = {}


async def _update_u_cht(r_m: RawMessage) -> ChatMember:
if r_m.chat.id not in {**_U_AD_CHT, **_U_NM_CHT}:
user = await r_m.chat.get_member(_U_ID)
if user.status in ("creator", "administrator"):
_U_AD_CHT[r_m.chat.id] = user
else:
_U_NM_CHT[r_m.chat.id] = user
elif r_m.chat.id in _U_AD_CHT:
user = _U_AD_CHT[r_m.chat.id]
else:
user = _U_NM_CHT[r_m.chat.id]
return user


async def _update_b_cht(r_m: RawMessage) -> ChatMember:
if r_m.chat.id not in {**_B_AD_CHT, **_B_NM_CHT}:
bot = await r_m.chat.get_member(_B_ID)
if bot.status == "administrator":
_B_AD_CHT[r_m.chat.id] = bot
else:
_B_NM_CHT[r_m.chat.id] = bot
elif r_m.chat.id in _B_AD_CHT:
bot = _B_AD_CHT[r_m.chat.id]
else:
bot = _B_NM_CHT[r_m.chat.id]
return bot


def _clear_cht() -> None:
global _TASK_1_START_TO # pylint: disable=global-statement
_U_AD_CHT.clear()
_U_NM_CHT.clear()
_B_AD_CHT.clear()
_B_NM_CHT.clear()
_TASK_1_START_TO = time.time()


async def _init(r_c: Union['_client.Userge', '_client._UsergeBot']) -> None:
global _U_ID, _B_ID # pylint: disable=global-statement
if _U_ID and _B_ID:
return
if isinstance(r_c, _client.Userge):
if not _U_ID:
_U_ID = (await r_c.get_me()).id
Expand Down Expand Up @@ -73,50 +112,17 @@ async def _raise_func(r_c: Union['_client.Userge', '_client._UsergeBot'],

async def _is_admin(r_c: Union['_client.Userge', '_client._UsergeBot'],
r_m: RawMessage) -> bool:
global _TASK_1_START_TO # pylint: disable=global-statement
if r_m.chat.type in ("private", "bot"):
return False
if round(time.time() - _TASK_1_START_TO) > 10:
_U_AD_CHT.clear()
_U_NM_CHT.clear()
_B_AD_CHT.clear()
_B_NM_CHT.clear()
_TASK_1_START_TO = time.time()
_clear_cht()
if isinstance(r_c, _client.Userge):
if r_m.chat.id not in {**_U_AD_CHT, **_U_NM_CHT}:
user = await r_m.chat.get_member(_U_ID)
if user.status in ("creator", "administrator"):
_U_AD_CHT[r_m.chat.id] = user
else:
_U_NM_CHT[r_m.chat.id] = user
await _update_u_cht(r_m)
return r_m.chat.id in _U_AD_CHT
if r_m.chat.id not in {**_B_AD_CHT, **_B_NM_CHT}:
bot = await r_m.chat.get_member(_B_ID)
if bot.status == "administrator":
_B_AD_CHT[r_m.chat.id] = bot
else:
_B_NM_CHT[r_m.chat.id] = bot
await _update_b_cht(r_m)
return r_m.chat.id in _B_AD_CHT


def _get_chat_member(r_c: Union['_client.Userge', '_client._UsergeBot'],
r_m: RawMessage) -> Optional[ChatMember]:
if r_m.chat.type in ("private", "bot"):
return None
if isinstance(r_c, _client.Userge):
if r_m.chat.id in _U_AD_CHT:
return _U_AD_CHT[r_m.chat.id]
return _U_NM_CHT[r_m.chat.id]
if r_m.chat.id in _B_AD_CHT:
return _B_AD_CHT[r_m.chat.id]
return _B_NM_CHT[r_m.chat.id]


def _only_one_is_admin(r_m: RawMessage) -> bool:
return ((r_m.chat.id in _B_AD_CHT and r_m.chat.id not in _U_AD_CHT)
or (r_m.chat.id not in _B_AD_CHT and r_m.chat.id in _U_AD_CHT))


async def _bot_is_present(r_c: Union['_client.Userge', '_client._UsergeBot'],
r_m: RawMessage) -> bool:
global _TASK_2_START_TO # pylint: disable=global-statement
Expand All @@ -136,6 +142,64 @@ async def _bot_is_present(r_c: Union['_client.Userge', '_client._UsergeBot'],
return r_m.chat.id in _B_CMN_CHT


def _get_chat_member(r_c: Union['_client.Userge', '_client._UsergeBot'],
r_m: RawMessage) -> Optional[ChatMember]:
if r_m.chat.type in ("private", "bot"):
return None
if isinstance(r_c, _client.Userge):
if r_m.chat.id in _U_AD_CHT:
return _U_AD_CHT[r_m.chat.id]
return _U_NM_CHT[r_m.chat.id]
if r_m.chat.id in _B_AD_CHT:
return _B_AD_CHT[r_m.chat.id]
return _B_NM_CHT[r_m.chat.id]


async def _both_are_admins(r_c: Union['_client.Userge', '_client._UsergeBot'],
r_m: RawMessage) -> bool:
if not await _bot_is_present(r_c, r_m):
return False
return r_m.chat.id in _B_AD_CHT and r_m.chat.id in _U_AD_CHT


async def _both_have_perm(flt: Union['types.raw.Command', 'types.raw.Filter'],
r_c: Union['_client.Userge', '_client._UsergeBot'],
r_m: RawMessage) -> bool:
if not await _bot_is_present(r_c, r_m):
return False
try:
user = await _update_u_cht(r_m)
bot = await _update_b_cht(r_m)
except PeerIdInvalid:
return False
if user.status == "creator":
user.can_all = True
else:
user.can_all = False
if flt.check_change_info_perm and not (
(user.can_all or user.can_change_info) and bot.can_change_info):
return False
if flt.check_edit_perm and not (
(user.can_all or user.can_edit_messages) and bot.can_edit_messages):
return False
if flt.check_delete_perm and not (
(user.can_all or user.can_delete_messages) and bot.can_delete_messages):
return False
if flt.check_restrict_perm and not (
(user.can_all or user.can_restrict_members) and bot.can_restrict_members):
return False
if flt.check_promote_perm and not (
(user.can_all or user.can_promote_members) and bot.can_promote_members):
return False
if flt.check_invite_perm and not (
(user.can_all or user.can_invite_users) and bot.can_invite_users):
return False
if flt.check_pin_perm and not (
(user.can_all or user.can_pin_messages) and bot.can_pin_messages):
return False
return True


class RawDecorator(RawClient):
""" userge raw decoretor """
_PYRORETTYPE = Callable[[_PYROFUNC], _PYROFUNC]
Expand All @@ -161,12 +225,11 @@ async def template(r_c: Union['_client.Userge', '_client._UsergeBot'],
if isinstance(flt, types.raw.Command):
await _raise(f"`invalid chat type [{r_m.chat.type}]`")
return
if (r_m.chat and r_m.from_user
and flt.only_admins and not await _is_admin(r_c, r_m)):
if r_m.chat and flt.only_admins and not await _is_admin(r_c, r_m):
if isinstance(flt, types.raw.Command):
await _raise("`chat admin required`")
return
if r_m.chat and r_m.from_user and flt.check_perm:
if r_m.chat and flt.check_perm:
is_admin = await _is_admin(r_c, r_m)
c_m = _get_chat_member(r_c, r_m)
if not c_m:
Expand Down Expand Up @@ -209,15 +272,19 @@ async def template(r_c: Union['_client.Userge', '_client._UsergeBot'],
await _raise("`required permisson [pin_messages]`")
return
if RawClient.DUAL_MODE:
if ((flt.check_client
or (r_m.from_user and r_m.from_user.id in Config.SUDO_USERS))
and not (flt.only_admins and _only_one_is_admin(r_m))):
if Config.USE_USER_FOR_CLIENT_CHECKS:
# pylint: disable=protected-access
if isinstance(r_c, _client._UsergeBot):
return
else:
if await _bot_is_present(r_c, r_m):
if (flt.check_client
or (r_m.from_user and r_m.from_user.id in Config.SUDO_USERS)):
cond = True
if flt.only_admins:
cond = cond and await _both_are_admins(r_m, r_m)
if flt.check_perm:
cond = cond and await _both_have_perm(flt, r_c, r_m)
if cond:
if Config.USE_USER_FOR_CLIENT_CHECKS:
# pylint: disable=protected-access
if isinstance(r_c, _client._UsergeBot):
return
elif await _bot_is_present(r_c, r_m):
if isinstance(r_c, _client.Userge):
return
if flt.check_downpath and not os.path.isdir(Config.DOWN_PATH):
Expand Down
2 changes: 1 addition & 1 deletion userge/plugins/misc/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def explorer(path: Path) -> None:
elif path.is_dir():
for i in sorted(path.iterdir()):
explorer(i)
explorer(Path(path))
explorer(path)
current = 0
for p_t in file_paths:
current += 1
Expand Down

0 comments on commit 75661b5

Please sign in to comment.