From 7a22c5ea6f1888c260eeecdd9bab7e85c17d89c1 Mon Sep 17 00:00:00 2001 From: rking32 Date: Wed, 11 Aug 2021 00:52:15 +0530 Subject: [PATCH] fixed UnboundLocalError in eval --- userge/core/client.py | 21 ++--- userge/core/types/bound/message.py | 13 +++ userge/plugins/misc/download.py | 2 +- userge/plugins/misc/gdrive.py | 4 +- userge/plugins/misc/pathlib.py | 16 ++-- userge/plugins/misc/upload.py | 6 +- userge/plugins/tools/executor.py | 137 +++++++++++++++-------------- 7 files changed, 102 insertions(+), 97 deletions(-) diff --git a/userge/core/client.py b/userge/core/client.py index 36a3708a0..1270c3fff 100644 --- a/userge/core/client.py +++ b/userge/core/client.py @@ -276,7 +276,10 @@ async def _shutdown(_sig: signal.Signals) -> None: sig, lambda _sig=sig: self.loop.create_task(_shutdown(_sig))) def _close_loop() -> None: - self.loop.run_until_complete(_waiter()) + try: + self.loop.run_until_complete(_waiter()) + except RuntimeError: + pass self.loop.close() _LOG.info(_LOG_STR, "Loop Closed !") @@ -318,21 +321,7 @@ def _wrapper(*args, **kwargs): if (threading.current_thread() is not threading.main_thread() and inspect.iscoroutine(coroutine)): async def _(): - fut = asyncio.run_coroutine_threadsafe(coroutine, loop) - _loop = asyncio.get_running_loop() - _fut = _loop.create_future() - - def _on_done(_): - try: - res = fut.result() - except Exception as e: # pylint: disable=broad-except - _loop.call_soon_threadsafe( - lambda _e=e: None if _fut.done() else _fut.set_exception(_e)) - else: - _loop.call_soon_threadsafe( - lambda _res=res: None if _fut.done() else _fut.set_result(_res)) - fut.add_done_callback(_on_done) - return await _fut + return await asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coroutine, loop)) return _() return coroutine diff --git a/userge/core/types/bound/message.py b/userge/core/types/bound/message.py index a683f73a0..3e6a6692c 100644 --- a/userge/core/types/bound/message.py +++ b/userge/core/types/bound/message.py @@ -219,6 +219,19 @@ def cancel_callback(self, callback: Optional[Callable[[], Any]] = None) -> None: except (KeyError, ValueError): pass + async def canceled(self, reply=False) -> None: + """\nedit or reply that process canceled + + Parameters: + reply (``bool``): + reply msg if True, else edit + """ + if reply: + func = self.reply + else: + func = self.edit + await func("`Process Canceled!`", del_in=5) + async def send_as_file(self, text: str, filename: str = "output.txt", diff --git a/userge/plugins/misc/download.py b/userge/plugins/misc/download.py index eed607567..a700026fc 100644 --- a/userge/plugins/misc/download.py +++ b/userge/plugins/misc/download.py @@ -41,7 +41,7 @@ async def down_load_media(message: Message): try: dl_loc, d_in = await handle_download(message, resource) except ProcessCanceled: - await message.edit("`Process Canceled!`", del_in=5) + await message.canceled() except Exception as e_e: # pylint: disable=broad-except await message.err(str(e_e)) else: diff --git a/userge/plugins/misc/gdrive.py b/userge/plugins/misc/gdrive.py index b9d4e637f..01692c58b 100644 --- a/userge/plugins/misc/gdrive.py +++ b/userge/plugins/misc/gdrive.py @@ -737,7 +737,7 @@ async def upload(self) -> None: try: dl_loc, _ = await tg_download(self._message, replied) except ProcessCanceled: - await self._message.edit("`Process Canceled!`", del_in=5) + await self._message.canceled() return except Exception as e_e: await self._message.err(str(e_e)) @@ -746,7 +746,7 @@ async def upload(self) -> None: try: dl_loc, _ = await url_download(self._message, self._message.input_str) except ProcessCanceled: - await self._message.edit("`Process Canceled!`", del_in=5) + await self._message.canceled() return except Exception as e_e: await self._message.err(str(e_e)) diff --git a/userge/plugins/misc/pathlib.py b/userge/plugins/misc/pathlib.py index 2de31b75c..149fe6168 100644 --- a/userge/plugins/misc/pathlib.py +++ b/userge/plugins/misc/pathlib.py @@ -120,7 +120,7 @@ def _zip(self, p_f.add(file_, relpath(file_, root)) self._current += 1 except ProcessCanceled: - self._output = "`process canceled!`" + self._output = "`Process Canceled!`" except Exception as z_e: _LOG.exception(z_e) self._output = str(z_e) @@ -138,7 +138,7 @@ def _unpack(self, file_names: List[str]) -> None: for file_name in file_names: if self._is_canceled: if not self._output: - self._output = "`process canceled!`" + self._output = "`Process Canceled!`" if not self._is_finished: self._is_finished = True break @@ -284,7 +284,7 @@ def _split_worker(self, times: int) -> None: s_f.write(chunk) self._cmp_size += len(chunk) except ProcessCanceled: - self._output = "`process canceled!`" + self._output = "`Process Canceled!`" except Exception as s_e: _LOG.exception(s_e) self._output = str(s_e) @@ -308,7 +308,7 @@ def _combine_worker(self, file_list: List[str]) -> None: self._cmp_size += len(chunk) self._current += 1 except ProcessCanceled: - self._output = "`process canceled!`" + self._output = "`Process Canceled!`" except Exception as c_e: _LOG.exception(c_e) self._output = str(c_e) @@ -504,7 +504,7 @@ def _on_cancel(): try: await task except asyncio.CancelledError: - await message.edit("`process canceled!`") + await message.canceled() return if s_obj.output: @@ -572,7 +572,7 @@ def _on_cancel(): try: await task except asyncio.CancelledError: - await message.edit("`process canceled!`") + await message.canceled() return if c_obj.output: @@ -639,7 +639,7 @@ def _on_cancel(): try: await task except asyncio.CancelledError: - await message.edit("`process canceled!`") + await message.canceled() return if p_obj.output: @@ -698,7 +698,7 @@ def _on_cancel(): try: await task except asyncio.CancelledError: - await message.edit("`process canceled!`") + await message.canceled() return if p_obj.output: diff --git a/userge/plugins/misc/upload.py b/userge/plugins/misc/upload.py index d33d15b8a..d37582877 100644 --- a/userge/plugins/misc/upload.py +++ b/userge/plugins/misc/upload.py @@ -87,7 +87,7 @@ async def upload_to_tg(message: Message): try: path_, _ = await url_download(message, path_) except ProcessCanceled: - await message.edit("`Process Canceled!`", del_in=5) + await message.canceled() return except Exception as e_e: # pylint: disable=broad-except await message.err(str(e_e)) @@ -113,7 +113,7 @@ async def _handle_message(message: Message) -> None: try: dl_loc, _ = await tg_download(message, message.reply_to_message) except ProcessCanceled: - await message.edit("`Process Canceled!`", del_in=5) + await message.canceled() except Exception as e_e: # pylint: disable=broad-except await message.err(str(e_e)) else: @@ -374,7 +374,7 @@ async def finalize(message: Message, msg: Message, start_t): await CHANNEL.fwd_msg(msg) await message.client.send_chat_action(message.chat.id, "cancel") if message.process_is_canceled: - await message.edit("`Process Canceled!`", del_in=5) + await message.canceled() else: end_t = datetime.now() m_s = (end_t - start_t).seconds diff --git a/userge/plugins/tools/executor.py b/userge/plugins/tools/executor.py index 7198497b0..0c9f2954f 100644 --- a/userge/plugins/tools/executor.py +++ b/userge/plugins/tools/executor.py @@ -18,7 +18,7 @@ from enum import Enum from getpass import getuser from os import geteuid -from typing import Awaitable, Any, Callable, Dict, Optional +from typing import Awaitable, Any, Callable, Dict, Optional, Tuple, Iterable from pyrogram.types.messages_and_media.message import Str @@ -26,7 +26,6 @@ from userge.utils import runcmd CHANNEL = userge.getCLogger() -_EVAL_TASKS: Dict[asyncio.Future, str] = {} @userge.on_cmd("exec", about={ @@ -58,6 +57,10 @@ async def exec_(message: Message): caption=cmd) +_KEY = '_OLD' +_EVAL_TASKS: Dict[asyncio.Future, str] = {} + + @userge.on_cmd("eval", about={ 'header': "run python code line | lines", 'flags': { @@ -169,23 +172,25 @@ async def _callback(output: Optional[str], errored: bool): await msg.edit("`Executing eval ...`", parse_mode='md') + _g, _l = _context( + context_type, userge=userge, message=message, replied=message.reply_to_message) l_d = {} try: - exec(_wrap_code(cmd), _context( # nosec pylint: disable=W0122 - context_type, userge=userge, message=message, replied=message.reply_to_message), l_d) + exec(_wrap_code(cmd, _l.keys()), _g, l_d) # nosec pylint: disable=W0122 except Exception: # pylint: disable=broad-except + _g[_KEY] = _l await _callback(traceback.format_exc(), True) return future = asyncio.get_event_loop().create_future() - pool.submit_thread(_run_coro, future, l_d['__aexec'](), _callback) + pool.submit_thread(_run_coro, future, l_d['__aexec'](*_l.values()), _callback) hint = cmd.split('\n')[0] - _EVAL_TASKS[future] = hint[:20] + "..." if len(hint) > 20 else hint + _EVAL_TASKS[future] = hint[:25] + "..." if len(hint) > 25 else hint with msg.cancel_callback(future.cancel): try: await future except asyncio.CancelledError: - await msg.edit("`process canceled!`") + await msg.canceled() finally: _EVAL_TASKS.pop(future, None) @@ -229,7 +234,7 @@ def _on_cancel(): try: await task except asyncio.CancelledError: - await message.reply("`process canceled!`") + await message.canceled(reply=True) return out_data = f"
{output}{await t_obj.get_output()}
" @@ -248,79 +253,49 @@ async def init_func(message: Message): return cmd -_PROXIES = {} - - -class _Wrapper: - def __init__(self, original): - self._original = original - - def __getattr__(self, name: str): - return getattr(_PROXIES.get(threading.currentThread().ident, self._original), name) - - -sys.stdout = _Wrapper(sys.stdout) -sys.__stdout__ = _Wrapper(sys.__stdout__) -sys.stderr = _Wrapper(sys.stderr) -sys.__stderr__ = _Wrapper(sys.__stderr__) - - -@contextmanager -def redirect() -> io.StringIO: - ident = threading.currentThread().ident - source = io.StringIO() - _PROXIES[ident] = source - try: - yield source - finally: - del _PROXIES[ident] - source.close() - - -def _wrap_code(code: str) -> str: - head = "async def __aexec():\n try:\n " - tail = "\n finally: globals()['_OLD'] = locals()" - if '\n' in code: - code = '\n '.join(iter(code.split('\n'))) - elif (any(True for k_ in keyword.kwlist - if k_ not in ('True', 'False', 'None', 'await') and code.startswith(f"{k_} ")) - or ('=' in code and '==' not in code)): - code = f"\n {code}" - else: - code = f"\n return {code}" - return head + code + tail - - class _ContextType(Enum): GLOBAL = 0 PRIVATE = 1 NEW = 2 -def _context(context_type: _ContextType, **kwargs) -> dict: +def _context(context_type: _ContextType, **kwargs) -> Tuple[Dict[str, Any], Dict[str, Any]]: if context_type == _ContextType.NEW: try: - del globals()['_OLD'] + del globals()[_KEY] except KeyError: pass - if '_OLD' not in globals(): - globals()['_OLD'] = globals().copy() - _data = globals()['_OLD'] + if _KEY not in globals(): + globals()[_KEY] = globals().copy() + _g = globals()[_KEY] if context_type == _ContextType.PRIVATE: - _data = _data.copy() - _data.update(_data.pop('_OLD', {})) - _data.update(kwargs) - return _data + _g = _g.copy() + _l = _g.pop(_KEY, {}) + _l.update(kwargs) + return _g, _l + + +def _wrap_code(code: str, args: Iterable[str]) -> str: + head = "async def __aexec(" + ', '.join(args) + "):\n try:\n " + tail = "\n finally: globals()['" + _KEY + "'] = locals()" + if '\n' in code: + code = code.replace('\n', '\n ') + elif (any(True for k_ in keyword.kwlist if k_ not in ( + 'True', 'False', 'None', 'lambda', 'await') and code.startswith(f"{k_} ")) + or ('=' in code and '==' not in code)): + code = f"\n {code}" + else: + code = f"\n return {code}" + return head + code + tail def _run_coro(future: asyncio.Future, coro: Awaitable[Any], - callback: Callable[[str, bool], Awaitable[Any]]): + callback: Callable[[str, bool], Awaitable[Any]]) -> None: loop = asyncio.new_event_loop() task = loop.create_task(coro) - userge.loop.call_soon_threadsafe( - future.add_done_callback, - lambda _: loop.call_soon_threadsafe(task.cancel) - if loop.is_running() and future.cancelled() else None) + userge.loop.call_soon_threadsafe(future.add_done_callback, + lambda _: (loop.is_running() and future.cancelled() + and loop.call_soon_threadsafe(task.cancel))) try: ret, exc = None, None with redirect() as out: @@ -337,8 +312,36 @@ def _run_coro(future: asyncio.Future, coro: Awaitable[Any], finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close() - userge.loop.call_soon_threadsafe( - lambda: future.set_result(None) if not future.done() else None) + userge.loop.call_soon_threadsafe(lambda: future.done() or future.set_result(None)) + + +_PROXIES = {} + + +class _Wrapper: + def __init__(self, original): + self._original = original + + def __getattr__(self, name: str): + return getattr(_PROXIES.get(threading.currentThread().ident, self._original), name) + + +sys.stdout = _Wrapper(sys.stdout) +sys.__stdout__ = _Wrapper(sys.__stdout__) +sys.stderr = _Wrapper(sys.stderr) +sys.__stderr__ = _Wrapper(sys.__stderr__) + + +@contextmanager +def redirect() -> io.StringIO: + ident = threading.currentThread().ident + source = io.StringIO() + _PROXIES[ident] = source + try: + yield source + finally: + del _PROXIES[ident] + source.close() class Term: