#!/usr/bin/python import hashlib import optparse import os import re import shlex import subprocess import sys import threading import time TASK_COMPILATION = 'compile' TASK_DISABLE_OVERLAYS = 'disable overlays' TASK_ENABLE_MULTIPLE_OVERLAYS = 'enable multiple overlays' TASK_ENABLE_SINGLE_OVERLAY = 'enable single overlay' TASK_FILE_EXISTS_TEST = 'test (file exists)' TASK_GREP_IDMAP_TEST = 'test (grep idmap)' TASK_MD5_TEST = 'test (md5)' TASK_IDMAP_PATH = 'idmap --path' TASK_IDMAP_SCAN = 'idmap --scan' TASK_INSTRUMENTATION = 'instrumentation' TASK_INSTRUMENTATION_TEST = 'test (instrumentation)' TASK_MKDIR = 'mkdir' TASK_PUSH = 'push' TASK_ROOT = 'root' TASK_REMOUNT = 'remount' TASK_RM = 'rm' TASK_SETUP_IDMAP_PATH = 'setup idmap --path' TASK_SETUP_IDMAP_SCAN = 'setup idmap --scan' TASK_START = 'start' TASK_STOP = 'stop' adb = 'adb' def _adb_shell(cmd): argv = shlex.split(adb + " shell '" + cmd + "; echo $?'") proc = subprocess.Popen(argv, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = proc.communicate() (stdout, stderr) = (stdout.replace('\r', ''), stderr.replace('\r', '')) tmp = stdout.rsplit('\n', 2) if len(tmp) == 2: stdout == '' returncode = int(tmp[0]) else: stdout = tmp[0] + '\n' returncode = int(tmp[1]) return returncode, stdout, stderr class VerbosePrinter: class Ticker(threading.Thread): def _print(self): s = '\r' + self.text + '[' + '.' * self.i + ' ' * (4 - self.i) + ']' sys.stdout.write(s) sys.stdout.flush() self.i = (self.i + 1) % 5 def __init__(self, cond_var, text): threading.Thread.__init__(self) self.text = text self.setDaemon(True) self.cond_var = cond_var self.running = False self.i = 0 self._print() self.running = True def run(self): self.cond_var.acquire() while True: self.cond_var.wait(0.25) running = self.running if not running: break self._print() self.cond_var.release() def stop(self): self.cond_var.acquire() self.running = False self.cond_var.notify_all() self.cond_var.release() def _start_ticker(self): self.ticker = VerbosePrinter.Ticker(self.cond_var, self.text) self.ticker.start() def _stop_ticker(self): self.ticker.stop() self.ticker.join() self.ticker = None def _format_begin(self, type, name): N = self.width - len(type) - len(' [ ] ') fmt = '%%s %%-%ds ' % N return fmt % (type, name) def __init__(self, use_color): self.cond_var = threading.Condition() self.ticker = None if use_color: self.color_RED = '\033[1;31m' self.color_red = '\033[0;31m' self.color_reset = '\033[0;37m' else: self.color_RED = '' self.color_red = '' self.color_reset = '' argv = shlex.split('stty size') # get terminal width proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = proc.communicate() if proc.returncode == 0: (h, w) = stdout.split() self.width = int(w) else: self.width = 72 # conservative guesstimate def begin(self, type, name): self.text = self._format_begin(type, name) sys.stdout.write(self.text + '[ ]') sys.stdout.flush() self._start_ticker() def end_pass(self, type, name): self._stop_ticker() sys.stdout.write('\r' + self.text + '[ OK ]\n') sys.stdout.flush() def end_fail(self, type, name, msg): self._stop_ticker() sys.stdout.write('\r' + self.color_RED + self.text + '[FAIL]\n') sys.stdout.write(self.color_red) sys.stdout.write(msg) sys.stdout.write(self.color_reset) sys.stdout.flush() class QuietPrinter: def begin(self, type, name): pass def end_pass(self, type, name): sys.stdout.write('PASS ' + type + ' ' + name + '\n') sys.stdout.flush() def end_fail(self, type, name, msg): sys.stdout.write('FAIL ' + type + ' ' + name + '\n') sys.stdout.flush() class CompilationTask: def __init__(self, makefile): self.makefile = makefile def get_type(self): return TASK_COMPILATION def get_name(self): return self.makefile def execute(self): os.putenv('ONE_SHOT_MAKEFILE', os.getcwd() + "/" + self.makefile) argv = shlex.split('make -C "../../../../../" files') proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = proc.communicate() return proc.returncode, stdout, stderr class InstrumentationTask: def __init__(self, instrumentation_class): self.instrumentation_class = instrumentation_class def get_type(self): return TASK_INSTRUMENTATION def get_name(self): return self.instrumentation_class def execute(self): return _adb_shell('am instrument -r -w -e class %s com.android.overlaytest/android.test.InstrumentationTestRunner' % self.instrumentation_class) class PushTask: def __init__(self, src, dest): self.src = src self.dest = dest def get_type(self): return TASK_PUSH def get_name(self): return "%s -> %s" % (self.src, self.dest) def execute(self): src = os.getenv('OUT') + "/" + self.src argv = shlex.split(adb + ' push %s %s' % (src, self.dest)) proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = proc.communicate() return proc.returncode, stdout, stderr class MkdirTask: def __init__(self, path): self.path = path def get_type(self): return TASK_MKDIR def get_name(self): return self.path def execute(self): return _adb_shell('mkdir -p %s' % self.path) class RmTask: def __init__(self, path): self.path = path def get_type(self): return TASK_RM def get_name(self): return self.path def execute(self): returncode, stdout, stderr = _adb_shell('ls %s' % self.path) if returncode != 0 and stdout.endswith(': No such file or directory\n'): return 0, "", "" return _adb_shell('rm -r %s' % self.path) class IdmapPathTask: def __init__(self, path_target_apk, path_overlay_apk, path_idmap): self.path_target_apk = path_target_apk self.path_overlay_apk = path_overlay_apk self.path_idmap = path_idmap def get_type(self): return TASK_IDMAP_PATH def get_name(self): return self.path_idmap def execute(self): return _adb_shell('su system idmap --path "%s" "%s" "%s"' % (self.path_target_apk, self.path_overlay_apk, self.path_idmap)) class IdmapScanTask: def __init__(self, overlay_dir, target_pkg_name, target_pkg, idmap_dir, symlink_dir): self.overlay_dir = overlay_dir self.target_pkg_name = target_pkg_name self.target_pkg = target_pkg self.idmap_dir = idmap_dir self.symlink_dir = symlink_dir def get_type(self): return TASK_IDMAP_SCAN def get_name(self): return self.target_pkg_name def execute(self): return _adb_shell('su system idmap --scan "%s" "%s" "%s" "%s"' % (self.overlay_dir, self.target_pkg_name, self.target_pkg, self.idmap_dir)) class FileExistsTest: def __init__(self, path): self.path = path def get_type(self): return TASK_FILE_EXISTS_TEST def get_name(self): return self.path def execute(self): return _adb_shell('ls %s' % self.path) class GrepIdmapTest: def __init__(self, path_idmap, pattern, expected_n): self.path_idmap = path_idmap self.pattern = pattern self.expected_n = expected_n def get_type(self): return TASK_GREP_IDMAP_TEST def get_name(self): return self.pattern def execute(self): returncode, stdout, stderr = _adb_shell('idmap --inspect %s' % self.path_idmap) if returncode != 0: return returncode, stdout, stderr all_matches = re.findall('\s' + self.pattern + '$', stdout, flags=re.MULTILINE) if len(all_matches) != self.expected_n: return 1, 'pattern=%s idmap=%s expected=%d found=%d\n' % (self.pattern, self.path_idmap, self.expected_n, len(all_matches)), '' return 0, "", "" class Md5Test: def __init__(self, path, expected_content): self.path = path self.expected_md5 = hashlib.md5(expected_content).hexdigest() def get_type(self): return TASK_MD5_TEST def get_name(self): return self.path def execute(self): returncode, stdout, stderr = _adb_shell('md5 %s' % self.path) if returncode != 0: return returncode, stdout, stderr actual_md5 = stdout.split()[0] if actual_md5 != self.expected_md5: return 1, 'expected %s, got %s\n' % (self.expected_md5, actual_md5), '' return 0, "", "" class StartTask: def get_type(self): return TASK_START def get_name(self): return "" def execute(self): (returncode, stdout, stderr) = _adb_shell('start') if returncode != 0: return returncode, stdout, stderr while True: (returncode, stdout, stderr) = _adb_shell('getprop dev.bootcomplete') if returncode != 0: return returncode, stdout, stderr if stdout.strip() == "1": break time.sleep(0.5) return 0, "", "" class StopTask: def get_type(self): return TASK_STOP def get_name(self): return "" def execute(self): (returncode, stdout, stderr) = _adb_shell('stop') if returncode != 0: return returncode, stdout, stderr return _adb_shell('setprop dev.bootcomplete 0') class RootTask: def get_type(self): return TASK_ROOT def get_name(self): return "" def execute(self): (returncode, stdout, stderr) = _adb_shell('getprop service.adb.root 0') if returncode != 0: return returncode, stdout, stderr if stdout.strip() == '1': # already root return 0, "", "" argv = shlex.split(adb + ' root') proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = proc.communicate() if proc.returncode != 0: return proc.returncode, stdout, stderr argv = shlex.split(adb + ' wait-for-device') proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = proc.communicate() return proc.returncode, stdout, stderr class RemountTask: def get_type(self): return TASK_REMOUNT def get_name(self): return "" def execute(self): argv = shlex.split(adb + ' remount') proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout, stderr) = proc.communicate() # adb remount returns 0 even if the operation failed, so check stdout if stdout.startswith('remount failed:'): return 1, stdout, stderr return proc.returncode, stdout, stderr class CompoundTask: def __init__(self, type, tasks): self.type = type self.tasks = tasks def get_type(self): return self.type def get_name(self): return "" def execute(self): for t in self.tasks: (returncode, stdout, stderr) = t.execute() if returncode != 0: return returncode, stdout, stderr return 0, "", "" def _create_disable_overlays_task(): tasks = [ RmTask("/vendor/overlay/framework_a.apk"), RmTask("/vendor/overlay/framework_b.apk"), RmTask("/data/resource-cache/vendor@overlay@framework_a.apk@idmap"), RmTask("/data/resource-cache/vendor@overlay@framework_b.apk@idmap"), RmTask("/vendor/overlay/app_a.apk"), RmTask("/vendor/overlay/app_b.apk"), RmTask("/data/resource-cache/vendor@overlay@app_a.apk@idmap"), RmTask("/data/resource-cache/vendor@overlay@app_b.apk@idmap"), ] return CompoundTask(TASK_DISABLE_OVERLAYS, tasks) def _create_enable_single_overlay_task(): tasks = [ _create_disable_overlays_task(), MkdirTask('/system/vendor'), MkdirTask('/vendor/overlay'), PushTask('/data/app/com.android.overlaytest.overlay/com.android.overlaytest.overlay.apk', '/vendor/overlay/framework_a.apk'), PushTask('/data/app/com.android.overlaytest.first_app_overlay/com.android.overlaytest.first_app_overlay.apk', '/vendor/overlay/app_a.apk'), ] return CompoundTask(TASK_ENABLE_SINGLE_OVERLAY, tasks) def _create_enable_multiple_overlays_task(): tasks = [ _create_disable_overlays_task(), MkdirTask('/system/vendor'), MkdirTask('/vendor/overlay'), PushTask('/data/app/com.android.overlaytest.overlay/com.android.overlaytest.overlay.apk', '/vendor/overlay/framework_b.apk'), PushTask('/data/app/com.android.overlaytest.first_app_overlay/com.android.overlaytest.first_app_overlay.apk', '/vendor/overlay/app_a.apk'), PushTask('/data/app/com.android.overlaytest.second_app_overlay/com.android.overlaytest.second_app_overlay.apk', '/vendor/overlay/app_b.apk'), ] return CompoundTask(TASK_ENABLE_MULTIPLE_OVERLAYS, tasks) def _create_setup_idmap_path_task(idmaps, symlinks): tasks = [ _create_enable_single_overlay_task(), RmTask(symlinks), RmTask(idmaps), MkdirTask(idmaps), MkdirTask(symlinks), ] return CompoundTask(TASK_SETUP_IDMAP_PATH, tasks) def _create_setup_idmap_scan_task(idmaps, symlinks): tasks = [ _create_enable_single_overlay_task(), RmTask(symlinks), RmTask(idmaps), MkdirTask(idmaps), MkdirTask(symlinks), _create_enable_multiple_overlays_task(), ] return CompoundTask(TASK_SETUP_IDMAP_SCAN, tasks) def _handle_instrumentation_task_output(stdout, printer): regex_status_code = re.compile(r'^INSTRUMENTATION_STATUS_CODE: -?(\d+)') regex_name = re.compile(r'^INSTRUMENTATION_STATUS: test=(.*)') regex_begin_stack = re.compile(r'^INSTRUMENTATION_STATUS: stack=(.*)') regex_end_stack = re.compile(r'^$') failed_tests = 0 current_test = None current_stack = [] mode_stack = False for line in stdout.split("\n"): line = line.rstrip() # strip \r from adb output m = regex_status_code.match(line) if m: c = int(m.group(1)) if c == 1: printer.begin(TASK_INSTRUMENTATION_TEST, current_test) elif c == 0: printer.end_pass(TASK_INSTRUMENTATION_TEST, current_test) else: failed_tests += 1 current_stack.append("\n") msg = "\n".join(current_stack) printer.end_fail(TASK_INSTRUMENTATION_TEST, current_test, msg.rstrip() + '\n') continue m = regex_name.match(line) if m: current_test = m.group(1) continue m = regex_begin_stack.match(line) if m: mode_stack = True current_stack = [] current_stack.append(" " + m.group(1)) continue m = regex_end_stack.match(line) if m: mode_stack = False continue if mode_stack: current_stack.append(" " + line.strip()) return failed_tests def _set_adb_device(option, opt, value, parser): global adb if opt == '-d' or opt == '--device': adb = 'adb -d' if opt == '-e' or opt == '--emulator': adb = 'adb -e' if opt == '-s' or opt == '--serial': adb = 'adb -s ' + value def _create_opt_parser(): parser = optparse.OptionParser() parser.add_option('-d', '--device', action='callback', callback=_set_adb_device, help='pass -d to adb') parser.add_option('-e', '--emulator', action='callback', callback=_set_adb_device, help='pass -e to adb') parser.add_option('-s', '--serial', type="str", action='callback', callback=_set_adb_device, help='pass -s to adb') parser.add_option('-C', '--no-color', action='store_false', dest='use_color', default=True, help='disable color escape sequences in output') parser.add_option('-q', '--quiet', action='store_true', dest='quiet_mode', default=False, help='quiet mode, output only results') parser.add_option('-b', '--no-build', action='store_false', dest='do_build', default=True, help='do not rebuild test projects') parser.add_option('-k', '--continue', action='store_true', dest='do_continue', default=False, help='do not rebuild test projects') parser.add_option('-i', '--test-idmap', action='store_true', dest='test_idmap', default=False, help='run tests for single overlay') parser.add_option('-0', '--test-no-overlay', action='store_true', dest='test_no_overlay', default=False, help='run tests without any overlay') parser.add_option('-1', '--test-single-overlay', action='store_true', dest='test_single_overlay', default=False, help='run tests for single overlay') parser.add_option('-2', '--test-multiple-overlays', action='store_true', dest='test_multiple_overlays', default=False, help='run tests for multiple overlays') return parser if __name__ == '__main__': opt_parser = _create_opt_parser() opts, args = opt_parser.parse_args(sys.argv[1:]) if not opts.test_idmap and not opts.test_no_overlay and not opts.test_single_overlay and not opts.test_multiple_overlays: opts.test_idmap = True opts.test_no_overlay = True opts.test_single_overlay = True opts.test_multiple_overlays = True if len(args) > 0: opt_parser.error("unexpected arguments: %s" % " ".join(args)) # will never reach this: opt_parser.error will call sys.exit if opts.quiet_mode: printer = QuietPrinter() else: printer = VerbosePrinter(opts.use_color) tasks = [] # must be in the same directory as this script for compilation tasks to work script = sys.argv[0] dirname = os.path.dirname(script) wd = os.path.realpath(dirname) os.chdir(wd) # build test cases if opts.do_build: tasks.append(CompilationTask('OverlayTest/Android.mk')) tasks.append(CompilationTask('OverlayTestOverlay/Android.mk')) tasks.append(CompilationTask('OverlayAppFirst/Android.mk')) tasks.append(CompilationTask('OverlayAppSecond/Android.mk')) # remount filesystem, install test project tasks.append(RootTask()) tasks.append(RemountTask()) tasks.append(PushTask('/system/app/OverlayTest/OverlayTest.apk', '/system/app/OverlayTest.apk')) # test idmap if opts.test_idmap: idmaps='/data/local/tmp/idmaps' symlinks='/data/local/tmp/symlinks' # idmap --path tasks.append(StopTask()) tasks.append(_create_setup_idmap_path_task(idmaps, symlinks)) tasks.append(StartTask()) tasks.append(IdmapPathTask('/vendor/overlay/framework_a.apk', '/system/framework/framework-res.apk', idmaps + '/a.idmap')) tasks.append(FileExistsTest(idmaps + '/a.idmap')) tasks.append(GrepIdmapTest(idmaps + '/a.idmap', 'bool/config_annoy_dianne', 1)) # idmap --scan idmap = idmaps + '/vendor@overlay@framework_b.apk@idmap' tasks.append(StopTask()) tasks.append(_create_setup_idmap_scan_task(idmaps, symlinks)) tasks.append(StartTask()) tasks.append(IdmapScanTask('/vendor/overlay', 'android', '/system/framework/framework-res.apk', idmaps, symlinks)) tasks.append(FileExistsTest(idmap)) tasks.append(GrepIdmapTest(idmap, 'bool/config_annoy_dianne', 1)) # overlays.list overlays_list_path = idmaps + '/overlays.list' expected_content = '''\ /vendor/overlay/framework_b.apk /data/local/tmp/idmaps/vendor@overlay@framework_b.apk@idmap ''' tasks.append(FileExistsTest(overlays_list_path)) tasks.append(Md5Test(overlays_list_path, expected_content)) # idmap cleanup tasks.append(RmTask(symlinks)) tasks.append(RmTask(idmaps)) # test no overlay if opts.test_no_overlay: tasks.append(StopTask()) tasks.append(_create_disable_overlays_task()) tasks.append(StartTask()) tasks.append(InstrumentationTask('com.android.overlaytest.WithoutOverlayTest')) # test single overlay if opts.test_single_overlay: tasks.append(StopTask()) tasks.append(_create_enable_single_overlay_task()) tasks.append(StartTask()) tasks.append(InstrumentationTask('com.android.overlaytest.WithOverlayTest')) # test multiple overlays if opts.test_multiple_overlays: tasks.append(StopTask()) tasks.append(_create_enable_multiple_overlays_task()) tasks.append(StartTask()) tasks.append(InstrumentationTask('com.android.overlaytest.WithMultipleOverlaysTest')) ignored_errors = 0 for t in tasks: type = t.get_type() name = t.get_name() if type == TASK_INSTRUMENTATION: # InstrumentationTask will run several tests, but we want it # to appear as if each test was run individually. Calling # "am instrument" with a single test method is prohibitively # expensive, so let's instead post-process the output to # emulate individual calls. retcode, stdout, stderr = t.execute() if retcode != 0: printer.begin(TASK_INSTRUMENTATION, name) printer.end_fail(TASK_INSTRUMENTATION, name, stderr) sys.exit(retcode) retcode = _handle_instrumentation_task_output(stdout, printer) if retcode != 0: if not opts.do_continue: sys.exit(retcode) else: ignored_errors += retcode else: printer.begin(type, name) retcode, stdout, stderr = t.execute() if retcode == 0: printer.end_pass(type, name) if retcode != 0: if len(stderr) == 0: # hope for output from stdout instead (true for eg adb shell rm) stderr = stdout printer.end_fail(type, name, stderr) if not opts.do_continue: sys.exit(retcode) else: ignored_errors += retcode sys.exit(ignored_errors)