Skip to content

Commit

Permalink
[loki] Port all tests to pytest style, and fix flake8/pylint issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwartzentruber committed May 1, 2019
1 parent d5a00d0 commit 3c24776
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 219 deletions.
5 changes: 4 additions & 1 deletion loki/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# coding=utf-8
"""
Loki fuzzing library
"""
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from .loki import Loki
from .loki import Loki # noqa

__all__ = ("Loki")
__author__ = "Tyson Smith"
Expand Down
11 changes: 11 additions & 0 deletions loki/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# coding=utf-8
"""
Loki fuzzing library
"""
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from .loki import main


main()
101 changes: 49 additions & 52 deletions loki/loki.py
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# coding: utf-8
"""
loki.py
Loki fuzzing library
"""
__author__ = "Tyson Smith"

from __future__ import print_function
import argparse
import logging
import logging.handlers
Expand All @@ -13,19 +13,15 @@
import tempfile
import time

log = logging.getLogger("Loki")
log.propagate = False

class Loki(object):
def __init__(self, aggression=0.0, verbose=False):
self.aggr = min(max(aggression, 0.0), 1.0)
__author__ = "Tyson Smith"
LOG = logging.getLogger("loki")


# setup logging
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter("[%(levelname).1s] %(message)s"))
log.addHandler(ch)
log.setLevel(logging.INFO if verbose else logging.ERROR)
class Loki(object):

def __init__(self, aggression=0.0):
self.aggr = min(max(aggression, 0.0), 1.0)

@staticmethod
def _fuzz_data(in_data, byte_order=None):
Expand Down Expand Up @@ -75,17 +71,16 @@ def _fuzz_data(in_data, byte_order=None):

return struct.pack(pack_unit, out_data & mask)


def _fuzz(self, fp):
fp.seek(0, os.SEEK_END)
length = fp.tell()
def _fuzz(self, tgt_fp):
tgt_fp.seek(0, os.SEEK_END)
length = tgt_fp.tell()
if length < 1:
raise RuntimeError("Zero length file cannot be fuzzed.")

# minimum number of max passes should be 1
max_passes = max(int(round(length * self.aggr)), 1)
fuzz_passes = random.randint(1, max_passes)
log.debug("%d of a possible %d fuzz passes will be performed", fuzz_passes, max_passes)
LOG.debug("%d of a possible %d fuzz passes will be performed", fuzz_passes, max_passes)

max_bytes = min(length, 2) if length < 4 else 4
for _ in range(fuzz_passes):
Expand All @@ -95,39 +90,37 @@ def _fuzz(self, fp):
fuzz_size = 1
target = random.randint(0, length-fuzz_size)

fp.seek(target)
out_data = self._fuzz_data(fp.read(fuzz_size))
fp.seek(target)
fp.write(out_data)

tgt_fp.seek(target)
out_data = self._fuzz_data(tgt_fp.read(fuzz_size))
tgt_fp.seek(target)
tgt_fp.write(out_data)

def fuzz_data(self, data):
assert isinstance(data, bytes)
# open a temp file in memory for fuzzing
with tempfile.SpooledTemporaryFile(max_size=0x800000, mode="r+b") as fp:
fp.write(data)
self._fuzz(fp)
fp.seek(0)
return fp.read()

with tempfile.SpooledTemporaryFile(max_size=0x800000, mode="r+b") as tmp_fp:
tmp_fp.write(data)
self._fuzz(tmp_fp)
tmp_fp.seek(0)
return tmp_fp.read()

def fuzz_file(self, in_file, count, ext=None, out_dir=None):
start_time = time.time()
log.info("Starting Loki @ %s", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
LOG.info("Starting Loki @ %s", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

# Analyze input test case
log.info("Target template is %r", os.path.abspath(in_file))
LOG.info("Target template is %r", os.path.abspath(in_file))
try:
length = os.path.getsize(in_file)
except OSError:
log.error("%r does not exists!", in_file)
LOG.error("%r does not exists!", in_file)
return False

if length < 1:
log.error("Input must be at least 1 byte long")
LOG.error("Input must be at least 1 byte long")
return False

log.info("Template size in bytes is %d", length)
LOG.info("Template size in bytes is %d", length)
if ext is None:
ext = os.path.splitext(in_file)[1]

Expand All @@ -138,52 +131,50 @@ def fuzz_file(self, in_file, count, ext=None, out_dir=None):
dir=".")
elif not os.path.isdir(out_dir):
os.mkdir(out_dir)
log.info("Output directory is %s", os.path.abspath(out_dir))
LOG.info("Output directory is %s", os.path.abspath(out_dir))

log.info("Generating %s fuzzed test cases...", count)
LOG.info("Generating %s fuzzed test cases...", count)
for i in range(count):
out_file = os.path.join(out_dir, "".join(("%06d_fuzzed" % i, ext)))
shutil.copy(in_file, out_file)

with open(out_file, "r+b") as fp:
self._fuzz(fp)
with open(out_file, "r+b") as out_fp:
self._fuzz(out_fp)

finish_time = time.time()-start_time
log.info("Total run time %gs", finish_time)
LOG.info("Total run time %gs", finish_time)
if count > 0:
log.info("About %gs per file", finish_time/count)
LOG.info("About %gs per file", finish_time/count)

return True


@staticmethod
def splice_data(data_chunks):
if len(data_chunks) not in (1, 2):
return None # one or two data blobs are required (one truncates)
return None # one or two data blobs are required (one truncates)

blob_pass = 1
with tempfile.SpooledTemporaryFile(max_size=0x800000, mode="r+b") as fp:
with tempfile.SpooledTemporaryFile(max_size=0x800000, mode="r+b") as tmp_fp:
for chunk in data_chunks:
length = len(chunk)

if length < 1:
return None # not enough data chunks to work with
return None # not enough data chunks to work with

target = random.randint(0, length-1)

if blob_pass == 1:
fp.write(chunk[:target])
tmp_fp.write(chunk[:target])
elif blob_pass == 2:
fp.write(chunk[target:])
tmp_fp.write(chunk[target:])

blob_pass += 1

fp.seek(0)
return fp.read()

tmp_fp.seek(0)
return tmp_fp.read()


if __name__ == "__main__":
def main():
parser = argparse.ArgumentParser(description="Loki fuzzing library")
parser.add_argument(
"input",
Expand All @@ -203,8 +194,14 @@ def splice_data(data_chunks):

args = parser.parse_args()

f = Loki(aggression=args.aggression, verbose=(not args.quiet))
# setup logging
hnd = logging.StreamHandler()
hnd.setFormatter(logging.Formatter("[%(levelname).1s] %(message)s"))
LOG.addHandler(hnd)
LOG.setLevel(logging.INFO if not args.quiet else logging.ERROR)

loki = Loki(aggression=args.aggression)
try:
f.fuzz_file(args.input, args.count, out_dir=args.output)
loki.fuzz_file(args.input, args.count, out_dir=args.output)
except KeyboardInterrupt:
print("Ctrl+C detected.")
Loading

0 comments on commit 3c24776

Please sign in to comment.