diff --git a/loki/__init__.py b/loki/__init__.py index 09562ce4..0e737b08 100644 --- a/loki/__init__.py +++ b/loki/__init__.py @@ -1,9 +1,12 @@ # coding=utf-8 +""" +Loki fuzzing library +""" # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. -from .loki import Loki +from .loki import Loki # noqa __all__ = ("Loki") __author__ = "Tyson Smith" diff --git a/loki/__main__.py b/loki/__main__.py new file mode 100644 index 00000000..3e238eb5 --- /dev/null +++ b/loki/__main__.py @@ -0,0 +1,11 @@ +# coding=utf-8 +""" +Loki fuzzing library +""" +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +from .loki import main + + +main() diff --git a/loki/loki.py b/loki/loki.py old mode 100755 new mode 100644 index fb6a7800..eb2773bb --- a/loki/loki.py +++ b/loki/loki.py @@ -1,8 +1,8 @@ +# coding: utf-8 """ -loki.py +Loki fuzzing library """ -__author__ = "Tyson Smith" - +from __future__ import print_function import argparse import logging import logging.handlers @@ -13,19 +13,15 @@ import tempfile import time -log = logging.getLogger("Loki") -log.propagate = False -class Loki(object): - def __init__(self, aggression=0.0, verbose=False): - self.aggr = min(max(aggression, 0.0), 1.0) +__author__ = "Tyson Smith" +LOG = logging.getLogger("loki") + - # setup logging - ch = logging.StreamHandler() - ch.setFormatter(logging.Formatter("[%(levelname).1s] %(message)s")) - log.addHandler(ch) - log.setLevel(logging.INFO if verbose else logging.ERROR) +class Loki(object): + def __init__(self, aggression=0.0): + self.aggr = min(max(aggression, 0.0), 1.0) @staticmethod def _fuzz_data(in_data, byte_order=None): @@ -75,17 +71,16 @@ def _fuzz_data(in_data, byte_order=None): return struct.pack(pack_unit, out_data & mask) - - def _fuzz(self, fp): - fp.seek(0, os.SEEK_END) - length = fp.tell() + def _fuzz(self, tgt_fp): + tgt_fp.seek(0, os.SEEK_END) + length = tgt_fp.tell() if length < 1: raise RuntimeError("Zero length file cannot be fuzzed.") # minimum number of max passes should be 1 max_passes = max(int(round(length * self.aggr)), 1) fuzz_passes = random.randint(1, max_passes) - log.debug("%d of a possible %d fuzz passes will be performed", fuzz_passes, max_passes) + LOG.debug("%d of a possible %d fuzz passes will be performed", fuzz_passes, max_passes) max_bytes = min(length, 2) if length < 4 else 4 for _ in range(fuzz_passes): @@ -95,39 +90,37 @@ def _fuzz(self, fp): fuzz_size = 1 target = random.randint(0, length-fuzz_size) - fp.seek(target) - out_data = self._fuzz_data(fp.read(fuzz_size)) - fp.seek(target) - fp.write(out_data) - + tgt_fp.seek(target) + out_data = self._fuzz_data(tgt_fp.read(fuzz_size)) + tgt_fp.seek(target) + tgt_fp.write(out_data) def fuzz_data(self, data): assert isinstance(data, bytes) # open a temp file in memory for fuzzing - with tempfile.SpooledTemporaryFile(max_size=0x800000, mode="r+b") as fp: - fp.write(data) - self._fuzz(fp) - fp.seek(0) - return fp.read() - + with tempfile.SpooledTemporaryFile(max_size=0x800000, mode="r+b") as tmp_fp: + tmp_fp.write(data) + self._fuzz(tmp_fp) + tmp_fp.seek(0) + return tmp_fp.read() def fuzz_file(self, in_file, count, ext=None, out_dir=None): start_time = time.time() - log.info("Starting Loki @ %s", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + LOG.info("Starting Loki @ %s", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) # Analyze input test case - log.info("Target template is %r", os.path.abspath(in_file)) + LOG.info("Target template is %r", os.path.abspath(in_file)) try: length = os.path.getsize(in_file) except OSError: - log.error("%r does not exists!", in_file) + LOG.error("%r does not exists!", in_file) return False if length < 1: - log.error("Input must be at least 1 byte long") + LOG.error("Input must be at least 1 byte long") return False - log.info("Template size in bytes is %d", length) + LOG.info("Template size in bytes is %d", length) if ext is None: ext = os.path.splitext(in_file)[1] @@ -138,52 +131,50 @@ def fuzz_file(self, in_file, count, ext=None, out_dir=None): dir=".") elif not os.path.isdir(out_dir): os.mkdir(out_dir) - log.info("Output directory is %s", os.path.abspath(out_dir)) + LOG.info("Output directory is %s", os.path.abspath(out_dir)) - log.info("Generating %s fuzzed test cases...", count) + LOG.info("Generating %s fuzzed test cases...", count) for i in range(count): out_file = os.path.join(out_dir, "".join(("%06d_fuzzed" % i, ext))) shutil.copy(in_file, out_file) - with open(out_file, "r+b") as fp: - self._fuzz(fp) + with open(out_file, "r+b") as out_fp: + self._fuzz(out_fp) finish_time = time.time()-start_time - log.info("Total run time %gs", finish_time) + LOG.info("Total run time %gs", finish_time) if count > 0: - log.info("About %gs per file", finish_time/count) + LOG.info("About %gs per file", finish_time/count) return True - @staticmethod def splice_data(data_chunks): if len(data_chunks) not in (1, 2): - return None # one or two data blobs are required (one truncates) + return None # one or two data blobs are required (one truncates) blob_pass = 1 - with tempfile.SpooledTemporaryFile(max_size=0x800000, mode="r+b") as fp: + with tempfile.SpooledTemporaryFile(max_size=0x800000, mode="r+b") as tmp_fp: for chunk in data_chunks: length = len(chunk) if length < 1: - return None # not enough data chunks to work with + return None # not enough data chunks to work with target = random.randint(0, length-1) if blob_pass == 1: - fp.write(chunk[:target]) + tmp_fp.write(chunk[:target]) elif blob_pass == 2: - fp.write(chunk[target:]) + tmp_fp.write(chunk[target:]) blob_pass += 1 - fp.seek(0) - return fp.read() - + tmp_fp.seek(0) + return tmp_fp.read() -if __name__ == "__main__": +def main(): parser = argparse.ArgumentParser(description="Loki fuzzing library") parser.add_argument( "input", @@ -203,8 +194,14 @@ def splice_data(data_chunks): args = parser.parse_args() - f = Loki(aggression=args.aggression, verbose=(not args.quiet)) + # setup logging + hnd = logging.StreamHandler() + hnd.setFormatter(logging.Formatter("[%(levelname).1s] %(message)s")) + LOG.addHandler(hnd) + LOG.setLevel(logging.INFO if not args.quiet else logging.ERROR) + + loki = Loki(aggression=args.aggression) try: - f.fuzz_file(args.input, args.count, out_dir=args.output) + loki.fuzz_file(args.input, args.count, out_dir=args.output) except KeyboardInterrupt: print("Ctrl+C detected.") diff --git a/loki/test_loki.py b/loki/test_loki.py index 174dd086..b20e25bd 100644 --- a/loki/test_loki.py +++ b/loki/test_loki.py @@ -1,175 +1,159 @@ +# coding: utf-8 # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. - import os import random import struct -import shutil -import sys -import tempfile -import unittest +import pytest from .loki import Loki -class TestCase(unittest.TestCase): - - if sys.version_info.major == 2: - - def assertRegex(self, *args, **kwds): - return self.assertRegexpMatches(*args, **kwds) - - def assertRaisesRegex(self, *args, **kwds): - return self.assertRaisesRegexp(*args, **kwds) - - -class LokiTests(TestCase): - - def setUp(self): - t_fd, self.tmpfn = tempfile.mkstemp(prefix="loki_") - os.close(t_fd) - self.tmpdir = tempfile.mkdtemp(prefix="loki_") - - - def tearDown(self): - if os.path.isfile(self.tmpfn): - os.unlink(self.tmpfn) - if os.path.isdir(self.tmpdir): - shutil.rmtree(self.tmpdir) - - - def test_01(self): - "test a missing file" - fuzzer = Loki(aggression=0.1, verbose=False) - fuzzer.fuzz_file("nofile.test", 1, out_dir=self.tmpdir) - self.assertFalse(os.listdir(self.tmpdir)) - - - def test_02(self): - "test an empty file" - fuzzer = Loki(aggression=0.1, verbose=False) - fuzzer.fuzz_file(self.tmpfn, 1, out_dir=self.tmpdir) - self.assertFalse(os.listdir(self.tmpdir)) - - - def test_03(self): - "test a single byte file" - in_data = b"A" - with open(self.tmpfn, "wb") as out_fp: - out_fp.write(in_data) - - fuzzer = Loki(aggression=0.1, verbose=False) - for _ in range(100): - fuzzer.fuzz_file(self.tmpfn, 1, out_dir=self.tmpdir) - out_files = os.listdir(self.tmpdir) - self.assertEqual(len(out_files), 1) - with open(os.path.join(self.tmpdir, out_files[0]), "rb") as out_fp: - out_data = out_fp.read() - self.assertEqual(len(out_data), 1) - if out_data != in_data: - break - self.assertNotEqual(out_data, in_data) - - - def test_04(self): - "test a two byte file" - in_data = b"AB" - with open(self.tmpfn, "wb") as out_fp: - out_fp.write(in_data) - - fuzzer = Loki(aggression=0.1, verbose=False) - for _ in range(100): - fuzzer.fuzz_file(self.tmpfn, 1, out_dir=self.tmpdir) - out_files = os.listdir(self.tmpdir) - self.assertEqual(len(out_files), 1) - with open(os.path.join(self.tmpdir, out_files[0]), "rb") as out_fp: - out_data = out_fp.read() - self.assertEqual(len(out_data), 2) - if out_data != in_data: - break - self.assertNotEqual(out_data, in_data) - - - def test_05(self): - "test a multi byte file" - in_size = 100 - in_byte = b"A" - in_data = in_byte * in_size - fuzz_found = False - with open(self.tmpfn, "wb") as out_fp: - out_fp.write(in_data) - - fuzzer = Loki(aggression=0.01, verbose=False) - for _ in range(100): - fuzzer.fuzz_file(self.tmpfn, 1, out_dir=self.tmpdir) - out_files = os.listdir(self.tmpdir) - self.assertEqual(len(out_files), 1) - with open(os.path.join(self.tmpdir, out_files[0]), "rb") as out_fp: - out_fp.seek(0, os.SEEK_END) - self.assertEqual(out_fp.tell(), in_size) - out_fp.seek(0) - for out_byte in out_fp: - if out_byte != in_byte: - fuzz_found = True - break - if fuzz_found: - break - self.assertTrue(fuzz_found) - - - def test_06(self): - "test fuzz_data()" - in_data = b"This is test DATA!" - in_size = len(in_data) - - fuzz_found = False - fuzzer = Loki(aggression=0.1, verbose=False) - for _ in range(100): - out_data = fuzzer.fuzz_data(in_data) - self.assertEqual(len(out_data), in_size) - if in_data not in out_data: - fuzz_found = True - break - self.assertTrue(fuzz_found) - - - def test_07(self): - "test invalid data sizes" - with self.assertRaisesRegex(RuntimeError, r"Unsupported data size:"): - Loki._fuzz_data(b"") # pylint: disable=protected-access - - with self.assertRaisesRegex(RuntimeError, r"Unsupported data size:"): - Loki._fuzz_data(b"123") # pylint: disable=protected-access - - with self.assertRaisesRegex(RuntimeError, r"Unsupported data size:"): - Loki._fuzz_data(b"12345") # pylint: disable=protected-access - - - def test_08(self): - "test endian support" - Loki._fuzz_data(b"1", ">") # pylint: disable=protected-access - Loki._fuzz_data(b"1", "<") # pylint: disable=protected-access - with self.assertRaisesRegex(RuntimeError, r"Unsupported byte order"): - Loki._fuzz_data(b"1", "BAD") # pylint: disable=protected-access - - -class LokiStressTests(TestCase): - def test_01(self): - "test with single byte" - for _ in range(1000): - in_data = struct.pack("B", random.getrandbits(8)) - self.assertEqual(len(Loki._fuzz_data(in_data)), 1) # pylint: disable=protected-access - - - def test_02(self): - "test with two bytes" - in_data = b"\xff\xff" - for _ in range(1000): - self.assertEqual(len(Loki._fuzz_data(in_data)), 2) # pylint: disable=protected-access - - - def test_03(self): - "test with four bytes" - in_data = b"TEST" - for _ in range(1000): - self.assertEqual(len(Loki._fuzz_data(in_data)), 4) # pylint: disable=protected-access + +def test_loki_01(tmp_path): + """test a missing file""" + fuzzer = Loki(aggression=0.1) + fuzzer.fuzz_file("nofile.test", 1, out_dir=str(tmp_path)) + assert not list(tmp_path.iterdir()) + + +def test_loki_02(tmp_path): + """test an empty file""" + tmp_fn = tmp_path / "input" + tmp_fn.touch() + + out_path = tmp_path / "out" + out_path.mkdir() + + fuzzer = Loki(aggression=0.1) + fuzzer.fuzz_file(str(tmp_fn), 1, out_dir=str(out_path)) + assert not list(out_path.iterdir()) + + +def test_loki_03(tmp_path): + """test a single byte file""" + in_data = b"A" + tmp_fn = tmp_path / "input" + tmp_fn.write_bytes(in_data) + + out_path = tmp_path / "out" + out_path.mkdir() + + fuzzer = Loki(aggression=0.1) + for _ in range(100): + fuzzer.fuzz_file(str(tmp_fn), 1, out_dir=str(out_path)) + out_files = list(out_path.iterdir()) + assert len(out_files) == 1 + out_data = out_files[0].read_bytes() + assert len(out_data) == 1 + if out_data != in_data: + break + assert out_data != in_data + + +def test_loki_04(tmp_path): + """test a two byte file""" + in_data = b"AB" + tmp_fn = tmp_path / "input" + tmp_fn.write_bytes(in_data) + + out_path = tmp_path / "out" + out_path.mkdir() + + fuzzer = Loki(aggression=0.1) + for _ in range(100): + fuzzer.fuzz_file(str(tmp_fn), 1, out_dir=str(out_path)) + out_files = list(out_path.iterdir()) + assert len(out_files) == 1 + out_data = out_files[0].read_bytes() + assert len(out_data) == 2 + if out_data != in_data: + break + assert out_data != in_data + + +def test_loki_05(tmp_path): + """test a multi byte file""" + in_size = 100 + in_byte = b"A" + in_data = in_byte * in_size + fuzz_found = False + tmp_fn = tmp_path / "input" + tmp_fn.write_bytes(in_data) + + out_path = tmp_path / "out" + out_path.mkdir() + + fuzzer = Loki(aggression=0.01) + for _ in range(100): + fuzzer.fuzz_file(str(tmp_fn), 1, out_dir=str(out_path)) + out_files = list(out_path.iterdir()) + assert len(out_files) == 1 + with out_files[0].open("rb") as out_fp: + out_fp.seek(0, os.SEEK_END) + assert out_fp.tell() == in_size + out_fp.seek(0) + for out_byte in out_fp: + if out_byte != in_byte: + fuzz_found = True + break + if fuzz_found: + break + assert fuzz_found + + +def test_loki_06(): + """test fuzz_data()""" + in_data = b"This is test DATA!" + in_size = len(in_data) + + fuzz_found = False + fuzzer = Loki(aggression=0.1) + for _ in range(100): + out_data = fuzzer.fuzz_data(in_data) + assert len(out_data) == in_size + if in_data not in out_data: + fuzz_found = True + break + assert fuzz_found + + +def test_loki_07(): + """test invalid data sizes""" + with pytest.raises(RuntimeError, match=r"Unsupported data size:"): + Loki._fuzz_data(b"") # pylint: disable=protected-access + + with pytest.raises(RuntimeError, match=r"Unsupported data size:"): + Loki._fuzz_data(b"123") # pylint: disable=protected-access + + with pytest.raises(RuntimeError, match=r"Unsupported data size:"): + Loki._fuzz_data(b"12345") # pylint: disable=protected-access + + +def test_loki_08(): + """test endian support""" + Loki._fuzz_data(b"1", ">") # pylint: disable=protected-access + Loki._fuzz_data(b"1", "<") # pylint: disable=protected-access + with pytest.raises(RuntimeError, match=r"Unsupported byte order"): + Loki._fuzz_data(b"1", "BAD") # pylint: disable=protected-access + + +def test_loki_stress_01(): + """test with single byte""" + for _ in range(1000): + in_data = struct.pack("B", random.getrandbits(8)) + assert len(Loki._fuzz_data(in_data)) == 1 # pylint: disable=protected-access + + +def test_loki_stress_02(): + """test with two bytes""" + in_data = b"\xff\xff" + for _ in range(1000): + assert len(Loki._fuzz_data(in_data)) == 2 # pylint: disable=protected-access + + +def test_loki_stress_03(): + """test with four bytes""" + in_data = b"TEST" + for _ in range(1000): + assert len(Loki._fuzz_data(in_data)) == 4 # pylint: disable=protected-access