#!/usr/bin/env python # Tool for running fuzz tests # # Copyright (C) 2014 Maria Kustova # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import sys import os import signal import subprocess import random import shutil from itertools import count import time import getopt import StringIO import resource try: import json except ImportError: try: import simplejson as json except ImportError: print >>sys.stderr, \ "Warning: Module for JSON processing is not found.\n" \ "'--config' and '--command' options are not supported." # Backing file sizes in MB MAX_BACKING_FILE_SIZE = 10 MIN_BACKING_FILE_SIZE = 1 def multilog(msg, *output): """ Write an object to all of specified file descriptors.""" for fd in output: fd.write(msg) fd.flush() def str_signal(sig): """ Convert a numeric value of a system signal to the string one defined by the current operational system. """ for k, v in signal.__dict__.items(): if v == sig: return k def run_app(fd, q_args): """Start an application with specified arguments and return its exit code or kill signal depending on the result of execution. """ class Alarm(Exception): """Exception for signal.alarm events.""" pass def handler(*args): """Notify that an alarm event occurred.""" raise Alarm signal.signal(signal.SIGALRM, handler) signal.alarm(600) term_signal = signal.SIGKILL devnull = open('/dev/null', 'r+') process = subprocess.Popen(q_args, stdin=devnull, stdout=subprocess.PIPE, stderr=subprocess.PIPE) try: out, err = process.communicate() signal.alarm(0) fd.write(out) fd.write(err) fd.flush() return process.returncode except Alarm: os.kill(process.pid, term_signal) fd.write('The command was terminated by timeout.\n') fd.flush() return -term_signal class TestException(Exception): """Exception for errors risen by TestEnv objects.""" pass class TestEnv(object): """Test object. The class sets up test environment, generates backing and test images and executes application under tests with specified arguments and a test image provided. All logs are collected. The summary log will contain short descriptions and statuses of tests in a run. The test log will include application (e.g. 'qemu-img') logs besides info sent to the summary log. """ def __init__(self, test_id, seed, work_dir, run_log, cleanup=True, log_all=False): """Set test environment in a specified work directory. Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and 'QEMU_IO' environment variables. """ if seed is not None: self.seed = seed else: self.seed = str(random.randint(0, sys.maxint)) random.seed(self.seed) self.init_path = os.getcwd() self.work_dir = work_dir self.current_dir = os.path.join(work_dir, 'test-' + test_id) self.qemu_img = \ os.environ.get('QEMU_IMG', 'qemu-img').strip().split(' ') self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ') self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'], ['qemu-img', 'info', '-f', 'qcow2', '$test_img'], ['qemu-io', '$test_img', '-c', 'read $off $len'], ['qemu-io', '$test_img', '-c', 'write $off $len'], ['qemu-io', '$test_img', '-c', 'aio_read $off $len'], ['qemu-io', '$test_img', '-c', 'aio_write $off $len'], ['qemu-io', '$test_img', '-c', 'flush'], ['qemu-io', '$test_img', '-c', 'discard $off $len'], ['qemu-io', '$test_img', '-c', 'truncate $off']] for fmt in ['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']: self.commands.append( ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt, '$test_img', 'converted_image.' + fmt]) try: os.makedirs(self.current_dir) except OSError as e: print >>sys.stderr, \ "Error: The working directory '%s' cannot be used. Reason: %s"\ % (self.work_dir, e[1]) raise TestException self.log = open(os.path.join(self.current_dir, "test.log"), "w") self.parent_log = open(run_log, "a") self.failed = False self.cleanup = cleanup self.log_all = log_all def _create_backing_file(self): """Create a backing file in the current directory. Return a tuple of a backing file name and format. Format of a backing file is randomly chosen from all formats supported by 'qemu-img create'. """ # All formats supported by the 'qemu-img create' command. backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'qcow2', 'file', 'qed', 'vpc']) backing_file_name = 'backing_img.' + backing_file_fmt backing_file_size = random.randint(MIN_BACKING_FILE_SIZE, MAX_BACKING_FILE_SIZE) * (1 << 20) cmd = self.qemu_img + ['create', '-f', backing_file_fmt, backing_file_name, str(backing_file_size)] temp_log = StringIO.StringIO() retcode = run_app(temp_log, cmd) if retcode == 0: temp_log.close() return (backing_file_name, backing_file_fmt) else: multilog("Warning: The %s backing file was not created.\n\n" % backing_file_fmt, sys.stderr, self.log, self.parent_log) self.log.write("Log for the failure:\n" + temp_log.getvalue() + '\n\n') temp_log.close() return (None, None) def execute(self, input_commands=None, fuzz_config=None): """ Execute a test. The method creates backing and test images, runs test app and analyzes its exit status. If the application was killed by a signal, the test is marked as failed. """ if input_commands is None: commands = self.commands else: commands = input_commands os.chdir(self.current_dir) backing_file_name, backing_file_fmt = self._create_backing_file() img_size = image_generator.create_image( 'test.img', backing_file_name, backing_file_fmt, fuzz_config) for item in commands: shutil.copy('test.img', 'copy.img') # 'off' and 'len' are multiple of the sector size sector_size = 512 start = random.randrange(0, img_size + 1, sector_size) end = random.randrange(start, img_size + 1, sector_size) if item[0] == 'qemu-img': current_cmd = list(self.qemu_img) elif item[0] == 'qemu-io': current_cmd = list(self.qemu_io) else: multilog("Warning: test command '%s' is not defined.\n" % item[0], sys.stderr, self.log, self.parent_log) continue # Replace all placeholders with their real values for v in item[1:]: c = (v .replace('$test_img', 'copy.img') .replace('$off', str(start)) .replace('$len', str(end - start))) current_cmd.append(c) # Log string with the test header test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \ "Backing file: %s\n" \ % (self.seed, " ".join(current_cmd), self.current_dir, backing_file_name) temp_log = StringIO.StringIO() try: retcode = run_app(temp_log, current_cmd) except OSError as e: multilog("%sError: Start of '%s' failed. Reason: %s\n\n" % (test_summary, os.path.basename(current_cmd[0]), e[1]), sys.stderr, self.log, self.parent_log) raise TestException if retcode < 0: self.log.write(temp_log.getvalue()) multilog("%sFAIL: Test terminated by signal %s\n\n" % (test_summary, str_signal(-retcode)), sys.stderr, self.log, self.parent_log) self.failed = True else: if self.log_all: self.log.write(temp_log.getvalue()) multilog("%sPASS: Application exited with the code " \ "'%d'\n\n" % (test_summary, retcode), sys.stdout, self.log, self.parent_log) temp_log.close() os.remove('copy.img') def finish(self): """Restore the test environment after a test execution.""" self.log.close() self.parent_log.close() os.chdir(self.init_path) if self.cleanup and not self.failed: shutil.rmtree(self.current_dir) if __name__ == '__main__': def usage(): print """ Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR Set up test environment in TEST_DIR and run a test in it. A module for test image generation should be specified via IMG_GENERATOR. Example: runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2 Optional arguments: -h, --help display this help and exit -d, --duration=NUMBER finish tests after NUMBER of seconds -c, --command=JSON run tests for all commands specified in the JSON array -s, --seed=STRING seed for a test image generation, by default will be generated randomly --config=JSON take fuzzer configuration from the JSON array -k, --keep_passed don't remove folders of passed tests -v, --verbose log information about passed tests JSON: '--command' accepts a JSON array of commands. Each command presents an application under test with all its parameters as a list of strings, e.g. ["qemu-io", "$test_img", "-c", "write $off $len"]. Supported application aliases: 'qemu-img' and 'qemu-io'. Supported argument aliases: $test_img for the fuzzed image, $off for an offset, $len for length. Values for $off and $len will be generated based on the virtual disk size of the fuzzed image. Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and 'QEMU_IO' environment variables. '--config' accepts a JSON array of fields to be fuzzed, e.g. '[["header"], ["header", "version"]]'. Each of the list elements can consist of a complex image element only as ["header"] or ["feature_name_table"] or an exact field as ["header", "version"]. In the first case random portion of the element fields will be fuzzed, in the second one the specified field will be fuzzed always. If '--config' argument is specified, fields not listed in the configuration array will not be fuzzed. """ def run_test(test_id, seed, work_dir, run_log, cleanup, log_all, command, fuzz_config): """Setup environment for one test and execute this test.""" try: test = TestEnv(test_id, seed, work_dir, run_log, cleanup, log_all) except TestException: sys.exit(1) # Python 2.4 doesn't support 'finally' and 'except' in the same 'try' # block try: try: test.execute(command, fuzz_config) except TestException: sys.exit(1) finally: test.finish() def should_continue(duration, start_time): """Return True if a new test can be started and False otherwise.""" current_time = int(time.time()) return (duration is None) or (current_time - start_time < duration) try: opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kvd:', ['command=', 'help', 'seed=', 'config=', 'keep_passed', 'verbose', 'duration=']) except getopt.error as e: print >>sys.stderr, \ "Error: %s\n\nTry 'runner.py --help' for more information" % e sys.exit(1) command = None cleanup = True log_all = False seed = None config = None duration = None for opt, arg in opts: if opt in ('-h', '--help'): usage() sys.exit() elif opt in ('-c', '--command'): try: command = json.loads(arg) except (TypeError, ValueError, NameError) as e: print >>sys.stderr, \ "Error: JSON array of test commands cannot be loaded.\n" \ "Reason: %s" % e sys.exit(1) elif opt in ('-k', '--keep_passed'): cleanup = False elif opt in ('-v', '--verbose'): log_all = True elif opt in ('-s', '--seed'): seed = arg elif opt in ('-d', '--duration'): duration = int(arg) elif opt == '--config': try: config = json.loads(arg) except (TypeError, ValueError, NameError) as e: print >>sys.stderr, \ "Error: JSON array with the fuzzer configuration cannot" \ " be loaded\nReason: %s" % e sys.exit(1) if not len(args) == 2: print >>sys.stderr, \ "Expected two parameters\nTry 'runner.py --help'" \ " for more information." sys.exit(1) work_dir = os.path.realpath(args[0]) # run_log is created in 'main', because multiple tests are expected to # log in it run_log = os.path.join(work_dir, 'run.log') # Add the path to the image generator module to sys.path sys.path.append(os.path.realpath(os.path.dirname(args[1]))) # Remove a script extension from image generator module if any generator_name = os.path.splitext(os.path.basename(args[1]))[0] try: image_generator = __import__(generator_name) except ImportError as e: print >>sys.stderr, \ "Error: The image generator '%s' cannot be imported.\n" \ "Reason: %s" % (generator_name, e) sys.exit(1) # Enable core dumps resource.setrlimit(resource.RLIMIT_CORE, (-1, -1)) # If a seed is specified, only one test will be executed. # Otherwise runner will terminate after a keyboard interruption start_time = int(time.time()) test_id = count(1) while should_continue(duration, start_time): try: run_test(str(test_id.next()), seed, work_dir, run_log, cleanup, log_all, command, config) except (KeyboardInterrupt, SystemExit): sys.exit(1) if seed is not None: break