-
Notifications
You must be signed in to change notification settings - Fork 32
/
FCGextractor.py
336 lines (287 loc) · 11.7 KB
/
FCGextractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
#!/usr/bin/python
# ADAGIO Android Application Graph-based Classification
# fcg_extractor.py >> Read all APKs in dir and save NX graphs as pickle objects
# Copyright (c) 2013 Hugo Gascon <[email protected]>
""" A module to build NX objects from APKs call graphs. """
import sys
import os
import random
import struct
import zipfile
import networkx as nx
import numpy as np
import pz
from hashlib import sha256
from progressbar import *
from optparse import OptionParser
from instructionSet import INSTRUCTION_SET_COLOR
from instructionSet import INSTRUCTION_CLASS_COLOR
from instructionSet import INSTRUCTION_CLASSES
from androguard import *
from androguard.core.analysis import *
from androlyze import *
##################################################################################
# APK to NX encoding functions #
##################################################################################
def process_apk_dir(dataset_dir):
    """ Convert a series of APK into FCGNX objects

    Load all files in a dir subtree and create FCG objects that are
    saved (via pz.save) for later processing and learning. Each graph is
    written next to its input file as "<sha256 of the input>.fcg.pz".

    A file whose graph cannot be built is copied to a sub-directory of
    dataset_dir named after the exception class and then skipped.

    Args:
        dataset_dir: a directory containing a list of APK files.
    """
    # Local import: shutil is only needed on the error path below.
    import shutil

    # NOTE(review): presumably raised for deep recursion while analysing or
    # serialising large graphs -- confirm before lowering.
    sys.setrecursionlimit(100000)

    # collect every file of the subtree (no filtering by extension and no
    # check for already existing .fcg.pz outputs is performed)
    files = []
    for dir_name, _subdirs, file_list in os.walk(dataset_dir):
        for f in file_list:
            files.append(os.path.join(dir_name, f))

    # set up progress bar
    print("\nProcessing {0} APK files in dir {1}".format(len(files), dataset_dir))
    widgets = ['Building CGs: ', Percentage(), ' ',
               Bar(marker='#', left='[', right=']'), ' ', ETA(), ' ']
    pbar = ProgressBar(widgets=widgets, maxval=len(files))
    pbar.start()

    # loop through the files and save them in .fcg format
    for progress, f in enumerate(files, 1):
        print("[] Loading {0}".format(f))
        try:
            g = build_fcg_nx(f)
        except Exception as e:
            # Per-file boundary: one broken input must not stop the batch.
            # Keep a copy of the offending file grouped by exception type.
            err = e.__class__.__name__
            d = os.path.join(dataset_dir, err)
            if not os.path.exists(d):
                os.makedirs(d)
            try:
                # shutil.copy instead of a shell "cp": safe for paths with
                # spaces or shell metacharacters.
                shutil.copy(f, d)
            except (IOError, OSError, shutil.Error) as copy_err:
                # best effort, exactly like the ignored "cp" exit status
                print("[*] could not copy {0} to {1}: {2}".format(f, d, copy_err))
            print("[*] {0} error loading {1}".format(err, f))
        else:
            h = get_sha256(f)
            fnx = os.path.join(os.path.split(f)[0], "{0}.fcg.pz".format(h))
            pz.save(g, fnx)
            print("[*] Saved {0}\n".format(fnx))
        # advance for failed files too, so the bar reflects files handled
        pbar.update(progress)
    pbar.finish()
    print("Done.")
def get_sha256(filename):
    """ Return sha256 of the file in the input path.

    The file is read in binary mode and in fixed-size chunks, so the digest
    is computed over the exact bytes on disk and large files are not loaded
    into memory at once.

    Args:
        filename: path of the file to hash.

    Returns:
        The hexadecimal digest as a string.
    """
    s = sha256()
    # "with" guarantees the handle is closed even if reading fails
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            s.update(chunk)
    return s.hexdigest()
def build_fcg_nx(file):
    """ Using NX and Androguard, build a directed graph NX object so that:
        - node names are method names as: class name, method name and descriptor
        - each node has a label that encodes the method behavior
        - there is an edge from a method to every method in its XREFto items
    The input is analysed as an APK first and, if it is not a zip archive,
    as a dex object.
    """
    graph = nx.DiGraph()
    try:
        a, d, dx = AnalyzeAPK(file)
    except zipfile.BadZipfile:
        # not an APK: may be a dex object
        d, dx = AnalyzeDex(file)

    for method in d.get_methods():
        caller = get_node_name(method)
        # methods called from this one
        callees = [get_node_name(xref[0]) for xref in method.XREFto.items]
        # instruction names of the method, encoded using coloring
        opcodes = [ins.get_name() for ins in method.get_instructions()]
        node_label = color_instructions(opcodes)
        # add node, label and outgoing edges to the graph
        graph.add_node(caller, label=node_label)
        graph.add_edges_from([(caller, callee) for callee in callees])
    return graph
