forked from nonebot/nonebot2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
168 lines (127 loc) · 3.96 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import re
import json
import asyncio
import inspect
import dataclasses
from functools import wraps, partial
from contextlib import asynccontextmanager
from typing_extensions import ParamSpec, get_args, get_origin
from typing import (
Any,
Type,
Tuple,
Union,
TypeVar,
Callable,
Optional,
Awaitable,
AsyncGenerator,
ContextManager,
)
from nonebot.log import logger
from nonebot.typing import overrides
# Generic placeholders used in the annotations of the helpers below.
# P: parameter specification of the callable wrapped by ``run_sync``.
P = ParamSpec("P")
# R: return type of the callable wrapped by ``run_sync``.
R = TypeVar("R")
# T: value produced by the context manager given to ``run_sync_ctx_manager``.
T = TypeVar("T")
# K / V: not referenced in the visible part of this file — presumably generic
# key / value placeholders kept for importers; TODO confirm before removing.
K = TypeVar("K")
V = TypeVar("V")
def escape_tag(s: str) -> str:
    """
    :Description:

      Escape ``<tag>``-style markup in a string, for use when writing
      colorized log records.

    :Parameters:

      * ``s: str``: the string to escape

    :Returns:

      - ``str``: ``s`` with a backslash inserted in front of every tag
    """
    tag_pattern = re.compile(r"</?((?:[fb]g\s)?[^<>\s]*)>")
    # The template emits one literal backslash followed by the whole match.
    return tag_pattern.sub(r"\\\g<0>", s)
def generic_check_issubclass(
    cls: Any, class_or_tuple: Union[Type[Any], Tuple[Type[Any], ...]]
) -> bool:
    """
    :Description:

      ``issubclass`` variant that also accepts typing constructs.

      * A plain class is checked with ``issubclass`` directly.
      * A union (``Union[...]``, ``Optional[...]`` or a PEP 604 ``X | Y``)
        matches when every member other than ``NoneType`` matches.
      * Any other subscripted generic (e.g. ``List[int]``) is checked through
        its origin (``list``).
      * Anything else, including special forms whose origin is not a class
        (e.g. ``Literal[...]``), yields ``False`` instead of raising.

    :Parameters:

      * ``cls: Any``: the class or typing construct to test
      * ``class_or_tuple``: a class, or tuple of classes, to test against

    :Returns:

      - ``bool``
    """
    # Local import: ``types.UnionType`` only exists on Python 3.10+, so it is
    # looked up defensively instead of being imported by name.
    import types

    try:
        return issubclass(cls, class_or_tuple)
    except TypeError:
        # ``cls`` is not a plain class; fall through to the typing-aware path.
        pass

    origin = get_origin(cls)
    pep604_union = getattr(types, "UnionType", None)
    if origin is Union or (pep604_union is not None and origin is pep604_union):
        return all(
            member is type(None)
            or generic_check_issubclass(member, class_or_tuple)
            for member in get_args(cls)
        )
    if origin:
        try:
            return issubclass(origin, class_or_tuple)
        except TypeError:
            # The origin is a special form (``Literal``, ``ClassVar``, ...)
            # rather than a class, so it cannot be a subclass of anything.
            return False
    return False
def is_coroutine_callable(call: Callable[..., Any]) -> bool:
    """
    :Description:

      Tell whether ``call`` is a coroutine function, or an object whose
      ``__call__`` is one. Classes are never reported as coroutine callables.

    :Parameters:

      * ``call: Callable[..., Any]``: the callable to inspect

    :Returns:

      - ``bool``
    """
    target = call
    if not inspect.isroutine(call):
        if inspect.isclass(call):
            return False
        # Callable instance: inspect its ``__call__`` (``None`` when absent).
        target = getattr(call, "__call__", None)
    return inspect.iscoroutinefunction(target)
def is_gen_callable(call: Callable[..., Any]) -> bool:
    """
    :Description:

      Tell whether ``call`` is a generator function, or exposes a
      ``__call__`` attribute that is one.

    :Parameters:

      * ``call: Callable[..., Any]``: the callable to inspect

    :Returns:

      - ``bool``
    """
    # ``or`` keeps the ``__call__`` lookup lazy: it only happens when ``call``
    # itself is not a generator function.
    return inspect.isgeneratorfunction(call) or inspect.isgeneratorfunction(
        getattr(call, "__call__", None)
    )
