diff options
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py')
-rw-r--r-- | Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py | 175 |
1 files changed, 138 insertions, 37 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py b/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py index f097b83..0522d39 100644 --- a/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py +++ b/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py @@ -34,25 +34,65 @@ workers and receive their completion messages accordingly. """ import logging +import time +from webkitpy.tool import grammar from webkitpy.layout_tests.layout_package import manager_worker_broker from webkitpy.layout_tests.layout_package import test_runner from webkitpy.layout_tests.layout_package import worker + _log = logging.getLogger(__name__) +class _WorkerState(object): + """A class for the TestRunner/manager to use to track the current state + of the workers.""" + def __init__(self, number, worker_connection): + self.worker_connection = worker_connection + self.number = number + self.done = False + self.current_test_name = None + self.next_timeout = None + self.wedged = False + self.stats = {} + self.stats['name'] = worker_connection.name + self.stats['num_tests'] = 0 + self.stats['total_time'] = 0 + + def __repr__(self): + return "_WorkerState(" + str(self.__dict__) + ")" + + class TestRunner2(test_runner.TestRunner): def __init__(self, port, options, printer): test_runner.TestRunner.__init__(self, port, options, printer) self._all_results = [] self._group_stats = {} self._current_result_summary = None - self._done = False + + # This maps worker names to the state we are tracking for each of them. + self._worker_states = {} def is_done(self): - return self._done + worker_states = self._worker_states.values() + return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states) + + def _worker_is_done(self, worker_state): + t = time.time() + if worker_state.done or worker_state.wedged: + return True + + next_timeout = worker_state.next_timeout + WEDGE_PADDING = 40.0 + if next_timeout and t > next_timeout + WEDGE_PADDING: + _log.error('') + worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name) + _log.error('') + worker_state.wedged = True + return True + return False def name(self): return 'TestRunner2' @@ -60,8 +100,9 @@ class TestRunner2(test_runner.TestRunner): def _run_tests(self, file_list, result_summary): """Runs the tests in the file_list. - Return: A tuple (keyboard_interrupted, thread_timings, test_timings, - individual_test_timings) + Return: A tuple (interrupted, keyboard_interrupted, thread_timings, + test_timings, individual_test_timings) + interrupted is whether the run was interrupted keyboard_interrupted is whether someone typed Ctrl^C thread_timings is a list of dicts with the total runtime of each thread with 'name', 'num_tests', 'total_time' properties @@ -72,58 +113,118 @@ class TestRunner2(test_runner.TestRunner): result_summary: summary object to populate with the results """ self._current_result_summary = result_summary + self._all_results = [] + self._group_stats = {} + self._worker_states = {} + + num_workers = self._num_workers() + keyboard_interrupted = False + interrupted = False + thread_timings = [] + + self._printer.print_update('Sharding tests ...') + test_lists = self._shard_tests(file_list, + num_workers > 1 and not self._options.experimental_fully_parallel) + _log.debug("Using %d shards" % len(test_lists)) - # FIXME: shard properly. + manager_connection = manager_worker_broker.get(self._port, self._options, + self, worker.Worker) - # FIXME: should shard_tests return a list of objects rather than tuples? - test_lists = self._shard_tests(file_list, False) + if self._options.dry_run: + return (keyboard_interrupted, interrupted, thread_timings, + self._group_stats, self._all_results) - manager_connection = manager_worker_broker.get(self._port, self._options, self, worker.Worker) + self._printer.print_update('Starting %s ...' % + grammar.pluralize('worker', num_workers)) + for worker_number in xrange(num_workers): + worker_connection = manager_connection.start_worker(worker_number) + worker_state = _WorkerState(worker_number, worker_connection) + self._worker_states[worker_connection.name] = worker_state - # FIXME: start all of the workers. - manager_connection.start_worker(0) + # FIXME: If we start workers up too quickly, DumpRenderTree appears + # to thrash on something and time out its first few tests. Until + # we can figure out what's going on, sleep a bit in between + # workers. + time.sleep(0.1) + self._printer.print_update("Starting testing ...") for test_list in test_lists: manager_connection.post_message('test_list', test_list[0], test_list[1]) - manager_connection.post_message('stop') + # We post one 'stop' message for each worker. Because the stop message + # are sent after all of the tests, and because each worker will stop + # reading messsages after receiving a stop, we can be sure each + # worker will get a stop message and hence they will all shut down. + for i in xrange(num_workers): + manager_connection.post_message('stop') - keyboard_interrupted = False - interrupted = False - if not self._options.dry_run: - while not self._check_if_done(): + try: + while not self.is_done(): + # We loop with a timeout in order to be able to detect wedged threads. manager_connection.run_message_loop(delay_secs=1.0) - # FIXME: implement stats. - thread_timings = [] + if any(worker_state.wedged for worker_state in self._worker_states.values()): + _log.error('') + _log.error('Remaining workers are wedged, bailing out.') + _log.error('') + else: + _log.debug('No wedged threads') + + # Make sure all of the workers have shut down (if possible). + for worker_state in self._worker_states.values(): + if not worker_state.wedged and worker_state.worker_connection.is_alive(): + worker_state.worker_connection.join(0.5) + assert not worker_state.worker_connection.is_alive() + + except KeyboardInterrupt: + _log.info("Interrupted, exiting") + self.cancel_workers() + keyboard_interrupted = True + except test_runner.TestRunInterruptedException, e: + _log.info(e.reason) + self.cancel_workers() + interrupted = True + except: + # Unexpected exception; don't try to clean up workers. + _log.info("Exception raised, exiting") + raise + + thread_timings = [worker_state.stats for worker_state in self._worker_states.values()] # FIXME: should this be a class instead of a tuple? - return (keyboard_interrupted, interrupted, thread_timings, + return (interrupted, keyboard_interrupted, thread_timings, self._group_stats, self._all_results) - def _check_if_done(self): - """Returns true iff all the workers have either completed or wedged.""" - # FIXME: implement to check for wedged workers. - return self._done + def cancel_workers(self): + for worker_state in self._worker_states.values(): + worker_state.worker_connection.cancel() - def handle_started_test(self, src, test_info, hang_timeout): - # FIXME: implement - pass + def handle_started_test(self, source, test_info, hang_timeout): + worker_state = self._worker_states[source] + worker_state.current_test_name = self._port.relative_test_filename(test_info.filename) + worker_state.next_timeout = time.time() + hang_timeout - def handle_done(self, src): - # FIXME: implement properly to handle multiple workers. - self._done = True - pass + def handle_done(self, source): + worker_state = self._worker_states[source] + worker_state.done = True - def handle_exception(self, src, exception_info): - raise exception_info + def handle_exception(self, source, exception_info): + exception_type, exception_value, exception_traceback = exception_info + raise exception_type, exception_value, exception_traceback - def handle_finished_list(self, src, list_name, num_tests, elapsed_time): - # FIXME: update stats - pass + def handle_finished_list(self, source, list_name, num_tests, elapsed_time): + self._group_stats[list_name] = (num_tests, elapsed_time) - def handle_finished_test(self, src, result, elapsed_time): - self._update_summary_with_result(self._current_result_summary, result) + def handle_finished_test(self, source, result, elapsed_time): + worker_state = self._worker_states[source] + worker_state.next_timeout = None + worker_state.current_test_name = None + worker_state.stats['total_time'] += elapsed_time + worker_state.stats['num_tests'] += 1 + + if worker_state.wedged: + # This shouldn't happen if we have our timeouts tuned properly. + _log.error("%s unwedged", w.name) - # FIXME: update stats. self._all_results.append(result) + self._update_summary_with_result(self._current_result_summary, result) |