forked from JusticeRage/Gepetto
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgepetto.py
303 lines (259 loc) · 13.2 KB
/
gepetto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
import functools
import json
import os
import re
import textwrap
import threading
import gettext
import idaapi
import ida_hexrays
import ida_kernwin
import idc
import openai
# =============================================================================
# EDIT VARIABLES IN THIS SECTION
# =============================================================================

# Set your API key here, or put it in the OPENAI_API_KEY environment variable.
openai.api_key = ""

# Specify the program language. It can be "fr_FR", "zh_CN", or any folder in gepetto-locales.
# Defaults to English.
language = ""

# =============================================================================
# END
# =============================================================================

# Set up translations.
# Bug fix: with the default `language = ""`, the original passed the literal
# empty string as a locale, so gettext never consulted the user's environment
# (LANGUAGE / LC_ALL / LC_MESSAGES / LANG). Passing None restores that lookup;
# fallback=True still yields identity translations when no catalog matches.
translate = gettext.translation(
    'gepetto',
    os.path.join(os.path.abspath(os.path.dirname(__file__)), "gepetto-locales"),
    fallback=True,
    languages=[language] if language else None)
_ = translate.gettext
# =============================================================================
# Setup the context menu and hotkey in IDA
# =============================================================================
class GepettoPlugin(idaapi.plugin_t):
    """IDA plugin exposing two gpt-3.5-turbo helpers for the Hex-Rays
    pseudocode view: explaining a function and renaming its variables."""
    flags = 0
    explain_action_name = "gepetto:explain_function"
    explain_menu_path = "Edit/Gepetto/" + _("Explain function")
    rename_action_name = "gepetto:rename_function"
    rename_menu_path = "Edit/Gepetto/" + _("Rename variables")
    wanted_name = 'Gepetto'
    wanted_hotkey = ''
    comment = _("Uses gpt-3.5-turbo to enrich the decompiler's output")
    help = _("See usage instructions on GitHub")
    # ContextMenuHooks instance, created in init() and unhooked in term().
    menu = None

    def init(self):
        """Register the two actions and hook the context menu.

        :return: PLUGIN_SKIP when the decompiler is unavailable,
                 PLUGIN_KEEP otherwise.
        """
        # The plugin is useless without the Hex-Rays decompiler.
        if not ida_hexrays.init_hexrays_plugin():
            return idaapi.PLUGIN_SKIP

        # Function explaining action
        explain_action = idaapi.action_desc_t(self.explain_action_name,
                                              _('Explain function'),
                                              ExplainHandler(),
                                              "Ctrl+Alt+G",
                                              _('Use gpt-3.5-turbo to explain the currently selected function'),
                                              199)
        idaapi.register_action(explain_action)
        idaapi.attach_action_to_menu(self.explain_menu_path, self.explain_action_name, idaapi.SETMENU_APP)

        # Variable renaming action
        rename_action = idaapi.action_desc_t(self.rename_action_name,
                                             _('Rename variables'),
                                             RenameHandler(),
                                             "Ctrl+Alt+R",
                                             _("Use gpt-3.5-turbo to rename this function's variables"),
                                             199)
        idaapi.register_action(rename_action)
        idaapi.attach_action_to_menu(self.rename_menu_path, self.rename_action_name, idaapi.SETMENU_APP)

        # Register context menu actions
        self.menu = ContextMenuHooks()
        self.menu.hook()
        return idaapi.PLUGIN_KEEP

    def run(self, arg):
        # Everything is driven by the registered actions; nothing to do here.
        pass

    def term(self):
        """Detach menu entries, unregister the actions and unhook the UI."""
        idaapi.detach_action_from_menu(self.explain_menu_path, self.explain_action_name)
        idaapi.detach_action_from_menu(self.rename_menu_path, self.rename_action_name)
        # Bug fix: the actions registered in init() were never unregistered,
        # leaving stale actions behind when the plugin is unloaded/reloaded.
        idaapi.unregister_action(self.explain_action_name)
        idaapi.unregister_action(self.rename_action_name)
        if self.menu:
            self.menu.unhook()
        return
# -----------------------------------------------------------------------------
class ContextMenuHooks(idaapi.UI_Hooks):
    """UI hook that injects Gepetto's actions into the pseudocode view's
    right-click popup menu."""

    def finish_populating_widget_popup(self, form, popup):
        # Only the decompiler (pseudocode) view gets the Gepetto submenu.
        if idaapi.get_widget_type(form) != idaapi.BWN_PSEUDOCODE:
            return
        for action_name in (GepettoPlugin.explain_action_name,
                            GepettoPlugin.rename_action_name):
            idaapi.attach_action_to_popup(form, popup, action_name, "Gepetto/")
