Skip to content

Commit

Permalink
quora (poe) [gpt-4/3.5] api unpatch
Browse files Browse the repository at this point in the history
  • Loading branch information
xtekky committed Apr 16, 2023
1 parent 90f6f65 commit c38e367
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 98 deletions.
81 changes: 50 additions & 31 deletions quora/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,25 @@
from random import choice, choices, randint
from string import ascii_letters, digits
from urllib import parse
from os import urandom
from hashlib import md5
from json import dumps
from os import urandom
from hashlib import md5
from json import dumps

def extract_formkey(html):
script_regex = r'<script>if\(.+\)throw new Error;(.+)</script>'
script_text = search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",'
key_text = search(key_regex, script_text).group(1)
cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]'
cipher_pairs = findall(cipher_regex, script_text)

formkey_list = [""] * len(cipher_pairs)
for pair in cipher_pairs:
formkey_index, key_index = map(int, pair)
formkey_list[formkey_index] = key_text[key_index]
formkey = "".join(formkey_list)

return formkey

class PoeResponse:

Expand Down Expand Up @@ -79,15 +95,16 @@ def create(

client = Session()
client.cookies['p-b'] = token


formkey = extract_formkey(client.get('https://poe.com').text)
settings = client.get('https://poe.com/api/settings').json()

client.headers = {
"host" : "poe.com",
"origin" : "https://poe.com",
"referer" : "https://poe.com/",
"content-type" : "application/json",
"poe-formkey" : settings['formkey'],
"poe-formkey" : formkey,
"poe-tchannel" : settings['tchannelData']['channel'],
"user-agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"connection" : "keep-alive",
Expand All @@ -102,8 +119,8 @@ def create(
"accept-encoding" : "gzip, deflate, br",
"accept-language" : "en-GB,en-US;q=0.9,en;q=0.8",
}

response = client.post("https://poe.com/api/gql_POST", json = {
payload = dumps(separators=(',', ':'), obj = {
'queryName': 'CreateBotMain_poeBotCreate_Mutation',
'variables': {
'model' : models[model],
Expand All @@ -124,6 +141,11 @@ def create(
'query': 'mutation CreateBotMain_poeBotCreate_Mutation(\n $model: String!\n $handle: String!\n $prompt: String!\n $isPromptPublic: Boolean!\n $introduction: String!\n $description: String!\n $profilePictureUrl: String\n $apiUrl: String\n $apiKey: String\n $isApiBot: Boolean\n $hasLinkification: Boolean\n $hasMarkdownRendering: Boolean\n $hasSuggestedReplies: Boolean\n $isPrivateBot: Boolean\n) {\n poeBotCreate(model: $model, handle: $handle, promptPlaintext: $prompt, isPromptPublic: $isPromptPublic, introduction: $introduction, description: $description, profilePicture: $profilePictureUrl, apiUrl: $apiUrl, apiKey: $apiKey, isApiBot: $isApiBot, hasLinkification: $hasLinkification, hasMarkdownRendering: $hasMarkdownRendering, hasSuggestedReplies: $hasSuggestedReplies, isPrivateBot: $isPrivateBot) {\n status\n bot {\n id\n ...BotHeader_bot\n }\n }\n}\n\nfragment BotHeader_bot on Bot {\n displayName\n messageLimit {\n dailyLimit\n }\n ...BotImage_bot\n ...BotLink_bot\n ...IdAnnotation_node\n ...botHelpers_useViewerCanAccessPrivateBot\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotImage_bot on Bot {\n displayName\n ...botHelpers_useDeletion_bot\n ...BotImage_useProfileImage_bot\n}\n\nfragment BotImage_useProfileImage_bot on Bot {\n image {\n __typename\n ... on LocalBotImage {\n localName\n }\n ... on UrlBotImage {\n url\n }\n }\n ...botHelpers_useDeletion_bot\n}\n\nfragment BotLink_bot on Bot {\n displayName\n}\n\nfragment IdAnnotation_node on Node {\n __isNode: __typename\n id\n}\n\nfragment botHelpers_useDeletion_bot on Bot {\n deletionState\n}\n\nfragment botHelpers_useViewerCanAccessPrivateBot on Bot {\n isPrivateBot\n viewerIsCreator\n}\n',
})

base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k'
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()

response = client.post("https://poe.com/api/gql_POST", data = payload)

if not 'success' in response.text:
raise Exception('''
Bot creation Failed
Expand All @@ -136,15 +158,14 @@ def create(

class Account:
def create(proxy: None or str = None, logging: bool = False, enable_bot_creation: bool = False):

client = Session()
client.proxies = {
'http': f'http://{proxy}',
'https': f'http://{proxy}'} if proxy else None

mail = Mail(client.proxies)
mail_token = None
mail_address = mail.get_mail()
_, mail_address = mail.get_mail()

if logging: print('email', mail_address)

Expand All @@ -167,12 +188,9 @@ def create(proxy: None or str = None, logging: bool = False, enable_bot_creation
"upgrade-insecure-requests": "1",
}

init = client.get('https://poe.com/login')
next_data = loads(search(r'json">(.+?)</script>', init.text).group(1))
client.headers["poe-formkey"] = extract_formkey(client.get('https://poe.com/login').text)
client.headers["poe-tchannel"] = client.get('https://poe.com/api/settings').json()['tchannelData']['channel']

client.headers["poe-formkey"] = next_data['props']['formkey']
client.headers["poe-tchannel"] = client.get('https://poe.com/api/settings').json()['tchannelData']['channel']

payload = dumps(separators = (',', ':'), obj = {
'queryName': 'MainSignupLoginSection_sendVerificationCodeMutation_Mutation',
'variables': {
Expand All @@ -189,18 +207,18 @@ def create(proxy: None or str = None, logging: bool = False, enable_bot_creation
response = client.post('https://poe.com/api/gql_POST', data=payload)
if 'Bad Request' in response.text:
if logging: print('bad request, retrying...' , response.json())
Account.create(proxy = proxy, logging = logging)
quit()

if logging: print('send_code' ,response.json())

while True:
sleep(1)
inbox = mail.fetch_inbox()

for _ in inbox:
content = mail.get_message(_["id"])
mail_token = findall(r';">(\d{6,7})</div>', content['html'][0])[0]

messages = mail.fetch_inbox()
if len(messages["messages"]) > 0:
email_content = mail.get_message_content(messages["messages"][0]["_id"])
mail_token = findall(r';">(\d{6,7})</div>', email_content)[0]
if mail_token:
break

Expand All @@ -215,31 +233,32 @@ def create(proxy: None or str = None, logging: bool = False, enable_bot_creation
},
"query": "mutation SignupOrLoginWithCodeSection_signupWithVerificationCodeMutation_Mutation(\n $verificationCode: String!\n $emailAddress: String\n $phoneNumber: String\n) {\n signupWithVerificationCode(verificationCode: $verificationCode, emailAddress: $emailAddress, phoneNumber: $phoneNumber) {\n status\n errorMessage\n }\n}\n"
})

base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k'
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()

response = client.post('https://poe.com/api/gql_POST', data = payload)
if logging: print('verify_code', response.json())

token = parse.unquote(client.cookies.get_dict()['p-b'])

with open(Path(__file__).resolve().parent / 'cookies.txt', 'a') as f:
f.write(f'{token}\n')

if enable_bot_creation:
payload = {

payload = dumps(separators = (',', ':'), obj={
"queryName": "UserProfileConfigurePreviewModal_markMultiplayerNuxCompleted_Mutation",
"variables": {},
"query": "mutation UserProfileConfigurePreviewModal_markMultiplayerNuxCompleted_Mutation {\n markMultiplayerNuxCompleted {\n viewer {\n hasCompletedMultiplayerNux\n id\n }\n }\n}\n"
}
})

base_string = dumps(payload, separators = (',', ':')) + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k'
base_string = payload + client.headers["poe-formkey"] + 'WpuLMiXEKKE98j56k'
client.headers["poe-tag-id"] = md5(base_string.encode()).hexdigest()

client.post("https://poe.com/api/gql_POST", json = payload)


resp = client.post("https://poe.com/api/gql_POST", data = payload)
if logging: print(resp.json())

return token

def get():
Expand Down
52 changes: 38 additions & 14 deletions quora/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import queue
import threading
import traceback
import hashlib
import websocket
from pathlib import Path
from urllib.parse import urlparse


parent_path = Path(__file__).resolve().parent
queries_path = parent_path / "graphql"
queries = {}
Expand Down Expand Up @@ -75,12 +77,15 @@ def __init__(self, token, proxy=None):
"Referrer": "https://poe.com/",
"Origin": "https://poe.com",
}
self.ws_domain = f"tch{random.randint(1, 1e6)}"

self.session.headers.update(self.headers)

self.setup_connection()
self.connect_ws()

def setup_connection(self):
self.ws_domain = f"tch{random.randint(1, 1e6)}"
self.next_data = self.get_next_data(overwrite_vars=True)
self.channel = self.get_channel_data()
self.connect_ws()
self.bots = self.get_bots(download_next_data=False)
self.bot_names = self.get_bot_names()

Expand All @@ -91,6 +96,22 @@ def __init__(self, token, proxy=None):
self.gql_headers = {**self.gql_headers, **self.headers}
self.subscribe()

def extract_formkey(self, html):
script_regex = r'<script>if\(.+\)throw new Error;(.+)</script>'
script_text = re.search(script_regex, html).group(1)
key_regex = r'var .="([0-9a-f]+)",'
key_text = re.search(key_regex, script_text).group(1)
cipher_regex = r'.\[(\d+)\]=.\[(\d+)\]'
cipher_pairs = re.findall(cipher_regex, script_text)

formkey_list = [""] * len(cipher_pairs)
for pair in cipher_pairs:
formkey_index, key_index = map(int, pair)
formkey_list[formkey_index] = key_text[key_index]
formkey = "".join(formkey_list)

return formkey

def get_next_data(self, overwrite_vars=False):
logger.info("Downloading next_data...")

Expand All @@ -100,7 +121,7 @@ def get_next_data(self, overwrite_vars=False):
next_data = json.loads(json_text)

if overwrite_vars:
self.formkey = next_data["props"]["formkey"]
self.formkey = self.extract_formkey(r.text)
self.viewer = next_data["props"]["pageProps"]["payload"]["viewer"]

return next_data
Expand Down Expand Up @@ -145,7 +166,6 @@ def get_channel_data(self, channel=None):
r = request_with_retries(self.session.get, self.settings_url)
data = r.json()

self.formkey = data["formkey"]
return data["tchannelData"]

def get_websocket_url(self, channel=None):
Expand All @@ -157,19 +177,20 @@ def get_websocket_url(self, channel=None):
def send_query(self, query_name, variables):
for i in range(20):
json_data = generate_payload(query_name, variables)
payload = json.dumps(json_data, separators=(',', ':'))
base_string = payload + self.gql_headers['poe-formkey'] + 'WpuLMiXEKKE98j56k'

from hashlib import md5
headers = self.gql_headers |{
payload = json.dumps(json_data, separators=(",", ":"))

base_string = payload + \
self.gql_headers["poe-formkey"] + "WpuLMiXEKKE98j56k"

headers = {
"content-type": "application/json",
"poe-tag-id": md5(base_string.encode()).hexdigest()
"poe-tag-id": hashlib.md5(base_string.encode()).hexdigest()
}

headers = {**self.gql_headers, **headers}

r = request_with_retries(
self.session.post, self.gql_url, data=payload, headers=headers)

data = r.json()
if data["data"] == None:
logger.warn(
Expand Down Expand Up @@ -280,10 +301,13 @@ def send_message(self, chatbot, message, with_chat_break=False, timeout=20):
self.active_messages["pending"] = None

logger.info(f"Sending message to {chatbot}: {message}")

# reconnect websocket
if not self.ws_connected:
self.disconnect_ws()
self.setup_connection()
self.connect_ws()

message_data = self.send_query("SendMessageMutation", {
"bot": chatbot,
"query": message,
Expand Down
1 change: 1 addition & 0 deletions quora/cookies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ juCAh6kB0sUpXHvKik2woA==
nBvuNYRLaE4xE4HuzBPiIQ==
oyae3iClomSrk6RJywZ4iw==
1Z27Ul8BTdNOhncT5H6wdg==
wfUfJIlwQwUss8l-3kDt3w==
83 changes: 30 additions & 53 deletions quora/mail.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,39 @@
from requests import Session
from string import ascii_letters
from random import choices
import html
import json
from tls_client import Session

class Mail:
def __init__(self, proxies: dict = None) -> None:
self.client = Session()
self.client.proxies = None #proxies
self.client.headers = {
"host": "api.mail.tm",
"connection": "keep-alive",
"sec-ch-ua": "\"Google Chrome\";v=\"111\", \"Not(A:Brand\";v=\"8\", \"Chromium\";v=\"111\"",
"accept": "application/json, text/plain, */*",
"content-type": "application/json",
"sec-ch-ua-mobile": "?0",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
"sec-ch-ua-platform": "\"macOS\"",
"origin": "https://mail.tm",
"sec-fetch-site": "same-site",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
"referer": "https://mail.tm/",
"accept-encoding": "gzip, deflate, br",
"accept-language": "en-GB,en-US;q=0.9,en;q=0.8"
}
def __init__(self, proxies: str = None, timeout: int = 15, bearer_token: str or None = None) -> None:
self.session = Session(client_identifier='chrome110')
self.base_url = 'https://web2.temp-mail.org'
self.proxies = proxies
self.timeout = timeout

self.session.headers['authorization'] = f'Bearer {bearer_token}' if bearer_token else None

def get_mail(self) -> str:
token = ''.join(choices(ascii_letters, k=10)).lower()

init = self.client.post("https://api.mail.tm/accounts", json={
"address" : f"{token}@bugfoo.com",
"password": token
})

if init.status_code == 201:
resp = self.client.post("https://api.mail.tm/token", json = {
**init.json(),
"password": token
})

self.client.headers['authorization'] = 'Bearer ' + resp.json()['token']

return f"{token}@bugfoo.com"
status: html = self.session.get(self.base_url).status_code

else:
raise Exception("Failed to create email")

def fetch_inbox(self):
return self.client.get(f"https://api.mail.tm/messages").json()["hydra:member"]

def get_message(self, message_id: str):
return self.client.get(f"https://api.mail.tm/messages/{message_id}").json()
try:
if status == 200:
data = self.session.post(f'{self.base_url}/mailbox').json()

self.session.headers['authorization'] = f'Bearer {data["token"]}'
return data["token"], data["mailbox"]

except Exception as e:
print(e)
return f'Email creation error. {e} | use proxies', False

def fetch_inbox(self) -> json:
return self.session.get(f'{self.base_url}/messages').json()

def get_message_content(self, message_id: str):
return self.get_message(message_id)["text"]
return self.session.get(f'{self.base_url}/messages/{message_id}').json()["bodyHtml"]

# if __name__ == '__main__':

# if __name__ == "__main__":
# client = Mail()
# client.get_mail()


# email_client = TempMail()
# token, email = email_client.get_mail()
# print(email)
# print(token)

0 comments on commit c38e367

Please sign in to comment.