def get_node_name(method):
    """ Build unique identifier for a method: the tuple
    (class name, method name, descriptor)
    """
    class_name = method.get_class_name()
    method_name = method.get_name()
    descriptor = method.get_descriptor()
    return (class_name, method_name, descriptor)
def color_instructions(instructions):
    """ Node label based on coloring technique by Kruegel

    Returns a numpy array with one 0/1 entry per element of
    INSTRUCTION_CLASS_COLOR; the entry at position INSTRUCTION_SET_COLOR[name]
    is 1 for every instruction name in the input.
    """
    bits = [0] * len(INSTRUCTION_CLASS_COLOR)
    for name in instructions:
        color = INSTRUCTION_SET_COLOR[name]
        bits[color] = 1
    return np.array(bits)
def get_classes_from_label(label):
    """ Return the entries of INSTRUCTION_CLASSES whose position is set to 1
    in the input label.

    Args:
        label: a sequence of 0/1 values, e.g. as built by color_instructions.

    Returns:
        A list with INSTRUCTION_CLASSES[i] for every i where label[i] == 1.
    """
    # enumerate replaces the xrange index loop and runs on Python 2 and 3;
    # the former np.where result was computed but never used.
    return [INSTRUCTION_CLASSES[i] for i, bit in enumerate(label) if bit == 1]
#####################################################################
# ICFG related functions #
#####################################################################
def build_icfg_nx(file):
    """ Using NX and Androguard, build a directed graph NX object so that
        node names are basic blocks names: (class name, method name, descriptor, bb)
        and every basic block has an edge to each of its children as
        returned by get_children.
    """
    graph = nx.DiGraph()
    print("Loading file {0}...".format(file))
    a, d, dx = AnalyzeAPK(file)
    methods = d.get_methods()

    # progress bar advancing once per method
    bar_layout = ['Building ICFG: ', Percentage(), ' ',
                  Bar(marker='#', left='[', right=']'), ' ', ETA(), ' ']
    pbar = ProgressBar(widgets=bar_layout, maxval=len(methods))
    pbar.start()

    for done, method in enumerate(methods, 1):
        for bb in dx.get_method(method).basic_blocks.get():
            source = get_bb_label(bb)
            targets = get_children(bb, dx)
            graph.add_node(source)
            graph.add_edges_from([(source, target) for target in targets])
        pbar.update(done)
    pbar.finish()
    return graph
def get_bb_label(bb):
    """ Return the descriptive name of a basic block: the label of its
        method extended with the name of the basic block
    """
    method_label = get_method_label(bb.method)
    return method_label + (bb.name,)
def get_method_label(method):
    """ Return the descriptive name of a method as the tuple
        (class name, method name, descriptor)
    """
    class_name = method.get_class_name()
    method_name = method.get_name()
    descriptor = method.get_descriptor()
    return (class_name, method_name, descriptor)
def get_children(bb, dx):
    """ Return the labels of the basic blocks that are children of the input
        basic block: first those within its own method, then those reached
        through calls to other methods
    """
    intra_method = get_bb_intra_method_children(bb)
    extra_method = get_bb_extra_method_children(bb, dx)
    return intra_method + extra_method
def get_bb_intra_method_children(bb):
    """ Return the labels of the basic blocks that are children of the input
        basic block within a method
    """
    # the third field of every entry returned by bb.get_next() is the
    # successor basic block
    return [get_bb_label(successor[2]) for successor in bb.get_next()]
def get_bb_extra_method_children(bb, dx):
    """ Given a basic block, find the calls to external methods and
        return the label of the first basic block in these methods

    Args:
        bb: the basic block whose outgoing calls are inspected.
        dx: analysis object used to look up the called methods
            (through dx.get_method).

    Returns:
        A list with one label per call located inside bb whose target
        method has at least one basic block.
    """
    call_labels = []
    # iterate over calls from bb method to external methods
    for cob in bb.method.XREFto.items:
        remote_method = cob[0]
        remote_method_analysis = dx.get_method(remote_method)
        # iterate over the offsets of the call instructions and check
        # if the offset is within the limits of the bb
        for path in cob[1]:
            if not call_in_bb(bb, path.get_idx()):
                continue
            # builtin next() with a default works on Python 2.6+ and 3 and
            # replaces the Python 2 only ".next()" plus StopIteration
            # handling; targets without basic blocks are skipped.
            blocks = iter(remote_method_analysis.basic_blocks.get())
            remote_bb = next(blocks, None)
            if remote_bb is not None:
                call_labels.append(get_bb_label(remote_bb))
    return call_labels
def call_in_bb(bb, idx):
    """ Tell whether idx lies between the start and the end of the basic
        block, both limits included
    """
    after_start = bb.get_start() <= idx
    return after_start and idx <= bb.get_end()
