-
Notifications
You must be signed in to change notification settings - Fork 9
/
main.py
160 lines (141 loc) · 6.12 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import os
import re
import json
import random
from prompt.prompts import *
from collections import Counter
from macm.executor import Execute_steps
from macm.judge import Judge_statement, Judge_answer, Judge_condition
from macm.thinker import Analysis_conditions, Think_thoughts, Think_Steps
def check_condition(question,condition, n):
"""
Use several Judges to check the statement
Input:
conditions, unchecked_conditions, the number of the inspectors (List, Str, int)
Output:
True/False (bool)
"""
for _ in range(n):
if Judge_condition(question,condition).strip() == "False":
return False
return True
def check_statement(conditions,statement, n):
"""
Use several Judges to check the statement
Input:
conditions, unchecked_conditions, the number of the inspectors (List, Str, int)
Output:
True/False (bool)
"""
for _ in range(n):
answer = Judge_statement(conditions,statement)
if "False" in answer or "false" in answer:
return False
return True
def check_answer(conditions,statement):
"""
Use several Judges to check the answer
Input:
unchecked_conditions, the number of the inspectors (Str, int)
Output:
True/False (bool)
"""
if_got_answer = Judge_answer(conditions,statement)
if "False" in if_got_answer or "false" in if_got_answer:
return False
return True
def check_if_got_answer(conditions,statement,n):
for _ in range(n):
if check_answer(conditions,statement) == False:
return False
return True
def main(question, times, n, min_voters, max_voters):
"""
Input question and get the final answer from muti-Agent got
Input:
quesion, the number of times new conditions are identified, the number of the inspectors (Str, int, int)
Output:
final answer (Str)
"""
possible_answers = []
try:
voter_count = 0
tie = True
# Vote
while tie or voter_count < min_voters:
voter_count += 1
print(f"\n# {voter_count} Thinker is analyzing the question...")
conditions,objectives = Analysis_conditions(question)
Initial_condition_numbers = len(conditions) # This line will be used for the $while$ mode
# Think thoughts
# while len(conditions) - Initial_condition_numbers <= times:
for time in range(times): # Try to reduce the LLM queries.
print(f"\n# {voter_count} Thinker is thinking new thoughts...")
unchecked_conditions = Think_thoughts(conditions,objectives)
checked_conditions = []
for unchecked_condition in unchecked_conditions:
print(f"\n# {voter_count} Judge is checking conditions...")
if check_statement(conditions,unchecked_condition,n):
start = unchecked_condition.find("we can get: ")
if start != -1:
unchecked_condition = unchecked_condition[start + len("we can get: "):]
unchecked_condition = unchecked_condition.split("Reason:")[0]
checked_conditions.append(unchecked_condition)
conditions = conditions + checked_conditions
if_got_answer = check_if_got_answer(conditions,objectives,1)
if if_got_answer:
break
print(f"\n# {voter_count} thinker is thinking steps...")
steps = Think_Steps(conditions,objectives)
print(f"\n# {voter_count} Executor is trying to calculate the answer...")
final_answer = Execute_steps(conditions,objectives,steps)
# Achieve one potiential answer
Answer = re.search(r'\\boxed\{(.*)(?=\})', final_answer)
if Answer:
Answer_boxed = Answer.group(1)
else:
Answer_boxed = "No match found"
possible_answers.append(Answer_boxed)
if voter_count >= min_voters:
counter = Counter(possible_answers)
most_votes = counter.most_common(1)[0][1]
tie_count = len(list(filter(lambda x: x[1] == most_votes, counter.items())))
tie = tie_count > 1
print("\nThere is a tie vote. We need to add another voter.")
if voter_count >= max_voters:
print("\nReached maximum voter limit.")
break
most_possible_answer, count = counter.most_common(1)[0]
print(f"\nThe final answer is {most_possible_answer}")
return most_possible_answer
except Exception as e:
print(f"Error processing file: {e}")
def evaluate_dataset(folder_path, times, n, limit=5):
all_files = []
for root, dirs, files in os.walk(folder_path):
for file in files:
if file.endswith('.json'):
file_path = os.path.join(root, file)
all_files.append(file_path)
random.shuffle(all_files) # Shuffle the order of files randomly
for count, file_path in enumerate(all_files[:limit]):
with open(file_path, 'r') as json_file:
try:
data = json.load(json_file)
problem = data.get("problem")
if problem:
print(f"#{count} Problem:\n", problem)
solution = data.get("solution")
print(f"#{count} Solution\n", solution)
main(problem, times, n)
except json.JSONDecodeError:
print(f"Error reading file {file_path}")
except Exception as e:
print(f"Error processing file {file_path}: {e}")
if __name__ == "__main__":
n = 1 # verification times
times = 5 # The upper limit of the mining times
min_voters = 5 # min number of voters
max_voters = 7 # max number of voters
question = "" # Input your own question
main(question, times, n, min_voters, max_voters) # Assuming these are defined elsewhere