forked from Ljzd-PRO/KToolBox
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
119 lines (93 loc) · 3.13 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import cgi
import os.path
import urllib.parse
from typing import Generic, TypeVar, Optional, Dict
from loguru import logger
from pydantic import BaseModel, ConfigDict
from ktoolbox.configuration import config
from ktoolbox.enum import RetCodeEnum, DataStorageNameEnum
__all__ = ["BaseRet", "file_name_from_headers", "generate_message", "logger_init"]
_T = TypeVar('_T')
class BaseRet(BaseModel, Generic[_T]):
"""Base data model of function return value"""
code: int = RetCodeEnum.Success.value
message: str = ''
exception: Optional[Exception] = None
data: Optional[_T] = None
model_config = ConfigDict(arbitrary_types_allowed=True)
def __bool__(self):
return self.code == RetCodeEnum.Success
def parse_header(line: str) -> Dict[str, Optional[str]]:
"""
Alternative resolution for parsing header line.
Apply when `cgi.parse_header` is unable to use due to the deprecation of `cgi` module.
https://peps.python.org/pep-0594/#cgi
:param line: Header line
:return: Dict of header line
* Example:
```
parse_header("text/html; charset=utf-8")
```
* Return:
```
{'text/html': None, 'charset': 'utf-8'}
```
"""
dict_value: Dict[str, Optional[str]] = {}
for item in line.split(";"):
if len(pair := item.split("=")) == 1:
dict_value[pair[0]] = None
else:
dict_value.setdefault(*pair)
return dict_value
def file_name_from_headers(headers: Dict[str, str]) -> Optional[str]:
"""
Get file name from headers.
Parse from `Content-Disposition`.
:param headers: HTTP headers
:return: File name
* Example:
```
file_name_from_headers('attachment;filename*=utf-8\\'\\'README%2Emd;filename="README.md"')
```
* Return:
```
README.md
```
"""
if not (disposition := headers.get("Content-Disposition")):
if not (disposition := headers.get("content-disposition")):
return None
_, options = cgi.parse_header(disposition) # alternative: `parse_header` in `utils.py`
if file_name := options.get("filename*"):
if len(name_with_charset := file_name.split("''")) == 2:
charset, name = name_with_charset
return urllib.parse.unquote(name, charset)
if file_name := options.get("filename"):
return urllib.parse.unquote(file_name, config.downloader.encoding)
return None
def generate_message(title: str = None, **kwargs):
"""
Generate message for `BaseRet` and logger
:param title: Message title
:param kwargs: Extra data
"""
title: str = title or ""
return f"{title} - {kwargs}" if kwargs else title
def logger_init(disable_stdout: bool = False):
"""
Initialize `loguru` logger
:param disable_stdout: Disable default output stream
"""
if disable_stdout:
logger.remove()
path = config.logger.path
if not os.path.isdir(path):
os.makedirs(path)
if path is not None:
logger.add(
path / DataStorageNameEnum.LogData,
level=config.logger.level,
rotation=config.logger.rotation,
diagnose=True
)