forked from jrient/cpt-zsxq-auto-replay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
zsxq.py
90 lines (79 loc) · 2.55 KB
/
zsxq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import requests
import openai
import os
group_id = os.getenv("ZSXQ_GROUP_ID")
cookie = os.getenv("ZSXQ_COOKIE")
openai.api_key = os.getenv("OPENAI_API_KEY")
if group_id is None:
print("请设置环境变量 ZSXQ_GROUP_ID")
exit()
if cookie is None:
print("请设置环境变量 ZSXQ_COOKIE")
exit()
if openai.api_key is None:
print("请设置环境变量 OPENAI_API_KEY")
exit()
# 访问知识星球API,获取问题列表
def get_questions():
questions_url = f"https://api.zsxq.com/v2/groups/{group_id}/topics?scope=unanswered_questions&count=20"
response = requests.get(questions_url, headers={"cookie": f"{cookie}"})
if response.status_code == 200:
try:
questions_data = response.json()["resp_data"]["topics"]
except:
questions_data = []
return questions_data
else:
print("Unable to get questions data.")
return []
# 使用ChatGPT API,回答问题
def get_answer(question):
response = openai.Completion.create(
model="text-davinci-003",
prompt=f"{question}",
temperature=0,
max_tokens=2048,
top_p=1.0,
frequency_penalty=0.0,
presence_penalty=0.0,
stop=["\"\"\""]
)
if response:
answer = response["choices"][0]["text"]
return answer
else:
print("Unable to get answer.")
return ""
# 访问知识星球API,回答问题
def post_answer(question_id, answer):
print(f"问题id:{question_id}")
post_answer_url = f"https://api.zsxq.com/v2/topics/{question_id}/answer"
response = requests.post(post_answer_url, headers={"Cookie": cookie}, json={
"req_data":{
"image_ids": [],
"text": f"ChatGPT: {answer}"
}
})
if (response.status_code == 200) and (response.json()['succeeded']):
print("Answer posted successfully!")
else:
print("Unable to post answer.:"+response.text)
# 主函数
def main():
# 获取问题列表
questions_data = get_questions()
if questions_data == []:
print("未发现新问题")
exit()
# 遍历问题列表,回答问题
for question in questions_data:
question_id = question["topic_id"]
question_text = question["question"]['text']
print(f"发现新的提问:{question_text} \n")
# 获取答案
answer_text = get_answer(question_text)
print(f"生成问题的答案:{answer_text}")
# 发布答案
post_answer(question_id, answer_text)
if __name__ == '__main__':
main()