# -----------------------------------------------------------------------------
def comment_callback(address, view, response):
    """
    Callback that sets a comment at the given address.

    :param address: The address of the function to comment
    :param view: A handle to the decompiler window (may be None)
    :param response: The comment to add
    """
    response = "\n".join(textwrap.wrap(response, 80, replace_whitespace=False))
    # Add the response as a comment in IDA, but preserve any existing
    # non-Gepetto comment. Normalize to "" in case no comment exists yet.
    comment = idc.get_func_cmt(address, 0) or ""
    # Bug fix: the translated marker was interpolated into the regex verbatim;
    # a localized string containing regex metacharacters would silently break
    # the substitution. re.escape makes it safe for any locale.
    marker = _("Comment generated by Gepetto")
    comment = re.sub(r'----- ' + re.escape(marker) + r' -----.*?----------------------------------------',
                     r"",
                     comment,
                     flags=re.DOTALL)
    idc.set_func_cmt(address, '----- ' + marker +
                     f" -----\n\n"
                     f"{response.strip()}\n\n"
                     f"----------------------------------------\n\n"
                     f"{comment.strip()}", 0)
    # Refresh the window so the comment is displayed properly
    if view:
        view.refresh_view(False)
    print(_("gpt-3.5-turbo query finished!"))
# -----------------------------------------------------------------------------
class ExplainHandler(idaapi.action_handler_t):
    """
    Handler that asks gpt-3.5-turbo to explain the function currently under
    the cursor. Once the reply is received, it is installed as a function
    comment via comment_callback.
    """

    def __init__(self):
        idaapi.action_handler_t.__init__(self)

    def activate(self, ctx):
        code = str(ida_hexrays.decompile(idaapi.get_screen_ea()))
        vdui = ida_hexrays.get_widget_vdui(ctx.widget)
        prompt = _("Can you explain what the following C function does and suggest a better name for it?\n"
                   "{decompiler_output}").format(decompiler_output=code)
        callback = functools.partial(comment_callback,
                                     address=idaapi.get_screen_ea(),
                                     view=vdui)
        query_model_async(prompt, callback)
        return 1

    def update(self, ctx):
        # This action is always available.
        return idaapi.AST_ENABLE_ALWAYS
# -----------------------------------------------------------------------------
def rename_callback(address, view, response, retries=0):
    """
    Callback that extracts a JSON dictionary mapping old variable names to
    new ones from the model's response and applies it to the pseudocode.
    (The original docstring said "JSON array"; the code parses an object.)

    :param address: The address of the function to work on
    :param view: A handle to the decompiler window (may be None)
    :param response: The response from gpt-3.5-turbo
    :param retries: The number of times that we received invalid JSON
    """
    j = re.search(r"\{[^}]*?\}", response)
    if not j:
        if retries >= 3:  # Give up obtaining the JSON after 3 times.
            print(_("Could not obtain valid data from the model, giving up. Dumping the response for manual import:"))
            print(response)
            return
        print(_("Cannot extract valid JSON from the response. Asking the model to fix it..."))
        query_model_async(_("The JSON document provided in this response is invalid. Can you fix it?\n"
                            "{response}").format(response=response),
                          functools.partial(rename_callback,
                                            address=address,
                                            view=view,
                                            retries=retries + 1))
        return
    try:
        names = json.loads(j.group(0))
    except json.decoder.JSONDecodeError:
        if retries >= 3:  # Give up fixing the JSON after 3 times.
            print(_("Could not obtain valid data from the model, giving up. Dumping the response for manual import:"))
            print(response)
            return
        print(_("The JSON document returned is invalid. Asking the model to fix it..."))
        query_model_async(_("Please fix the following JSON document:\n{json}").format(json=j.group(0)),
                          functools.partial(rename_callback,
                                            address=address,
                                            view=view,
                                            retries=retries + 1))
        return

    # The rename function needs the start address of the function.
    function_addr = idaapi.get_func(address).start_ea
    replaced = []
    for old_name, new_name in names.items():
        if ida_hexrays.rename_lvar(function_addr, old_name, new_name):
            replaced.append(old_name)

    # Update possible names left in the function comment.
    comment = idc.get_func_cmt(address, 0)
    if comment and replaced:
        for old_name in replaced:
            # Bug fix: the old name was interpolated into the pattern raw;
            # escape it so unusual identifiers can't break (or subvert) the
            # regex.
            comment = re.sub(r'\b%s\b' % re.escape(old_name), names[old_name], comment)
        idc.set_func_cmt(address, comment, 0)

    # Refresh the window to show the new names
    if view:
        view.refresh_view(True)
    print(_("gpt-3.5-turbo query finished! {replaced} variable(s) renamed.").format(replaced=len(replaced)))
