diff options
Diffstat (limited to 'core/tests/overlaytests/testrunner.py')
| -rwxr-xr-x | core/tests/overlaytests/testrunner.py | 679 |
1 files changed, 679 insertions, 0 deletions
diff --git a/core/tests/overlaytests/testrunner.py b/core/tests/overlaytests/testrunner.py new file mode 100755 index 0000000..4f94373 --- /dev/null +++ b/core/tests/overlaytests/testrunner.py @@ -0,0 +1,679 @@ +#!/usr/bin/python +import hashlib +import optparse +import os +import re +import shlex +import subprocess +import sys +import threading +import time + +TASK_COMPILATION = 'compile' +TASK_DISABLE_OVERLAYS = 'disable overlays' +TASK_ENABLE_MULTIPLE_OVERLAYS = 'enable multiple overlays' +TASK_ENABLE_SINGLE_OVERLAY = 'enable single overlay' +TASK_FILE_EXISTS_TEST = 'test (file exists)' +TASK_GREP_IDMAP_TEST = 'test (grep idmap)' +TASK_MD5_TEST = 'test (md5)' +TASK_IDMAP_PATH = 'idmap --path' +TASK_IDMAP_SCAN = 'idmap --scan' +TASK_INSTRUMENTATION = 'instrumentation' +TASK_INSTRUMENTATION_TEST = 'test (instrumentation)' +TASK_MKDIR = 'mkdir' +TASK_PUSH = 'push' +TASK_ROOT = 'root' +TASK_REMOUNT = 'remount' +TASK_RM = 'rm' +TASK_SETUP_IDMAP_PATH = 'setup idmap --path' +TASK_SETUP_IDMAP_SCAN = 'setup idmap --scan' +TASK_START = 'start' +TASK_STOP = 'stop' + +adb = 'adb' + +def _adb_shell(cmd): + argv = shlex.split(adb + " shell '" + cmd + "; echo $?'") + proc = subprocess.Popen(argv, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = proc.communicate() + (stdout, stderr) = (stdout.replace('\r', ''), stderr.replace('\r', '')) + tmp = stdout.rsplit('\n', 2) + if len(tmp) == 2: + stdout == '' + returncode = int(tmp[0]) + else: + stdout = tmp[0] + '\n' + returncode = int(tmp[1]) + return returncode, stdout, stderr + +class VerbosePrinter: + class Ticker(threading.Thread): + def _print(self): + s = '\r' + self.text + '[' + '.' * self.i + ' ' * (4 - self.i) + ']' + sys.stdout.write(s) + sys.stdout.flush() + self.i = (self.i + 1) % 5 + + def __init__(self, cond_var, text): + threading.Thread.__init__(self) + self.text = text + self.setDaemon(True) + self.cond_var = cond_var + self.running = False + self.i = 0 + self._print() + self.running = True + + def run(self): + self.cond_var.acquire() + while True: + self.cond_var.wait(0.25) + running = self.running + if not running: + break + self._print() + self.cond_var.release() + + def stop(self): + self.cond_var.acquire() + self.running = False + self.cond_var.notify_all() + self.cond_var.release() + + def _start_ticker(self): + self.ticker = VerbosePrinter.Ticker(self.cond_var, self.text) + self.ticker.start() + + def _stop_ticker(self): + self.ticker.stop() + self.ticker.join() + self.ticker = None + + def _format_begin(self, type, name): + N = self.width - len(type) - len(' [ ] ') + fmt = '%%s %%-%ds ' % N + return fmt % (type, name) + + def __init__(self, use_color): + self.cond_var = threading.Condition() + self.ticker = None + if use_color: + self.color_RED = '\033[1;31m' + self.color_red = '\033[0;31m' + self.color_reset = '\033[0;37m' + else: + self.color_RED = '' + self.color_red = '' + self.color_reset = '' + + argv = shlex.split('stty size') # get terminal width + proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = proc.communicate() + if proc.returncode == 0: + (h, w) = stdout.split() + self.width = int(w) + else: + self.width = 72 # conservative guesstimate + + def begin(self, type, name): + self.text = self._format_begin(type, name) + sys.stdout.write(self.text + '[ ]') + sys.stdout.flush() + self._start_ticker() + + def end_pass(self, type, name): + self._stop_ticker() + sys.stdout.write('\r' + self.text + '[ OK ]\n') + sys.stdout.flush() + + def end_fail(self, type, name, msg): + self._stop_ticker() + sys.stdout.write('\r' + self.color_RED + self.text + '[FAIL]\n') + sys.stdout.write(self.color_red) + sys.stdout.write(msg) + sys.stdout.write(self.color_reset) + sys.stdout.flush() + +class QuietPrinter: + def begin(self, type, name): + pass + + def end_pass(self, type, name): + sys.stdout.write('PASS ' + type + ' ' + name + '\n') + sys.stdout.flush() + + def end_fail(self, type, name, msg): + sys.stdout.write('FAIL ' + type + ' ' + name + '\n') + sys.stdout.flush() + +class CompilationTask: + def __init__(self, makefile): + self.makefile = makefile + + def get_type(self): + return TASK_COMPILATION + + def get_name(self): + return self.makefile + + def execute(self): + os.putenv('ONE_SHOT_MAKEFILE', os.getcwd() + "/" + self.makefile) + argv = shlex.split('make -C "../../../../../" files') + proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = proc.communicate() + return proc.returncode, stdout, stderr + +class InstrumentationTask: + def __init__(self, instrumentation_class): + self.instrumentation_class = instrumentation_class + + def get_type(self): + return TASK_INSTRUMENTATION + + def get_name(self): + return self.instrumentation_class + + def execute(self): + return _adb_shell('am instrument -r -w -e class %s com.android.overlaytest/android.test.InstrumentationTestRunner' % self.instrumentation_class) + +class PushTask: + def __init__(self, src, dest): + self.src = src + self.dest = dest + + def get_type(self): + return TASK_PUSH + + def get_name(self): + return "%s -> %s" % (self.src, self.dest) + + def execute(self): + src = os.getenv('OUT') + "/" + self.src + argv = shlex.split(adb + ' push %s %s' % (src, self.dest)) + proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = proc.communicate() + return proc.returncode, stdout, stderr + +class MkdirTask: + def __init__(self, path): + self.path = path + + def get_type(self): + return TASK_MKDIR + + def get_name(self): + return self.path + + def execute(self): + return _adb_shell('mkdir -p %s' % self.path) + +class RmTask: + def __init__(self, path): + self.path = path + + def get_type(self): + return TASK_RM + + def get_name(self): + return self.path + + def execute(self): + returncode, stdout, stderr = _adb_shell('ls %s' % self.path) + if returncode != 0 and stdout.endswith(': No such file or directory\n'): + return 0, "", "" + return _adb_shell('rm -r %s' % self.path) + +class IdmapPathTask: + def __init__(self, path_target_apk, path_overlay_apk, path_idmap): + self.path_target_apk = path_target_apk + self.path_overlay_apk = path_overlay_apk + self.path_idmap = path_idmap + + def get_type(self): + return TASK_IDMAP_PATH + + def get_name(self): + return self.path_idmap + + def execute(self): + return _adb_shell('su system idmap --path "%s" "%s" "%s"' % (self.path_target_apk, self.path_overlay_apk, self.path_idmap)) + +class IdmapScanTask: + def __init__(self, overlay_dir, target_pkg_name, target_pkg, idmap_dir, symlink_dir): + self.overlay_dir = overlay_dir + self.target_pkg_name = target_pkg_name + self.target_pkg = target_pkg + self.idmap_dir = idmap_dir + self.symlink_dir = symlink_dir + + def get_type(self): + return TASK_IDMAP_SCAN + + def get_name(self): + return self.target_pkg_name + + def execute(self): + return _adb_shell('su system idmap --scan "%s" "%s" "%s" "%s"' % (self.overlay_dir, self.target_pkg_name, self.target_pkg, self.idmap_dir)) + +class FileExistsTest: + def __init__(self, path): + self.path = path + + def get_type(self): + return TASK_FILE_EXISTS_TEST + + def get_name(self): + return self.path + + def execute(self): + return _adb_shell('ls %s' % self.path) + +class GrepIdmapTest: + def __init__(self, path_idmap, pattern, expected_n): + self.path_idmap = path_idmap + self.pattern = pattern + self.expected_n = expected_n + + def get_type(self): + return TASK_GREP_IDMAP_TEST + + def get_name(self): + return self.pattern + + def execute(self): + returncode, stdout, stderr = _adb_shell('idmap --inspect %s' % self.path_idmap) + if returncode != 0: + return returncode, stdout, stderr + all_matches = re.findall('\s' + self.pattern + '$', stdout, flags=re.MULTILINE) + if len(all_matches) != self.expected_n: + return 1, 'pattern=%s idmap=%s expected=%d found=%d\n' % (self.pattern, self.path_idmap, self.expected_n, len(all_matches)), '' + return 0, "", "" + +class Md5Test: + def __init__(self, path, expected_content): + self.path = path + self.expected_md5 = hashlib.md5(expected_content).hexdigest() + + def get_type(self): + return TASK_MD5_TEST + + def get_name(self): + return self.path + + def execute(self): + returncode, stdout, stderr = _adb_shell('md5 %s' % self.path) + if returncode != 0: + return returncode, stdout, stderr + actual_md5 = stdout.split()[0] + if actual_md5 != self.expected_md5: + return 1, 'expected %s, got %s\n' % (self.expected_md5, actual_md5), '' + return 0, "", "" + +class StartTask: + def get_type(self): + return TASK_START + + def get_name(self): + return "" + + def execute(self): + (returncode, stdout, stderr) = _adb_shell('start') + if returncode != 0: + return returncode, stdout, stderr + + while True: + (returncode, stdout, stderr) = _adb_shell('getprop dev.bootcomplete') + if returncode != 0: + return returncode, stdout, stderr + if stdout.strip() == "1": + break + time.sleep(0.5) + + return 0, "", "" + +class StopTask: + def get_type(self): + return TASK_STOP + + def get_name(self): + return "" + + def execute(self): + (returncode, stdout, stderr) = _adb_shell('stop') + if returncode != 0: + return returncode, stdout, stderr + return _adb_shell('setprop dev.bootcomplete 0') + +class RootTask: + def get_type(self): + return TASK_ROOT + + def get_name(self): + return "" + + def execute(self): + (returncode, stdout, stderr) = _adb_shell('getprop service.adb.root 0') + if returncode != 0: + return returncode, stdout, stderr + if stdout.strip() == '1': # already root + return 0, "", "" + + argv = shlex.split(adb + ' root') + proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = proc.communicate() + if proc.returncode != 0: + return proc.returncode, stdout, stderr + + argv = shlex.split(adb + ' wait-for-device') + proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = proc.communicate() + return proc.returncode, stdout, stderr + +class RemountTask: + def get_type(self): + return TASK_REMOUNT + + def get_name(self): + return "" + + def execute(self): + argv = shlex.split(adb + ' remount') + proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = proc.communicate() + # adb remount returns 0 even if the operation failed, so check stdout + if stdout.startswith('remount failed:'): + return 1, stdout, stderr + return proc.returncode, stdout, stderr + +class CompoundTask: + def __init__(self, type, tasks): + self.type = type + self.tasks = tasks + + def get_type(self): + return self.type + + def get_name(self): + return "" + + def execute(self): + for t in self.tasks: + (returncode, stdout, stderr) = t.execute() + if returncode != 0: + return returncode, stdout, stderr + return 0, "", "" + +def _create_disable_overlays_task(): + tasks = [ + RmTask("/vendor/overlay/framework_a.apk"), + RmTask("/vendor/overlay/framework_b.apk"), + RmTask("/data/resource-cache/vendor@overlay@framework_a.apk@idmap"), + RmTask("/data/resource-cache/vendor@overlay@framework_b.apk@idmap"), + RmTask("/vendor/overlay/app_a.apk"), + RmTask("/vendor/overlay/app_b.apk"), + RmTask("/data/resource-cache/vendor@overlay@app_a.apk@idmap"), + RmTask("/data/resource-cache/vendor@overlay@app_b.apk@idmap"), + ] + return CompoundTask(TASK_DISABLE_OVERLAYS, tasks) + +def _create_enable_single_overlay_task(): + tasks = [ + _create_disable_overlays_task(), + MkdirTask('/system/vendor'), + MkdirTask('/vendor/overlay'), + PushTask('/data/app/com.android.overlaytest.overlay.apk', '/vendor/overlay/framework_a.apk'), + PushTask('/data/app/com.android.overlaytest.first_app_overlay.apk', '/vendor/overlay/app_a.apk'), + ] + return CompoundTask(TASK_ENABLE_SINGLE_OVERLAY, tasks) + +def _create_enable_multiple_overlays_task(): + tasks = [ + _create_disable_overlays_task(), + MkdirTask('/system/vendor'), + MkdirTask('/vendor/overlay'), + + PushTask('/data/app/com.android.overlaytest.overlay.apk', '/vendor/overlay/framework_b.apk'), + PushTask('/data/app/com.android.overlaytest.first_app_overlay.apk', '/vendor/overlay/app_a.apk'), + PushTask('/data/app/com.android.overlaytest.second_app_overlay.apk', '/vendor/overlay/app_b.apk'), + ] + return CompoundTask(TASK_ENABLE_MULTIPLE_OVERLAYS, tasks) + +def _create_setup_idmap_path_task(idmaps, symlinks): + tasks = [ + _create_enable_single_overlay_task(), + RmTask(symlinks), + RmTask(idmaps), + MkdirTask(idmaps), + MkdirTask(symlinks), + ] + return CompoundTask(TASK_SETUP_IDMAP_PATH, tasks) + +def _create_setup_idmap_scan_task(idmaps, symlinks): + tasks = [ + _create_enable_single_overlay_task(), + RmTask(symlinks), + RmTask(idmaps), + MkdirTask(idmaps), + MkdirTask(symlinks), + _create_enable_multiple_overlays_task(), + ] + return CompoundTask(TASK_SETUP_IDMAP_SCAN, tasks) + +def _handle_instrumentation_task_output(stdout, printer): + regex_status_code = re.compile(r'^INSTRUMENTATION_STATUS_CODE: -?(\d+)') + regex_name = re.compile(r'^INSTRUMENTATION_STATUS: test=(.*)') + regex_begin_stack = re.compile(r'^INSTRUMENTATION_STATUS: stack=(.*)') + regex_end_stack = re.compile(r'^$') + + failed_tests = 0 + current_test = None + current_stack = [] + mode_stack = False + for line in stdout.split("\n"): + line = line.rstrip() # strip \r from adb output + m = regex_status_code.match(line) + if m: + c = int(m.group(1)) + if c == 1: + printer.begin(TASK_INSTRUMENTATION_TEST, current_test) + elif c == 0: + printer.end_pass(TASK_INSTRUMENTATION_TEST, current_test) + else: + failed_tests += 1 + current_stack.append("\n") + msg = "\n".join(current_stack) + printer.end_fail(TASK_INSTRUMENTATION_TEST, current_test, msg.rstrip() + '\n') + continue + + m = regex_name.match(line) + if m: + current_test = m.group(1) + continue + + m = regex_begin_stack.match(line) + if m: + mode_stack = True + current_stack = [] + current_stack.append(" " + m.group(1)) + continue + + m = regex_end_stack.match(line) + if m: + mode_stack = False + continue + + if mode_stack: + current_stack.append(" " + line.strip()) + + return failed_tests + +def _set_adb_device(option, opt, value, parser): + global adb + if opt == '-d' or opt == '--device': + adb = 'adb -d' + if opt == '-e' or opt == '--emulator': + adb = 'adb -e' + if opt == '-s' or opt == '--serial': + adb = 'adb -s ' + value + +def _create_opt_parser(): + parser = optparse.OptionParser() + parser.add_option('-d', '--device', action='callback', callback=_set_adb_device, + help='pass -d to adb') + parser.add_option('-e', '--emulator', action='callback', callback=_set_adb_device, + help='pass -e to adb') + parser.add_option('-s', '--serial', type="str", action='callback', callback=_set_adb_device, + help='pass -s <serical> to adb') + parser.add_option('-C', '--no-color', action='store_false', + dest='use_color', default=True, + help='disable color escape sequences in output') + parser.add_option('-q', '--quiet', action='store_true', + dest='quiet_mode', default=False, + help='quiet mode, output only results') + parser.add_option('-b', '--no-build', action='store_false', + dest='do_build', default=True, + help='do not rebuild test projects') + parser.add_option('-k', '--continue', action='store_true', + dest='do_continue', default=False, + help='do not rebuild test projects') + parser.add_option('-i', '--test-idmap', action='store_true', + dest='test_idmap', default=False, + help='run tests for single overlay') + parser.add_option('-0', '--test-no-overlay', action='store_true', + dest='test_no_overlay', default=False, + help='run tests without any overlay') + parser.add_option('-1', '--test-single-overlay', action='store_true', + dest='test_single_overlay', default=False, + help='run tests for single overlay') + parser.add_option('-2', '--test-multiple-overlays', action='store_true', + dest='test_multiple_overlays', default=False, + help='run tests for multiple overlays') + return parser + +if __name__ == '__main__': + opt_parser = _create_opt_parser() + opts, args = opt_parser.parse_args(sys.argv[1:]) + if not opts.test_idmap and not opts.test_no_overlay and not opts.test_single_overlay and not opts.test_multiple_overlays: + opts.test_idmap = True + opts.test_no_overlay = True + opts.test_single_overlay = True + opts.test_multiple_overlays = True + if len(args) > 0: + opt_parser.error("unexpected arguments: %s" % " ".join(args)) + # will never reach this: opt_parser.error will call sys.exit + + if opts.quiet_mode: + printer = QuietPrinter() + else: + printer = VerbosePrinter(opts.use_color) + tasks = [] + + # must be in the same directory as this script for compilation tasks to work + script = sys.argv[0] + dirname = os.path.dirname(script) + wd = os.path.realpath(dirname) + os.chdir(wd) + + # build test cases + if opts.do_build: + tasks.append(CompilationTask('OverlayTest/Android.mk')) + tasks.append(CompilationTask('OverlayTestOverlay/Android.mk')) + tasks.append(CompilationTask('OverlayAppFirst/Android.mk')) + tasks.append(CompilationTask('OverlayAppSecond/Android.mk')) + + # remount filesystem, install test project + tasks.append(RootTask()) + tasks.append(RemountTask()) + tasks.append(PushTask('/system/app/OverlayTest.apk', '/system/app/OverlayTest.apk')) + + # test idmap + if opts.test_idmap: + idmaps='/data/local/tmp/idmaps' + symlinks='/data/local/tmp/symlinks' + + # idmap --path + tasks.append(StopTask()) + tasks.append(_create_setup_idmap_path_task(idmaps, symlinks)) + tasks.append(StartTask()) + tasks.append(IdmapPathTask('/vendor/overlay/framework_a.apk', '/system/framework/framework-res.apk', idmaps + '/a.idmap')) + tasks.append(FileExistsTest(idmaps + '/a.idmap')) + tasks.append(GrepIdmapTest(idmaps + '/a.idmap', 'bool/config_annoy_dianne', 1)) + + # idmap --scan + idmap = idmaps + '/vendor@overlay@framework_b.apk@idmap' + tasks.append(StopTask()) + tasks.append(_create_setup_idmap_scan_task(idmaps, symlinks)) + tasks.append(StartTask()) + tasks.append(IdmapScanTask('/vendor/overlay', 'android', '/system/framework/framework-res.apk', idmaps, symlinks)) + tasks.append(FileExistsTest(idmap)) + tasks.append(GrepIdmapTest(idmap, 'bool/config_annoy_dianne', 1)) + + # overlays.list + overlays_list_path = '/data/resource-cache/overlays.list' + expected_content = '''\ +/vendor/overlay/framework_b.apk /data/resource-cache/vendor@overlay@framework_b.apk@idmap +''' + tasks.append(FileExistsTest(overlays_list_path)) + tasks.append(Md5Test(overlays_list_path, expected_content)) + + # idmap cleanup + tasks.append(RmTask(symlinks)) + tasks.append(RmTask(idmaps)) + + # test no overlay + if opts.test_no_overlay: + tasks.append(StopTask()) + tasks.append(_create_disable_overlays_task()) + tasks.append(StartTask()) + tasks.append(InstrumentationTask('com.android.overlaytest.WithoutOverlayTest')) + + # test single overlay + if opts.test_single_overlay: + tasks.append(StopTask()) + tasks.append(_create_enable_single_overlay_task()) + tasks.append(StartTask()) + tasks.append(InstrumentationTask('com.android.overlaytest.WithOverlayTest')) + + # test multiple overlays + if opts.test_multiple_overlays: + tasks.append(StopTask()) + tasks.append(_create_enable_multiple_overlays_task()) + tasks.append(StartTask()) + tasks.append(InstrumentationTask('com.android.overlaytest.WithMultipleOverlaysTest')) + + ignored_errors = 0 + for t in tasks: + type = t.get_type() + name = t.get_name() + if type == TASK_INSTRUMENTATION: + # InstrumentationTask will run several tests, but we want it + # to appear as if each test was run individually. Calling + # "am instrument" with a single test method is prohibitively + # expensive, so let's instead post-process the output to + # emulate individual calls. + retcode, stdout, stderr = t.execute() + if retcode != 0: + printer.begin(TASK_INSTRUMENTATION, name) + printer.end_fail(TASK_INSTRUMENTATION, name, stderr) + sys.exit(retcode) + retcode = _handle_instrumentation_task_output(stdout, printer) + if retcode != 0: + if not opts.do_continue: + sys.exit(retcode) + else: + ignored_errors += retcode + else: + printer.begin(type, name) + retcode, stdout, stderr = t.execute() + if retcode == 0: + printer.end_pass(type, name) + if retcode != 0: + if len(stderr) == 0: + # hope for output from stdout instead (true for eg adb shell rm) + stderr = stdout + printer.end_fail(type, name, stderr) + if not opts.do_continue: + sys.exit(retcode) + else: + ignored_errors += retcode + sys.exit(ignored_errors) |
