diff --git a/userge/plugins/utils/paste.py b/userge/plugins/utils/paste.py index 7248a4ae1..20866cc67 100644 --- a/userge/plugins/utils/paste.py +++ b/userge/plugins/utils/paste.py @@ -12,8 +12,9 @@ import re from typing import Optional, Dict -import aiohttp import aiofiles +import aiohttp +from aiohttp import client_exceptions from userge import userge, Message, Config @@ -33,6 +34,17 @@ def is_supported(self, url: str) -> bool: return True return False + async def _get_token(self, ses: aiohttp.ClientSession, ptn: str) -> Optional[str]: + token = None + async with ses.get(self._url) as resp: + if resp.status != 200: + return None + content = await resp.text() + for i in re.finditer(ptn, content): + token = i.group(1) + break + return token + # pylint: disable=W0613 async def paste(self, ses: aiohttp.ClientSession, text: str, file_type: Optional[str]) -> Optional[str]: @@ -61,19 +73,15 @@ async def paste(self, ses: aiohttp.ClientSession, async with ses.post(self._url + "api/documents", json={"content": text}) as resp: if resp.status != 201: return None - response = await resp.json() - key = response['result']['key'] - final_url = self._url + key - if file_type: - final_url += "." + file_type - return final_url + data = await resp.json() + return _get_url(self._url + data['result']['key'], file_type) async def get_paste(self, ses: aiohttp.ClientSession, code: str) -> Optional[str]: async with ses.get(self._url + "api/documents/" + code) as resp: if resp.status != 200: return None - response = await resp.json() - return response['result']['content'] + data = await resp.json() + return data['result']['content'] class HasteBin(PasteService): @@ -85,12 +93,8 @@ async def paste(self, ses: aiohttp.ClientSession, async with ses.post(self._url + "documents", data=text) as resp: if resp.status != 200: return None - response = await resp.json() - key = response['key'] - final_url = self._url + key - if file_type: - final_url += "." + file_type - return final_url + data = await resp.json() + return _get_url(self._url + data['key'], file_type) class Rentry(PasteService): @@ -99,22 +103,14 @@ def __init__(self) -> None: async def paste(self, ses: aiohttp.ClientSession, text: str, file_type: Optional[str]) -> Optional[str]: - token = None - async with ses.get(self._url) as resp: - if resp.status != 200: - return None - content = await resp.text() - for i in re.finditer(r'name="csrfmiddlewaretoken" value="(.+)"', content): - token = i.group(1) - break - if not token: - return None + token = await self._get_token(ses, r'name="csrfmiddlewaretoken" value="(.+)"') + if not token: + return None if file_type: text = f"```{file_type}\n" + text + "\n```" async with ses.post(self._url, data=dict(csrfmiddlewaretoken=token, text=text), - headers=dict(Referer=self._url), - allow_redirects=True) as resp: + headers=dict(Referer=self._url)) as resp: if resp.status != 200: return None return str(resp.url) @@ -132,8 +128,7 @@ async def paste(self, ses: aiohttp.ClientSession, async with ses.post(self._url + "api", json=data) as resp: if resp.status != 200: return None - code = await resp.text() - return self._url + code + return self._url + await resp.text() class PastyLus(PasteService): @@ -142,53 +137,62 @@ def __init__(self) -> None: async def paste(self, ses: aiohttp.ClientSession, text: str, file_type: Optional[str]) -> Optional[str]: - async with ses.post(self._url + "api/v2/pastes/", json={"content": text}) as resp: - if resp.status != 201: - return None - content = await resp.text() - code = "" - for i in re.finditer(r'"id":"(.+?)"', content): - code = i.group(1) - break - if not code: + try: + async with ses.post(self._url + "api/v2/pastes/", json={"content": text}) as resp: + if resp.status != 201: + return None + code = _get_id(await resp.text()) + if not code: + return None + return _get_url(self._url + code, file_type) + except client_exceptions.TooManyRedirects: + return None + + +class KatBin(PasteService): + def __init__(self) -> None: + super().__init__("katbin", "https://katb.in/") + + async def paste(self, ses: aiohttp.ClientSession, + text: str, file_type: Optional[str]) -> Optional[str]: + token = await self._get_token(ses, r'name="_csrf_token".+value="(.+)"') + if not token: + return None + async with ses.post(self._url, data={"_csrf_token": token, "paste[content]": text}) as resp: + if resp.status != 200: return None - final_url = self._url + code - if file_type: - final_url += '.' + file_type - return final_url + return _get_url(str(resp.url), file_type) -class Katbin(PasteService): +class SpaceBin(PasteService): def __init__(self) -> None: - self._api_url = "https://api.katb.in/api/paste/" - super().__init__("katbin", "https://katb.in/") + self._api_url = "https://spaceb.in/api/v1/documents/" + super().__init__("spacebin", "https://spaceb.in/") async def paste(self, ses: aiohttp.ClientSession, text: str, file_type: Optional[str]) -> Optional[str]: - async with ses.post(self._api_url, json={"content": text}) as resp: + ext = "python" if file_type == "py" else file_type or "markdown" + async with ses.post(self._api_url, data=dict(content=text, extension=ext)) as resp: if resp.status != 201: return None - response = await resp.json() - key = response['paste_id'] - final_url = self._url + key - if file_type: - final_url += "." + file_type - return final_url + code = _get_id(await resp.text()) + if not code: + return None + return self._url + code async def get_paste(self, ses: aiohttp.ClientSession, code: str) -> Optional[str]: - async with ses.get(self._api_url + code) as resp: + async with ses.get(self._api_url + code + "/raw") as resp: if resp.status != 200: return None - response = await resp.json() - return response['content'] + return await resp.text() _SERVICES: Dict[str, PasteService] = { - '-n': NekoBin(), '-h': HasteBin(), '-r': Rentry(), - '-p': Pasting(), '-pl': PastyLus(), '-k': Katbin()} + '-n': NekoBin(), '-h': HasteBin(), '-r': Rentry(), '-p': Pasting(), + '-pl': PastyLus(), '-k': KatBin(), '-s': SpaceBin()} if Config.HEROKU_ENV: - _DEFAULT_SERVICE = '-pl' + _DEFAULT_SERVICE = '-k' else: _DEFAULT_SERVICE = '-n' @@ -196,6 +200,20 @@ async def get_paste(self, ses: aiohttp.ClientSession, code: str) -> Optional[str 'KHTML, like Gecko) Cafari/537.36'} +def _get_id(text: str) -> Optional[str]: + code = None + for i in re.finditer(r'"id":\s?"(.+?)"', text): + code = i.group(1) + break + return code + + +def _get_url(url: str, file_type: Optional[str]) -> str: + if file_type: + url += "." + file_type + return url + + def _get_code(url: str) -> Optional[str]: parts = list(filter(None, url.split('/'))) if len(parts) < 2: