-
Notifications
You must be signed in to change notification settings - Fork 14
/
my_debugger.py
141 lines (114 loc) · 5.1 KB
/
my_debugger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
__author__ = 'Dave'
from ctypes import *
from my_debugger_defines import *
kernel32 = windll.kernel32
class debugger():
def __init__(self):
self.h_process = None
self.pid = None
self.debugger_active = None
self.h_thread = None
self.context = None
self.exception = None
self.exception_address = None
def load(self, path_to_exe):
creation_flags = DEBUG_PROCESS
startupinfo = STARTUPINFO()
process_information = PROCESS_INFORMATION()
startupinfo.dwFlags = 0x01
process_information.wShowWindow = 0x0
startupinfo.cb = sizeof(startupinfo)
if kernel32.CreateProcessA(path_to_exe,
None,
None,
None,
None,
creation_flags,
None,
None,
byref(startupinfo),
byref(process_information)):
print("[*] We have successfully launched the process!")
print("[*] PID: %d" % process_information.dwProcessId)
else:
print("[*] Error: 0x%08x." % kernel32.GetLastError())
def open_process(self, pid):
h_process = kernel32.OpenProcess(PROCESS_ALL_ACCESS, pid, False)
return h_process
def attach(self, pid):
self.h_process = self.open_process(pid)
if kernel32.DebugActiveProcess(pid):
self.debugger_active = True
self.pid = int(pid)
# self.run()
else:
# Don't forget that you can't attach to a 64 bit process from
# this program. calc.exe is 64 bit for example
print "[*] Unable to attach to the process"
print "[*] Error: %d" % kernel32.GetLastError()
def run(self):
while self.debugger_active == True:
self.get_debug_event()
def get_debug_event(self):
debug_event = DEBUG_EVENT()
continue_status = DBG_CONTINUE
if kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
# raw_input("Press button to continue")
# self.debugger_active = False
self.h_thread = self.open_thread(debug_event.dwThreadId)
self.context = self.get_thread_context(self.h_thread)
print "Event Code: %d Thread ID: %d" % (debug_event.dwDebugEventCode, debug_event.dwThreadId)
# kernel32.ContinueDebugEvent(debug_event.dwProcessId, debug_event.dwThreadId, continue_status)
if debug_event.dwDebugEventCode == EXCEPTION_DEBUG_EVENT:
exception = debug_event.u.Exception.ExceptionRecord.ExceptionCode
self.exception_address = debug_event.u.Exception.ExceptionRecord.ExceptionAddress
if exception == EXCEPTION_ACCESS_VIOLATION:
print "Access Violation Detected."
elif exception == EXCEPTION_BREAKPOINT:
continue_status = self.exception_handler_breakpoint()
elif exception == EXCEPTION_GUARD_PAGE:
print "Guard Page Access Detected."
elif exception == EXCEPTION_SINGLE_STEP:
print "Single Stepping"
kernel32.ContinueDebugEvent(debug_event.dwProcessId, debug_event.dwThreadId, continue_status)
def exception_handler_breakpoint(self):
print "[*] Inside breakpoint handler."
print "Exception Address: 0x%08x" % (self.exception_address)
def detach(self):
if kernel32.DebugActiveProcessStop(self.pid):
print "[*] Finished debugging. Exiting..."
return True
else:
print "There was an error!"
return False
def open_thread(self, thread_id):
h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS, None, thread_id)
if h_thread is not None:
return h_thread
else:
print "[*] Could not obtain a valid thread handle"
return False
def enumerate_threads(self):
thread_entry = THREADENTRY32()
thread_list = []
snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, self.pid)
if snapshot is not None:
thread_entry.dwSize = sizeof(thread_entry)
success = kernel32.Thread32First(snapshot, byref(thread_entry))
while success:
if thread_entry.th32OwnerProcessID == self.pid:
thread_list.append(thread_entry.th32ThreadID)
success = kernel32.Thread32Next(snapshot, byref(thread_entry))
kernel32.CloseHandle(snapshot)
return thread_list
else:
return False
def get_thread_context(self, thread_id):
context = CONTEXT()
context.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS
h_thread = self.open_thread(thread_id)
if kernel32.GetThreadContext(h_thread, byref(context)):
kernel32.CloseHandle(h_thread)
return context
else:
return False