Skip to content

Commit

Permalink
fixed UnboundLocalError in eval
Browse files Browse the repository at this point in the history
  • Loading branch information
rking32 committed Aug 10, 2021
1 parent 395aa75 commit 7a22c5e
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 97 deletions.
21 changes: 5 additions & 16 deletions userge/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ async def _shutdown(_sig: signal.Signals) -> None:
sig, lambda _sig=sig: self.loop.create_task(_shutdown(_sig)))

def _close_loop() -> None:
self.loop.run_until_complete(_waiter())
try:
self.loop.run_until_complete(_waiter())
except RuntimeError:
pass
self.loop.close()
_LOG.info(_LOG_STR, "Loop Closed !")

Expand Down Expand Up @@ -318,21 +321,7 @@ def _wrapper(*args, **kwargs):
if (threading.current_thread() is not threading.main_thread()
and inspect.iscoroutine(coroutine)):
async def _():
fut = asyncio.run_coroutine_threadsafe(coroutine, loop)
_loop = asyncio.get_running_loop()
_fut = _loop.create_future()

def _on_done(_):
try:
res = fut.result()
except Exception as e: # pylint: disable=broad-except
_loop.call_soon_threadsafe(
lambda _e=e: None if _fut.done() else _fut.set_exception(_e))
else:
_loop.call_soon_threadsafe(
lambda _res=res: None if _fut.done() else _fut.set_result(_res))
fut.add_done_callback(_on_done)
return await _fut
return await asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coroutine, loop))
return _()
return coroutine

Expand Down
13 changes: 13 additions & 0 deletions userge/core/types/bound/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,19 @@ def cancel_callback(self, callback: Optional[Callable[[], Any]] = None) -> None:
except (KeyError, ValueError):
pass

async def canceled(self, reply=False) -> None:
"""\nedit or reply that process canceled
Parameters:
reply (``bool``):
reply msg if True, else edit
"""
if reply:
func = self.reply
else:
func = self.edit
await func("`Process Canceled!`", del_in=5)