def list_XREF(file):
    """ Print, for every method of the input APK (or dex object), its node
        name followed by the node names found in its XREFfrom and XREFto items
    """
    try:
        a, d, dx = AnalyzeAPK(file)
    except zipfile.BadZipfile:
        # not an APK: may be a dex object
        d, dx = AnalyzeDex(file)

    for method in d.get_methods():
        print(get_node_name(method))
        incoming = [get_node_name(ref[0]) for ref in method.XREFfrom.items]
        print(" ".join(("XREFfrom:", str(incoming))))
        outgoing = [get_node_name(ref[0]) for ref in method.XREFto.items]
        print(" ".join(("XREFto:", str(outgoing))))
##################################################################################
# Android API Calls related functions #
##################################################################################
def list_calls_apks_in_dir(dir, l):
    """ Return a list with all API calls found in first l APK files in dir
        (one list of calls per APK, in os.listdir order)
    """
    calls = []
    remaining = l
    for entry in os.listdir(dir):
        is_apk = entry.lower().endswith("apk")
        if not (is_apk and remaining > 0):
            continue
        calls.append(list_calls(os.path.join(dir, entry)))
        remaining -= 1
    return calls
def list_calls(file):
    """ Return a list with all API calls found in file (APK). Calls definition
        are reformatted as in java declarations.
    """
    apicalls = []
    a, d, dx = AnalyzeAPK(file)
    for method in d.get_methods():
        for ins in method.get_instructions():
            if not ins.get_name().startswith("invoke"):
                continue
            # last comma separated operand is the method descriptor
            target = ins.get_output(0).split(',')[-1].strip()
            # drop the return value that follows the closing parenthesis
            target = target[:target.index(')') + 1]
            # split in class and method
            class_part, method_part = target.split('->')[:2]
            method_class = get_type(class_part)
            ins_method, params = method_part.split('(')
            params = ','.join(parse_parameters(params.replace(')', '')))
            apicalls.append("{0}.{1}({2})".format(method_class, ins_method, params))
    return apicalls
def list_calls_with_permissions(file, permission_map_file):
    """ List all API calls in file which require a permission (according to
        the mapping from Felt et al. CCS 2011 in APICalls.txt).

    For every invoke instruction whose reformatted call is found in the
    mapping, the value of its "Permission(s)" column and the call itself
    are printed.

    Args:
        file: path of the APK to analyse.
        permission_map_file: tab separated file whose first column holds the
            API call and which has a "Permission(s)" column.
    """
    # Local import: pandas is not imported explicitly at module level.
    import pandas as pd

    # read_csv(index_col=0) and .loc replace DataFrame.from_csv and .ix,
    # which are no longer available in current pandas releases.
    df = pd.read_csv(permission_map_file, sep='\t', index_col=0)
    a, d, dx = AnalyzeAPK(file)
    for method in d.get_methods():
        for i in method.get_instructions():
            if i.get_name()[:6] != "invoke":
                continue
            # get method desc
            call = i.get_output(0).split(',')[-1].strip()
            # remove return value
            call = call[:call.index(')')+1]
            # split in class and method
            call = call.split('->')
            method_class = get_type(call[0])
            ins_method, params = call[1].split('(')
            params = ','.join(parse_parameters(params.replace(')', '')))
            apicall = "{0}.{1}({2})".format(method_class, ins_method, params)
            try:
                permissions = df.loc[apicall]["Permission(s)"]
            except KeyError:
                # call not present in the mapping: no permission to report
                continue
            print(permissions)
            print(apicall)
def parse_parameters(p):
    """ Parse and format parameters extracted from API
        calls found in smali code

    The input is split into single type descriptors: a primitive type
    letter or a class descriptor ("L...;"), each optionally preceded by one
    or more "[" array markers. Every descriptor is then converted with
    get_type. Any other character is ignored.

    Args:
        p: the parameter part of a method descriptor, without parentheses.

    Returns:
        A list with get_type(descriptor) for every descriptor found.

    Raises:
        ValueError: if a class descriptor is not terminated by ";".
    """
    primitives = ('S', 'B', 'D', 'F', 'I', 'J', 'Z', 'C')
    parameters = []
    array_prefix = []  # pending "[" markers of the current descriptor
    i = 0
    while i < len(p):
        char = p[i]
        if char == '[':
            array_prefix.append(char)
        elif char in primitives:
            parameters.append(''.join(array_prefix) + char)
            array_prefix = []
        elif char == 'L':
            end = p.index(';', i)
            parameters.append(''.join(array_prefix) + p[i:end + 1])
            array_prefix = []
            # jump to the closing ";" so that the characters of the class
            # name are never re-read as primitive types; this must also
            # hold when the descriptor is preceded by array markers
            i = end
        i += 1
    return [get_type(param) for param in parameters]
##################################################################################
# Main function to start encoding of APKs in directory #
##################################################################################
if __name__ == "__main__":
    # command line entry point: the first positional argument is the
    # dataset directory handed over to process_apk_dir
    usage = "usage: %prog [<dataset dir>] "
    parser = OptionParser(usage)
    options, args = parser.parse_args()
    if not args:
        parser.print_help()
        sys.exit(1)
    sys.exit(process_apk_dir(args[0]))