forked from volatilityfoundation/community
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathantianalysis.py
190 lines (162 loc) · 8.48 KB
/
antianalysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# Volatility
# Copyright (C) 2007-2013 Volatility Foundation
#
# This file is part of Volatility.
#
# Volatility is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License Version 2 as
# published by the Free Software Foundation. You may not use, modify or
# distribute this program under any other version of the GNU General
# Public License.
#
# Volatility is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Volatility. If not, see <http://www.gnu.org/licenses/>.
#
"""
@author: Itay k
@license: GNU General Public License 2.0
@contact: itaykrk [at] Gmail [dot] com
@organization: CyberHat.co.il
@description: Antianalysis find windows api calls which identified with anti-debugging\analysis techniques.
"""
import string
import distorm3
import sys
from volatility.plugins.procdump import ProcDump
from volatility import obj
from volatility.plugins import taskmods
from volatility.plugins.malware import impscan
class AntiAnalysis(taskmods.DllList):
def unified_output(self, data):
pass
def __init__(self, config, *args, **kwargs):
taskmods.DllList.__init__(self, config, *args, **kwargs)
self._instructions_history = []
self._api_address = {}
self._anti_analysis_functions = ("isdebuggerpresent", "checkremotedebuggerpresent", "outputdebugstring", "ntqueryinformationprocess", "findwindow", "loadlibrary", "getprocaddress", "gettickcount", "queryperformancecounter",
"blockinput", "continuedebugevent", "setunhandledexceptionfilter", "ntqueryobject", "debugactiveprocess", "isprocessorfeaturepresent")
config.add_option('DUMP-DIR', short_option='D', default=None, cache_invalidator=False, help='Directory in which to dump the process in PE format')
def calculate(self):
return taskmods.DllList.calculate(self)
def _get_function_addresses(self, task):
dlls = ("kernel32.dll", "user32.dll", "ntdll.dll", str(task.ImageFileName))
modules = []
task_space = task.get_process_address_space()
for mod in task.get_load_modules():
if str(mod.BaseDllName).startswith(dlls):
modules.append(mod)
apis = impscan.ImpScan.enum_apis(modules)
base_address = modules[0].DllBase
size_to_read = modules[0].SizeOfImage
data = task_space.zread(base_address, size_to_read)
calls_imported = dict(
(iat, call)
for (_, iat, call) in impscan.ImpScan(self._config).call_scan(task_space, base_address, data)
if call in apis
)
for iat, call in sorted(calls_imported.items()):
if apis[call][1].lower().startswith(self._anti_analysis_functions):
yield hex(iat), apis[call][1]
return
def render_text(self, outfd, data):
for task in data:
if self._config.PID is not None:
if int(task.UniqueProcessId) != int(self._config.PID):
continue
if str(task.ImageFileName) == "System":
continue
task_space = task.get_process_address_space()
#parse pe
dos_header = obj.Object("_IMAGE_DOS_HEADER", offset=task.Peb.ImageBaseAddress, vm=task_space)
nt_header = dos_header.get_nt_header()
code_section = None
data_section = None
#find code and data sections
for sec in nt_header.get_sections():
if ".text" in str(sec.Name or '').lower():
code_section = sec
elif "data" in str(sec.Name or '').lower():
data_section = sec
break
code_section_start_offset = task.Peb.ImageBaseAddress + code_section.VirtualAddress
codeSection = task_space.read(code_section_start_offset, code_section.Misc.VirtualSize)
dataSection = task_space.read(task.Peb.ImageBaseAddress + data_section.VirtualAddress, data_section.Misc.VirtualSize)
self._api_address = {}
for address, func_name in self._get_function_addresses(task):
self._api_address[address] = func_name
# Disassemble process code section
save_file = False
for offset, hexdump, instruction in self.dis(code=codeSection):
self._instructions_history.append((offset + code_section_start_offset, hexdump, instruction))
try:
if instruction.startswith(("CALL", "JMP")):
address = instruction.split()[-1].strip("[").strip("]")
if address.startswith("0x"):
if self._api_address.has_key(address):
save_file = True
outfd.write("Process: {0} Pid: {1} Call Address: {2:<#8x}\nApi Call: {3}\n\n".format(str(task.ImageFileName), task.UniqueProcessId, offset + code_section_start_offset, self._api_address[address]))
last_instruction = self._instructions_history[-5:-1]
for i in last_instruction:
if "PUSH DWORD" in i[2] and not i[2].endswith("]"):
dword_pointer = int(i[2].split()[-1], 16)
dword_start = dword_pointer - (task.Peb.ImageBaseAddress + data_section.VirtualAddress)
dword = self._extract_string(dataSection, dword_start)
outfd.write("{0:<#8x} {1:<32} {2} #{3}\n".format(i[0], i[1], i[2], dword))
else:
outfd.write("{0:<#8x} {1:<32} {2}\n".format(i[0], i[1], i[2]))
outfd.write("{0:<#8x} {1:<32} {2} #{3}\n".format(offset + code_section_start_offset, hexdump, instruction, self._api_address[address]))
outfd.write("\n")
else:
continue
except Exception, e:
outfd.write("%s: %s" % (str(task.ImageFileName), e.message))
if self._config.DUMP_DIR is not None and save_file:
dump_file = "%s.%s.exe" % (str(task.ImageFileName), str(task.UniqueProcessId))
ProcDump(self._config).dump_pe(task_space, task.Peb.ImageBaseAddress, dump_file)
def dis(self, address=0, length = 128, code = None, mode = None):
"""Disassemble code at a given address.
Disassembles code starting at address for a number of bytes
given by the length parameter (default: 128).
Note: This feature requires distorm, available at
http://www.ragestorm.net/distorm/
The mode is '16bit', '32bit' or '64bit'. If not supplied, the disasm
mode is taken from the profile.
"""
if not sys.modules.has_key("distorm3"):
print "ERROR: Disassembly unavailable, distorm not found"
return
data = code
# if mode == None:
# mode = space.profile.metadata.get('memory_model', '32bit')
# we'll actually allow the possiblility that someone passed a correct mode
# if mode not in [distorm3.Decode16Bits, distorm3.Decode32Bits, distorm3.Decode64Bits]:
# if mode == '16bit':
# mode = distorm3.Decode16Bits
# elif mode == '32bit':
mode = distorm3.Decode32Bits
# else:
# mode = distorm3.Decode64Bits
distorm_mode = mode
iterable = distorm3.DecodeGenerator(address, data, distorm_mode)
for (offset, _size, instruction, hexdump) in iterable:
# print "{0:<#8x} {1:<32} {2}".format(offset, hexdump, instruction)
yield offset, hexdump, instruction
def _extract_string(self, dataSection, dword_start):
result = ""
unprintable_flag = False
for c in dataSection[dword_start:]:
if c in string.printable:
result += c
unprintable_flag = False
continue
elif unprintable_flag:
return result
else:
unprintable_flag = True
continue
return result