Skip to content

Commit

Permalink
extends userge.utils (UsergeTeam#122)
Browse files Browse the repository at this point in the history
* let me add short way

* added shortcut for _parse_button

* fixing pep8 😅

* extends userge.utils (thks @Krishna-Singhal idea)

Co-authored-by: rking32 <[email protected]>
  • Loading branch information
Krishna-Singhal and rking32 authored Aug 9, 2020
1 parent 7d33e5a commit 036e6f8
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 104 deletions.
80 changes: 8 additions & 72 deletions userge/core/types/new/channel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,86 +10,24 @@

__all__ = ['ChannelLogger']

import re
import asyncio
from typing import Optional, List, Tuple, Union
from typing import Optional, Union

from pyrogram import Message as RawMessage, InlineKeyboardButton, InlineKeyboardMarkup
from pyrogram import Message as RawMessage

from userge import logging, Config
from userge.utils import SafeDict
from userge.utils import SafeDict, get_file_id_and_ref, parse_buttons
from ..bound import message as _message # pylint: disable=unused-import
from ... import client as _client # pylint: disable=unused-import

_LOG = logging.getLogger(__name__)
_LOG_STR = "<<<! ::::: %s ::::: !>>>"
_BTN_URL_REGEX = re.compile(r"(\[([^\[]+?)\]\[buttonurl:(?:/{0,2})(.+?)(:same)?\])")


def _gen_string(name: str) -> str:
return "**logger** : #" + name.split('.')[-1].upper() + "\n\n{}"


def _get_file_id_and_ref(
message: '_message.Message') -> Tuple[Optional[str], Optional[str]]:
if message.audio:
file_ = message.audio
elif message.animation:
file_ = message.animation
elif message.photo:
file_ = message.photo
elif message.sticker:
file_ = message.sticker
elif message.voice:
file_ = message.voice
elif message.video_note:
file_ = message.video_note
elif message.video:
file_ = message.video
else:
file_ = message.document
if file_:
return file_.file_id, file_.file_ref
return None, None


def _parse_buttons(
markdown_note: str) -> Tuple[str, List[Optional[List[InlineKeyboardButton]]]]:
prev = 0
note_data = ""
buttons: List[Tuple[str, str, str]] = []
for match in _BTN_URL_REGEX.finditer(markdown_note):
# Check if btnurl is escaped
n_escapes = 0
to_check = match.start(1) - 1
while to_check > 0 and markdown_note[to_check] == "\\":
n_escapes += 1
to_check -= 1
# if even, not escaped -> create button
if n_escapes % 2 == 0:
# create a tuple with button label, url, and newline status
buttons.append((match.group(2), match.group(3), bool(match.group(4))))
note_data += markdown_note[prev:match.start(1)]
prev = match.end(1)
# if odd, escaped -> move along
else:
note_data += markdown_note[prev:to_check]
prev = match.start(1) - 1
note_data += markdown_note[prev:]
return note_data.strip(), _build_keyboard(buttons)


def _build_keyboard(
buttons: List[Tuple[str, str, str]]) -> List[Optional[List[InlineKeyboardButton]]]:
keyb: List[List[InlineKeyboardButton]] = []
for btn in buttons:
if btn[2] and keyb:
keyb[-1].append(InlineKeyboardButton(btn[0], url=btn[1]))
else:
keyb.append([InlineKeyboardButton(btn[0], url=btn[1])])
return keyb


class ChannelLogger:
""" Channel logger for Userge """
def __init__(self, client: Union['_client.Userge', '_client._UsergeBot'], name: str) -> None:
Expand Down Expand Up @@ -218,7 +156,7 @@ async def store(self,
if message and message.caption:
caption = caption + message.caption.html
if message:
file_id, file_ref = _get_file_id_and_ref(message)
file_id, file_ref = get_file_id_and_ref(message)
if message and message.media and file_id and file_ref:
if caption:
caption = self._string.format(caption.strip())
Expand Down Expand Up @@ -278,25 +216,23 @@ async def forward_stored(self,
'chat': chat.title if chat.title else "this group",
'count': chat.members_count})
caption = caption.format_map(SafeDict(**u_dict))
file_id, file_ref = _get_file_id_and_ref(message)
caption, buttons = _parse_buttons(caption)
file_id, file_ref = get_file_id_and_ref(message)
caption, buttons = parse_buttons(caption)
if message.media and file_id and file_ref:
msg = await client.send_cached_media(
chat_id=chat_id,
file_id=file_id,
file_ref=file_ref,
caption=caption,
reply_to_message_id=reply_to_message_id,
reply_markup=InlineKeyboardMarkup(buttons)
if hasattr(client, 'ubot') and buttons else None)
reply_markup=buttons if client.is_bot and buttons else None)
else:
msg = await client.send_message(
chat_id=chat_id,
text=caption,
reply_to_message_id=reply_to_message_id,
disable_web_page_preview=True,
reply_markup=InlineKeyboardMarkup(buttons)
if hasattr(client, 'ubot') and buttons else None)
reply_markup=buttons if client.is_bot and buttons else None)
if del_in and msg:
await asyncio.sleep(del_in)
await msg.delete()
11 changes: 7 additions & 4 deletions userge/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
# All rights reserved.