# -----------------------------------------------------------------------------
class RenameHandler(idaapi.action_handler_t):
    """
    Handler that requests better variable names from gpt-3.5-turbo for the
    function under the cursor and applies them to the decompiler's output.
    """

    def __init__(self):
        idaapi.action_handler_t.__init__(self)

    def activate(self, ctx):
        code = str(ida_hexrays.decompile(idaapi.get_screen_ea()))
        vdui = ida_hexrays.get_widget_vdui(ctx.widget)
        prompt = _("Analyze the following C function:\n{decompiler_output}"
                   "\nSuggest better variable names, reply with a JSON array where keys are the original names "
                   "and values are the proposed names. Do not explain anything, only print the JSON "
                   "dictionary.").format(decompiler_output=code)
        callback = functools.partial(rename_callback,
                                     address=idaapi.get_screen_ea(),
                                     view=vdui)
        query_model_async(prompt, callback)
        return 1

    def update(self, ctx):
        # This action is always available.
        return idaapi.AST_ENABLE_ALWAYS
# =============================================================================
# gpt-3.5-turbo interaction
# =============================================================================
def query_model(query, cb, max_tokens=2500):
    """
    Sends a query to gpt-3.5-turbo and calls a callback when the response is
    available. Blocks until the response is received.

    :param query: The request to send to gpt-3.5-turbo
    :param cb: The function to which the response will be passed.
    :param max_tokens: Upper bound on completion tokens to request; lowered
                       automatically when the context length is exceeded.
    """
    try:
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "user", "content": query}
            ],
            # Bug fix: max_tokens was accepted (and reduced on retry below)
            # but never forwarded to the API, so the whole retry path was a
            # no-op.
            max_tokens=max_tokens
        )
        # Marshal the callback onto IDA's main thread: only it may touch the
        # database, and this function runs on a worker thread.
        ida_kernwin.execute_sync(functools.partial(cb, response=response.choices[0]["message"]["content"]),
                                 ida_kernwin.MFF_WRITE)
    except openai.InvalidRequestError as e:
        # Context length exceeded. Determine the max number of tokens we can ask for and retry.
        m = re.search(r'maximum context length is (\d+) tokens, however you requested \d+ tokens \((\d+) in your '
                      r'prompt;', str(e))
        if not m:
            print(_("gpt-3.5-turbo could not complete the request: {error}").format(error=str(e)))
            return
        (hard_limit, prompt_tokens) = (int(m.group(1)), int(m.group(2)))
        max_tokens = hard_limit - prompt_tokens
        if max_tokens >= 750:
            print(_("Context length exceeded! Reducing the completion tokens to "
                    "{max_tokens}...").format(max_tokens=max_tokens))
            query_model(query, cb, max_tokens)
        else:
            # Wrapped in _() for consistency: this was the only user-facing
            # message in the file that bypassed translation.
            print(_("Unfortunately, this function is too big to be analyzed with the model's current API limits."))
    except openai.OpenAIError as e:
        print(_("gpt-3.5-turbo could not complete the request: {error}").format(error=str(e)))
    except Exception as e:
        print(_("General exception encountered while running the query: {error}").format(error=str(e)))
# -----------------------------------------------------------------------------
def query_model_async(query, cb):
    """
    Sends a query to gpt-3.5-turbo on a background thread and returns
    immediately; the callback is invoked once the response is available.

    :param query: The request to send to gpt-3.5-turbo
    :param cb: The function to which the response will be passed.
    """
    print(_("Request to gpt-3.5-turbo sent..."))
    worker = threading.Thread(target=query_model, args=(query, cb))
    worker.start()
# =============================================================================
# Main
# =============================================================================
def PLUGIN_ENTRY():
    """Entry point required by IDA: makes sure an OpenAI API key is
    configured (script variable first, environment second), then
    instantiates the plugin."""
    # Fall back to the environment when no key was hard-coded above.
    key = openai.api_key or os.getenv("OPENAI_API_KEY")
    if not key:
        print(_("Please edit this script to insert your OpenAI API key!"))
        raise ValueError("No valid OpenAI API key found")
    openai.api_key = key
    return GepettoPlugin()