Skip to content

Commit

Permalink
fix Katbin + add Spacebin
Browse files Browse the repository at this point in the history
  • Loading branch information
rking32 committed Oct 17, 2021
1 parent 5d9b488 commit c20aa31
Showing 1 changed file with 76 additions and 58 deletions.
134 changes: 76 additions & 58 deletions userge/plugins/utils/paste.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
import re
from typing import Optional, Dict

import aiohttp
import aiofiles
import aiohttp
from aiohttp import client_exceptions

from userge import userge, Message, Config

Expand All @@ -33,6 +34,17 @@ def is_supported(self, url: str) -> bool:
return True
return False

async def _get_token(self, ses: aiohttp.ClientSession, ptn: str) -> Optional[str]:
token = None
async with ses.get(self._url) as resp:
if resp.status != 200:
return None
content = await resp.text()
for i in re.finditer(ptn, content):
token = i.group(1)
break
return token

# pylint: disable=W0613
async def paste(self, ses: aiohttp.ClientSession,
text: str, file_type: Optional[str]) -> Optional[str]:
Expand Down Expand Up @@ -61,19 +73,15 @@ async def paste(self, ses: aiohttp.ClientSession,
async with ses.post(self._url + "api/documents", json={"content": text}) as resp:
if resp.status != 201:
return None
response = await resp.json()
key = response['result']['key']
final_url = self._url + key
if file_type:
final_url += "." + file_type
return final_url
data = await resp.json()
return _get_url(self._url + data['result']['key'], file_type)

async def get_paste(self, ses: aiohttp.ClientSession, code: str) -> Optional[str]:
async with ses.get(self._url + "api/documents/" + code) as resp:
if resp.status != 200:
return None
response = await resp.json()
return response['result']['content']
data = await resp.json()
return data['result']['content']


class HasteBin(PasteService):
Expand All @@ -85,12 +93,8 @@ async def paste(self, ses: aiohttp.ClientSession,
async with ses.post(self._url + "documents", data=text) as resp:
if resp.status != 200:
return None
response = await resp.json()
key = response['key']
final_url = self._url + key
if file_type:
final_url += "." + file_type
return final_url
data = await resp.json()
return _get_url(self._url + data['key'], file_type)


class Rentry(PasteService):
Expand All @@ -99,22 +103,14 @@ def __init__(self) -> None:

async def paste(self, ses: aiohttp.ClientSession,
text: str, file_type: Optional[str]) -> Optional[str]:
token = None
async with ses.get(self._url) as resp:
if resp.status != 200:
return None
content = await resp.text()
for i in re.finditer(r'name="csrfmiddlewaretoken" value="(.+)"', content):
token = i.group(1)
break
if not token:
return None
token = await self._get_token(ses, r'name="csrfmiddlewaretoken" value="(.+)"')
if not token:
return None
if file_type:
text = f"```{file_type}\n" + text + "\n```"
async with ses.post(self._url,
data=dict(csrfmiddlewaretoken=token, text=text),
headers=dict(Referer=self._url),
allow_redirects=True) as resp:
headers=dict(Referer=self._url)) as resp:
if resp.status != 200:
return None
return str(resp.url)
Expand All @@ -132,8 +128,7 @@ async def paste(self, ses: aiohttp.ClientSession,
async with ses.post(self._url + "api", json=data) as resp:
if resp.status != 200:
return None
code = await resp.text()
return self._url + code
return self._url + await resp.text()


class PastyLus(PasteService):
Expand All @@ -142,60 +137,83 @@ def __init__(self) -> None:

async def paste(self, ses: aiohttp.ClientSession,
text: str, file_type: Optional[str]) -> Optional[str]:
async with ses.post(self._url + "api/v2/pastes/", json={"content": text}) as resp:
if resp.status != 201:
return None
content = await resp.text()
code = ""
for i in re.finditer(r'"id":"(.+?)"', content):
code = i.group(1)
break
if not code:
try:
async with ses.post(self._url + "api/v2/pastes/", json={"content": text}) as resp:
if resp.status != 201:
return None
code = _get_id(await resp.text())
if not code:
return None
return _get_url(self._url + code, file_type)
except client_exceptions.TooManyRedirects:
return None


class KatBin(PasteService):
def __init__(self) -> None:
super().__init__("katbin", "https://katb.in/")

async def paste(self, ses: aiohttp.ClientSession,
text: str, file_type: Optional[str]) -> Optional[str]:
token = await self._get_token(ses, r'name="_csrf_token".+value="(.+)"')
if not token:
return None
async with ses.post(self._url, data={"_csrf_token": token, "paste[content]": text}) as resp:
if resp.status != 200:
return None
final_url = self._url + code
if file_type:
final_url += '.' + file_type
return final_url
return _get_url(str(resp.url), file_type)


class Katbin(PasteService):
class SpaceBin(PasteService):
def __init__(self) -> None:
self._api_url = "https://api.katb.in/api/paste/"
super().__init__("katbin", "https://katb.in/")
self._api_url = "https://spaceb.in/api/v1/documents/"
super().__init__("spacebin", "https://spaceb.in/")

async def paste(self, ses: aiohttp.ClientSession,
text: str, file_type: Optional[str]) -> Optional[str]:
async with ses.post(self._api_url, json={"content": text}) as resp:
ext = "python" if file_type == "py" else file_type or "markdown"
async with ses.post(self._api_url, data=dict(content=text, extension=ext)) as resp:
if resp.status != 201:
return None
response = await resp.json()
key = response['paste_id']
final_url = self._url + key
if file_type:
final_url += "." + file_type
return final_url
code = _get_id(await resp.text())
if not code:
return None
return self._url + code

async def get_paste(self, ses: aiohttp.ClientSession, code: str) -> Optional[str]:
async with ses.get(self._api_url + code) as resp:
async with ses.get(self._api_url + code + "/raw") as resp:
if resp.status != 200:
return None
response = await resp.json()
return response['content']
return await resp.text()


_SERVICES: Dict[str, PasteService] = {
'-n': NekoBin(), '-h': HasteBin(), '-r': Rentry(),
'-p': Pasting(), '-pl': PastyLus(), '-k': Katbin()}
'-n': NekoBin(), '-h': HasteBin(), '-r': Rentry(), '-p': Pasting(),
'-pl': PastyLus(), '-k': KatBin(), '-s': SpaceBin()}

if Config.HEROKU_ENV:
_DEFAULT_SERVICE = '-pl'
_DEFAULT_SERVICE = '-k'
else:
_DEFAULT_SERVICE = '-n'

_HEADERS = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 ('
'KHTML, like Gecko) Cafari/537.36'}


def _get_id(text: str) -> Optional[str]:
code = None
for i in re.finditer(r'"id":\s?"(.+?)"', text):
code = i.group(1)
break
return code


def _get_url(url: str, file_type: Optional[str]) -> str:
if file_type:
url += "." + file_type
return url


def _get_code(url: str) -> Optional[str]:
parts = list(filter(None, url.split('/')))
if len(parts) < 2:
Expand Down

0 comments on commit c20aa31

Please sign in to comment.