from .progress import progress # noqa
from .tools import (take_screen_shot, # noqa
SafeDict,
runcmd,
from .sys_tools import SafeDict, get_import_path # noqa
from .tools import (demojify, # noqa
get_file_id_and_ref,
humanbytes,
time_formatter,
get_import_path)
post_to_telegraph,
runcmd,
take_screen_shot,
parse_buttons)
33 changes: 33 additions & 0 deletions userge/utils/sys_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# pylint: disable=missing-module-docstring
#
# Copyright (C) 2020 by UsergeTeam@Github, < https://github.com/UsergeTeam >.
#
# This file is part of < https://github.com/UsergeTeam/Userge > project,
# and is released under the "GNU v3.0 License Agreement".
# Please see < https://github.com/uaudith/Userge/blob/master/LICENSE >
#
# All rights reserved.

from glob import glob
from os.path import isfile, relpath
from typing import Dict, List, Union


class SafeDict(Dict[str, str]):
""" modded dict """
def __missing__(self, key: str) -> str:
return '{' + key + '}'


def get_import_path(root: str, path: str) -> Union[str, List[str]]:
""" return import path """
seperator = '\\' if '\\' in root else '/'
if isfile(path):
return '.'.join(relpath(path, root).split(seperator))[:-3]
all_paths = glob(root + path.rstrip(seperator) + f"{seperator}*.py", recursive=True)
return sorted(
[
'.'.join(relpath(f, root).split(seperator))[:-3] for f in all_paths
if not f.endswith("__init__.py")
]
)
105 changes: 81 additions & 24 deletions userge/utils/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,49 @@
# All rights reserved.

import os
import re
import shlex
import asyncio
from glob import glob
from os.path import isfile, relpath, basename
from typing import Tuple, Dict, List, Union, Optional
from os.path import basename
from typing import Tuple, List, Optional

from userge import logging, Config
from html_telegraph_poster import TelegraphPoster
from pyrogram import InlineKeyboardButton, InlineKeyboardMarkup

_LOG = logging.getLogger(__name__)
import userge

_LOG = userge.logging.getLogger(__name__)
_EMOJI_PATTERN = re.compile(
"["
"\U0001F1E0-\U0001F1FF" # flags (iOS)
"\U0001F300-\U0001F5FF" # symbols & pictographs
"\U0001F600-\U0001F64F" # emoticons
"\U0001F680-\U0001F6FF" # transport & map symbols
"\U0001F700-\U0001F77F" # alchemical symbols
"\U0001F780-\U0001F7FF" # Geometric Shapes Extended
"\U0001F800-\U0001F8FF" # Supplemental Arrows-C
"\U0001F900-\U0001F9FF" # Supplemental Symbols and Pictographs
"\U0001FA00-\U0001FA6F" # Chess Symbols
"\U0001FA70-\U0001FAFF" # Symbols and Pictographs Extended-A
"\U00002702-\U000027B0" # Dingbats
"]+")
_BTN_URL_REGEX = re.compile(r"(\[([^\[]+?)\]\[buttonurl:(?:/{0,2})(.+?)(:same)?\])")


# https://github.com/UsergeTeam/Userge-Plugins/blob/master/plugins/tweet.py
def demojify(string: str) -> str:
""" Remove emojis and other non-safe characters from string """
return re.sub(_EMOJI_PATTERN, '', string)


def get_file_id_and_ref(message: 'userge.Message') -> Tuple[Optional[str], Optional[str]]:
""" get file_id and file_ref """
file_ = message.audio or message.animation or message.photo \
or message.sticker or message.voice or message.video_note \
or message.video or message.document
if file_:
return file_.file_id, file_.file_ref
return None, None


def humanbytes(size: float) -> str:
Expand Down Expand Up @@ -45,6 +79,21 @@ def time_formatter(seconds: float) -> str:
return tmp[:-2]


# https://github.com/UsergeTeam/Userge-Plugins/blob/master/plugins/anilist.py
def post_to_telegraph(a_title: str, content: str) -> str:
""" Create a Telegram Post using HTML Content """
post_client = TelegraphPoster(use_api=True)
auth_name = "@theUserge"
post_client.create_api_token(auth_name)
post_page = post_client.post(
title=a_title,
author=auth_name,
author_url="https://t.me/theUserge",
text=content
)
return post_page['url']


async def runcmd(cmd: str) -> Tuple[str, str, int, int]:
""" run command in terminal """
args = shlex.split(cmd)
Expand All @@ -62,29 +111,37 @@ async def take_screen_shot(video_file: str, duration: int, path: str = '') -> Op
""" take a screenshot """
_LOG.info('[[[Extracting a frame from %s ||| Video duration => %s]]]', video_file, duration)
ttl = duration // 2
thumb_image_path = path or os.path.join(Config.DOWN_PATH, f"{basename(video_file)}.jpg")
thumb_image_path = path or os.path.join(userge.Config.DOWN_PATH, f"{basename(video_file)}.jpg")
command = f"ffmpeg -ss {ttl} -i '{video_file}' -vframes 1 '{thumb_image_path}'"
err = (await runcmd(command))[1]
if err:
_LOG.error(err)
return thumb_image_path if os.path.exists(thumb_image_path) else None


class SafeDict(Dict[str, str]):
""" modded dict """
def __missing__(self, key: str) -> str:
return '{' + key + '}'


def get_import_path(root: str, path: str) -> Union[str, List[str]]:
""" return import path """
seperator = '\\' if '\\' in root else '/'
if isfile(path):
return '.'.join(relpath(path, root).split(seperator))[:-3]
all_paths = glob(root + path.rstrip(seperator) + f"{seperator}*.py", recursive=True)
return sorted(
[
'.'.join(relpath(f, root).split(seperator))[:-3] for f in all_paths
if not f.endswith("__init__.py")
]
)
def parse_buttons(markdown_note: str) -> Tuple[str, Optional[InlineKeyboardButton]]:
""" markdown_note to string and buttons """
prev = 0
note_data = ""
buttons: List[Tuple[str, str, str]] = []
for match in _BTN_URL_REGEX.finditer(markdown_note):
n_escapes = 0
to_check = match.start(1) - 1
while to_check > 0 and markdown_note[to_check] == "\\":
n_escapes += 1
to_check -= 1
if n_escapes % 2 == 0:
buttons.append((match.group(2), match.group(3), bool(match.group(4))))
note_data += markdown_note[prev:match.start(1)]
prev = match.end(1)
else:
note_data += markdown_note[prev:to_check]
prev = match.start(1) - 1
note_data += markdown_note[prev:]
keyb: List[List[InlineKeyboardButton]] = []
for btn in buttons:
if btn[2] and keyb:
keyb[-1].append(InlineKeyboardButton(btn[0], url=btn[1]))
else:
keyb.append([InlineKeyboardButton(btn[0], url=btn[1])])
return note_data.strip(), InlineKeyboardMarkup(keyb) if keyb else None
5 changes: 1 addition & 4 deletions userge/versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,5 @@
__micro__ = 7

__python_version__ = f"{version_info[0]}.{version_info[1]}.{version_info[2]}"

__license__ = ("[GNU GPL v3.0]"
"(https://github.com/UsergeTeam/Userge/blob/master/LICENSE)")

__license__ = "[GNU GPL v3.0](https://github.com/UsergeTeam/Userge/blob/master/LICENSE)"
__copyright__ = "[UsergeTeam](https://github.com/UsergeTeam)"

0 comments on commit 036e6f8

Please sign in to comment.