def is_async_gen_callable(call: Callable[..., Any]) -> bool:
    """
    :Description:

      Tell whether ``call`` is an asynchronous generator function, or exposes
      a ``__call__`` attribute that is one.

    :Parameters:

      * ``call: Callable[..., Any]``: the callable to inspect

    :Returns:

      - ``bool``
    """
    # ``or`` keeps the ``__call__`` lookup lazy: it only happens when ``call``
    # itself is not an async generator function.
    return inspect.isasyncgenfunction(call) or inspect.isasyncgenfunction(
        getattr(call, "__call__", None)
    )
def run_sync(call: Callable[P, R]) -> Callable[P, Awaitable[R]]:
    """
    :Description:

      Decorator that wraps a synchronous function into an async function.
      The wrapped function is executed in the running loop's default
      executor, inside a copy of the caller's ``contextvars`` context, so
      ``ContextVar`` values set by the awaiting task are visible to it.

    :Parameters:

      * ``call: Callable[P, R]``: the synchronous function to wrap

    :Returns:

      - ``Callable[P, Awaitable[R]]``
    """
    # Local import keeps this block independent of the module import list.
    from contextvars import copy_context

    @wraps(call)
    async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        loop = asyncio.get_running_loop()
        bound_call = partial(call, *args, **kwargs)
        # Executor threads do not inherit the task's context on their own;
        # snapshot it here and run the call inside that snapshot.
        context = copy_context()
        return await loop.run_in_executor(None, partial(context.run, bound_call))

    return _wrapper
@asynccontextmanager
async def run_sync_ctx_manager(
    cm: ContextManager[T],
) -> AsyncGenerator[T, None]:
    """
    :Description:

      Adapt a synchronous context manager for use with ``async with``.
      ``__enter__`` and ``__exit__`` are both invoked through ``run_sync``.

      NOTE: only ``Exception`` subclasses raised in the body are forwarded to
      ``__exit__``; other ``BaseException`` types propagate without it being
      called.

    :Parameters:

      * ``cm: ContextManager[T]``: the synchronous context manager to adapt

    :Returns:

      - ``AsyncGenerator[T, None]``: yields the value returned by
        ``cm.__enter__()``
    """
    # Enter outside the ``try``: if ``__enter__`` raises, the manager was
    # never entered and ``__exit__`` must not be called.
    entered = await run_sync(cm.__enter__)()
    try:
        yield entered
    except Exception as e:
        # Pass the real traceback so the manager can inspect or log it.
        suppressed = await run_sync(cm.__exit__)(type(e), e, e.__traceback__)
        if not suppressed:
            # Bare ``raise`` re-raises without adding an extra frame.
            raise
    else:
        await run_sync(cm.__exit__)(None, None, None)
def get_name(obj: Any) -> str:
    """
    :Description:

      Return a display name for ``obj``: its own ``__name__`` when it is a
      Python function or a class, otherwise the name of its class.

    :Parameters:

      * ``obj: Any``: the object to name

    :Returns:

      - ``str``
    """
    has_own_name = inspect.isclass(obj) or inspect.isfunction(obj)
    named = obj if has_own_name else obj.__class__
    return named.__name__
class DataclassEncoder(json.JSONEncoder):
    """
    :Description:

      ``JSONEncoder`` used when serializing a ``Message`` (List[Dataclass])
      to JSON: dataclass instances are encoded as dictionaries.
    """

    @overrides(json.JSONEncoder)
    def default(self, o):
        # ``is_dataclass`` is also true for dataclass *types*, which
        # ``asdict`` rejects; convert instances only and let everything else
        # reach the base class, which raises the standard ``TypeError``.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        return super().default(o)
def logger_wrapper(logger_name: str):
    """
    :Description:

      Build a logging function for an adapter. Every message it emits is
      prefixed with the escaped ``logger_name``.

    :Parameters of the returned ``log`` function:

      * ``level: Literal["CRITICAL", "WARNING", "INFO", "DEBUG", "TRACE"]``: log level
      * ``message: str``: log message
      * ``exception: Optional[Exception]``: exception information
    """

    def log(level: str, message: str, exception: Optional[Exception] = None):
        bound_logger = logger.opt(colors=True, exception=exception)
        # The name is escaped so any tags in it are not parsed as color markup.
        prefix = f"<m>{escape_tag(logger_name)}</m> | "
        bound_logger.log(level, prefix + message)

    return log