async def send_as_file(self,
text: str,
filename: str = "output.txt",
Expand Down
2 changes: 1 addition & 1 deletion userge/plugins/misc/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def down_load_media(message: Message):
try:
dl_loc, d_in = await handle_download(message, resource)
except ProcessCanceled:
await message.edit("`Process Canceled!`", del_in=5)
await message.canceled()
except Exception as e_e: # pylint: disable=broad-except
await message.err(str(e_e))
else:
Expand Down
4 changes: 2 additions & 2 deletions userge/plugins/misc/gdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ async def upload(self) -> None:
try:
dl_loc, _ = await tg_download(self._message, replied)
except ProcessCanceled:
await self._message.edit("`Process Canceled!`", del_in=5)
await self._message.canceled()
return
except Exception as e_e:
await self._message.err(str(e_e))
Expand All @@ -746,7 +746,7 @@ async def upload(self) -> None:
try:
dl_loc, _ = await url_download(self._message, self._message.input_str)
except ProcessCanceled:
await self._message.edit("`Process Canceled!`", del_in=5)
await self._message.canceled()
return
except Exception as e_e:
await self._message.err(str(e_e))
Expand Down
16 changes: 8 additions & 8 deletions userge/plugins/misc/pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def _zip(self,
p_f.add(file_, relpath(file_, root))
self._current += 1
except ProcessCanceled:
self._output = "`process canceled!`"
self._output = "`Process Canceled!`"
except Exception as z_e:
_LOG.exception(z_e)
self._output = str(z_e)
Expand All @@ -138,7 +138,7 @@ def _unpack(self, file_names: List[str]) -> None:
for file_name in file_names:
if self._is_canceled:
if not self._output:
self._output = "`process canceled!`"
self._output = "`Process Canceled!`"
if not self._is_finished:
self._is_finished = True
break
Expand Down Expand Up @@ -284,7 +284,7 @@ def _split_worker(self, times: int) -> None:
s_f.write(chunk)
self._cmp_size += len(chunk)
except ProcessCanceled:
self._output = "`process canceled!`"
self._output = "`Process Canceled!`"
except Exception as s_e:
_LOG.exception(s_e)
self._output = str(s_e)
Expand All @@ -308,7 +308,7 @@ def _combine_worker(self, file_list: List[str]) -> None:
self._cmp_size += len(chunk)
self._current += 1
except ProcessCanceled:
self._output = "`process canceled!`"
self._output = "`Process Canceled!`"
except Exception as c_e:
_LOG.exception(c_e)
self._output = str(c_e)
Expand Down Expand Up @@ -504,7 +504,7 @@ def _on_cancel():
try:
await task
except asyncio.CancelledError:
await message.edit("`process canceled!`")
await message.canceled()
return

if s_obj.output:
Expand Down Expand Up @@ -572,7 +572,7 @@ def _on_cancel():
try:
await task
except asyncio.CancelledError:
await message.edit("`process canceled!`")
await message.canceled()
return

if c_obj.output:
Expand Down Expand Up @@ -639,7 +639,7 @@ def _on_cancel():
try:
await task
except asyncio.CancelledError:
await message.edit("`process canceled!`")
await message.canceled()
return

if p_obj.output:
Expand Down Expand Up @@ -698,7 +698,7 @@ def _on_cancel():
try:
await task
except asyncio.CancelledError:
await message.edit("`process canceled!`")
await message.canceled()
return

if p_obj.output:
Expand Down
6 changes: 3 additions & 3 deletions userge/plugins/misc/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def upload_to_tg(message: Message):
try:
path_, _ = await url_download(message, path_)
except ProcessCanceled:
await message.edit("`Process Canceled!`", del_in=5)
await message.canceled()
return
except Exception as e_e: # pylint: disable=broad-except
await message.err(str(e_e))
Expand All @@ -113,7 +113,7 @@ async def _handle_message(message: Message) -> None:
try:
dl_loc, _ = await tg_download(message, message.reply_to_message)
except ProcessCanceled:
await message.edit("`Process Canceled!`", del_in=5)
await message.canceled()
except Exception as e_e: # pylint: disable=broad-except
await message.err(str(e_e))
else:
Expand Down Expand Up @@ -374,7 +374,7 @@ async def finalize(message: Message, msg: Message, start_t):
await CHANNEL.fwd_msg(msg)
await message.client.send_chat_action(message.chat.id, "cancel")
if message.process_is_canceled:
await message.edit("`Process Canceled!`", del_in=5)
await message.canceled()
else:
end_t = datetime.now()
m_s = (end_t - start_t).seconds
Expand Down
137 changes: 70 additions & 67 deletions userge/plugins/tools/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
from enum import Enum
from getpass import getuser
from os import geteuid
from typing import Awaitable, Any, Callable, Dict, Optional
from typing import Awaitable, Any, Callable, Dict, Optional, Tuple, Iterable

from pyrogram.types.messages_and_media.message import Str

from userge import userge, Message, Config, pool
from userge.utils import runcmd

CHANNEL = userge.getCLogger()
_EVAL_TASKS: Dict[asyncio.Future, str] = {}


@userge.on_cmd("exec", about={
Expand Down Expand Up @@ -58,6 +57,10 @@ async def exec_(message: Message):
caption=cmd)


_KEY = '_OLD'
_EVAL_TASKS: Dict[asyncio.Future, str] = {}


@userge.on_cmd("eval", about={
'header': "run python code line | lines",
'flags': {
Expand Down Expand Up @@ -169,23 +172,25 @@ async def _callback(output: Optional[str], errored: bool):

await msg.edit("`Executing eval ...`", parse_mode='md')

_g, _l = _context(
context_type, userge=userge, message=message, replied=message.reply_to_message)
l_d = {}
try:
exec(_wrap_code(cmd), _context( # nosec pylint: disable=W0122
context_type, userge=userge, message=message, replied=message.reply_to_message), l_d)
exec(_wrap_code(cmd, _l.keys()), _g, l_d) # nosec pylint: disable=W0122
except Exception: # pylint: disable=broad-except
_g[_KEY] = _l
await _callback(traceback.format_exc(), True)
return
future = asyncio.get_event_loop().create_future()
pool.submit_thread(_run_coro, future, l_d['__aexec'](), _callback)
pool.submit_thread(_run_coro, future, l_d['__aexec'](*_l.values()), _callback)
hint = cmd.split('\n')[0]
_EVAL_TASKS[future] = hint[:20] + "..." if len(hint) > 20 else hint
_EVAL_TASKS[future] = hint[:25] + "..." if len(hint) > 25 else hint

with msg.cancel_callback(future.cancel):
try:
await future
except asyncio.CancelledError:
await msg.edit("`process canceled!`")
await msg.canceled()
finally:
_EVAL_TASKS.pop(future, None)

Expand Down Expand Up @@ -229,7 +234,7 @@ def _on_cancel():
try:
await task
except asyncio.CancelledError:
await message.reply("`process canceled!`")
await message.canceled(reply=True)
return

out_data = f"<pre>{output}{await t_obj.get_output()}</pre>"
Expand All @@ -248,79 +253,49 @@ async def init_func(message: Message):
return cmd


_PROXIES = {}


class _Wrapper:
def __init__(self, original):
self._original = original

def __getattr__(self, name: str):
return getattr(_PROXIES.get(threading.currentThread().ident, self._original), name)


sys.stdout = _Wrapper(sys.stdout)
sys.__stdout__ = _Wrapper(sys.__stdout__)
sys.stderr = _Wrapper(sys.stderr)
sys.__stderr__ = _Wrapper(sys.__stderr__)


@contextmanager
def redirect() -> io.StringIO:
ident = threading.currentThread().ident
source = io.StringIO()
_PROXIES[ident] = source
try:
yield source
finally:
del _PROXIES[ident]
source.close()


def _wrap_code(code: str) -> str:
head = "async def __aexec():\n try:\n "
tail = "\n finally: globals()['_OLD'] = locals()"
if '\n' in code:
code = '\n '.join(iter(code.split('\n')))
elif (any(True for k_ in keyword.kwlist
if k_ not in ('True', 'False', 'None', 'await') and code.startswith(f"{k_} "))
or ('=' in code and '==' not in code)):
code = f"\n {code}"
else:
code = f"\n return {code}"
return head + code + tail


class _ContextType(Enum):
GLOBAL = 0
PRIVATE = 1
NEW = 2


def _context(context_type: _ContextType, **kwargs) -> dict:
def _context(context_type: _ContextType, **kwargs) -> Tuple[Dict[str, Any], Dict[str, Any]]:
if context_type == _ContextType.NEW:
try:
del globals()['_OLD']
del globals()[_KEY]
except KeyError:
pass
if '_OLD' not in globals():
globals()['_OLD'] = globals().copy()
_data = globals()['_OLD']
if _KEY not in globals():
globals()[_KEY] = globals().copy()
_g = globals()[_KEY]
if context_type == _ContextType.PRIVATE:
_data = _data.copy()
_data.update(_data.pop('_OLD', {}))
_data.update(kwargs)
return _data
_g = _g.copy()
_l = _g.pop(_KEY, {})
_l.update(kwargs)
return _g, _l


def _wrap_code(code: str, args: Iterable[str]) -> str:
head = "async def __aexec(" + ', '.join(args) + "):\n try:\n "
tail = "\n finally: globals()['" + _KEY + "'] = locals()"
if '\n' in code:
code = code.replace('\n', '\n ')
elif (any(True for k_ in keyword.kwlist if k_ not in (
'True', 'False', 'None', 'lambda', 'await') and code.startswith(f"{k_} "))
or ('=' in code and '==' not in code)):
code = f"\n {code}"
else:
code = f"\n return {code}"
return head + code + tail


def _run_coro(future: asyncio.Future, coro: Awaitable[Any],
callback: Callable[[str, bool], Awaitable[Any]]):
callback: Callable[[str, bool], Awaitable[Any]]) -> None:
loop = asyncio.new_event_loop()
task = loop.create_task(coro)
userge.loop.call_soon_threadsafe(
future.add_done_callback,
lambda _: loop.call_soon_threadsafe(task.cancel)
if loop.is_running() and future.cancelled() else None)
userge.loop.call_soon_threadsafe(future.add_done_callback,
lambda _: (loop.is_running() and future.cancelled()
and loop.call_soon_threadsafe(task.cancel)))
try:
ret, exc = None, None
with redirect() as out:
Expand All @@ -337,8 +312,36 @@ def _run_coro(future: asyncio.Future, coro: Awaitable[Any],
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
userge.loop.call_soon_threadsafe(
lambda: future.set_result(None) if not future.done() else None)
userge.loop.call_soon_threadsafe(lambda: future.done() or future.set_result(None))


_PROXIES = {}


class _Wrapper:
def __init__(self, original):
self._original = original

def __getattr__(self, name: str):
return getattr(_PROXIES.get(threading.currentThread().ident, self._original), name)


sys.stdout = _Wrapper(sys.stdout)
sys.__stdout__ = _Wrapper(sys.__stdout__)
sys.stderr = _Wrapper(sys.stderr)
sys.__stderr__ = _Wrapper(sys.__stderr__)


@contextmanager
def redirect() -> io.StringIO:
ident = threading.currentThread().ident
source = io.StringIO()
_PROXIES[ident] = source
try:
yield source
finally:
del _PROXIES[ident]
source.close()


class Term:
Expand Down

0 comments on commit 7a22c5e

Please sign in to comment.