summaryrefslogtreecommitdiffstats
path: root/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py
diff options
context:
space:
mode:
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py')
-rw-r--r--Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py175
1 files changed, 138 insertions, 37 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py b/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py
index f097b83..0522d39 100644
--- a/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py
+++ b/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py
@@ -34,25 +34,65 @@ workers and receive their completion messages accordingly.
"""
import logging
+import time
+from webkitpy.tool import grammar
from webkitpy.layout_tests.layout_package import manager_worker_broker
from webkitpy.layout_tests.layout_package import test_runner
from webkitpy.layout_tests.layout_package import worker
+
_log = logging.getLogger(__name__)
+class _WorkerState(object):
+ """A class for the TestRunner/manager to use to track the current state
+ of the workers."""
+ def __init__(self, number, worker_connection):
+ # Connection object for this worker; the manager calls
+ # log_wedged_worker(), cancel(), is_alive() and join() on it.
+ self.worker_connection = worker_connection
+ self.number = number
+ # Set to True by the manager's handle_done() for this worker.
+ self.done = False
+ # Name of the test the worker is currently running and the wall-clock
+ # time (time.time() based) after which that test is overdue. Both are
+ # reset to None when the test finishes.
+ self.current_test_name = None
+ self.next_timeout = None
+ # Set to True once the worker has overrun its timeout and the manager
+ # has stopped waiting for it.
+ self.wedged = False
+ # Per-worker totals; the manager returns these dicts as thread_timings.
+ self.stats = {}
+ self.stats['name'] = worker_connection.name
+ self.stats['num_tests'] = 0
+ self.stats['total_time'] = 0
+
+ def __repr__(self):
+ # Debugging aid: dumps every attribute of the instance.
+ return "_WorkerState(" + str(self.__dict__) + ")"
+
+
class TestRunner2(test_runner.TestRunner):
def __init__(self, port, options, printer):
test_runner.TestRunner.__init__(self, port, options, printer)
+ # handle_finished_test() appends every result it receives to this list.
self._all_results = []
+ # Maps list_name -> (num_tests, elapsed_time); filled in by
+ # handle_finished_list().
self._group_stats = {}
+ # Summary object for the run in progress; assigned by _run_tests().
self._current_result_summary = None
- self._done = False
+
+ # This maps worker names to the state we are tracking for each of them.
+ self._worker_states = {}
def is_done(self):
- return self._done
+ """Return True once at least one worker is being tracked and every
+ tracked worker has either finished or been declared wedged."""
+ worker_states = self._worker_states.values()
+ # bool() keeps the result a real boolean; a bare `and` would hand back
+ # the empty list itself when no workers have been registered yet.
+ return bool(worker_states) and all(self._worker_is_done(worker_state) for worker_state in worker_states)
+
+ def _worker_is_done(self, worker_state):
+ """Return True if the worker has finished or is considered wedged.
+
+ A worker whose next_timeout was passed more than WEDGE_PADDING seconds
+ ago is logged via its connection, marked as wedged, and reported as
+ done so that the manager stops waiting for it."""
+ t = time.time()
+ if worker_state.done or worker_state.wedged:
+ return True
+
+ next_timeout = worker_state.next_timeout
+ # Extra grace period (in seconds) on top of the worker's own timeout
+ # before the manager gives up on it.
+ WEDGE_PADDING = 40.0
+ # next_timeout is None while no test is running, so an idle worker is
+ # never flagged here.
+ if next_timeout and t > next_timeout + WEDGE_PADDING:
+ _log.error('')
+ worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name)
+ _log.error('')
+ worker_state.wedged = True
+ return True
+ return False
def name(self):
return 'TestRunner2'
@@ -60,8 +100,9 @@ class TestRunner2(test_runner.TestRunner):
def _run_tests(self, file_list, result_summary):
"""Runs the tests in the file_list.
- Return: A tuple (keyboard_interrupted, thread_timings, test_timings,
- individual_test_timings)
+ Return: A tuple (interrupted, keyboard_interrupted, thread_timings,
+ test_timings, individual_test_timings)
+ interrupted is whether the run was interrupted
keyboard_interrupted is whether someone typed Ctrl^C
thread_timings is a list of dicts with the total runtime
of each thread with 'name', 'num_tests', 'total_time' properties
@@ -72,58 +113,118 @@ class TestRunner2(test_runner.TestRunner):
result_summary: summary object to populate with the results
"""
self._current_result_summary = result_summary
+ self._all_results = []
+ self._group_stats = {}
+ self._worker_states = {}
+
+ num_workers = self._num_workers()
+ keyboard_interrupted = False
+ interrupted = False
+ thread_timings = []
+
+ self._printer.print_update('Sharding tests ...')
+ test_lists = self._shard_tests(file_list,
+ num_workers > 1 and not self._options.experimental_fully_parallel)
+ _log.debug("Using %d shards" % len(test_lists))
- # FIXME: shard properly.
+ manager_connection = manager_worker_broker.get(self._port, self._options,
+ self, worker.Worker)
- # FIXME: should shard_tests return a list of objects rather than tuples?
- test_lists = self._shard_tests(file_list, False)
+ if self._options.dry_run:
+ # Nothing is run in a dry run. Return the same element order as the
+ # normal return at the end of this method: interrupted first, then
+ # keyboard_interrupted.
+ return (interrupted, keyboard_interrupted, thread_timings,
+ self._group_stats, self._all_results)
- manager_connection = manager_worker_broker.get(self._port, self._options, self, worker.Worker)
+ self._printer.print_update('Starting %s ...' %
+ grammar.pluralize('worker', num_workers))
+ for worker_number in xrange(num_workers):
+ worker_connection = manager_connection.start_worker(worker_number)
+ worker_state = _WorkerState(worker_number, worker_connection)
+ self._worker_states[worker_connection.name] = worker_state
- # FIXME: start all of the workers.
- manager_connection.start_worker(0)
+ # FIXME: If we start workers up too quickly, DumpRenderTree appears
+ # to thrash on something and time out its first few tests. Until
+ # we can figure out what's going on, sleep a bit in between
+ # workers.
+ time.sleep(0.1)
+ self._printer.print_update("Starting testing ...")
for test_list in test_lists:
manager_connection.post_message('test_list', test_list[0], test_list[1])
- manager_connection.post_message('stop')
+ # We post one 'stop' message for each worker. Because the stop messages
+ # are sent after all of the tests, and because each worker will stop
+ # reading messages after receiving a stop, we can be sure each
+ # worker will get a stop message and hence they will all shut down.
+ for i in xrange(num_workers):
+ manager_connection.post_message('stop')
- keyboard_interrupted = False
- interrupted = False
- if not self._options.dry_run:
- while not self._check_if_done():
+ try:
+ while not self.is_done():
+ # We loop with a timeout in order to be able to detect wedged threads.
manager_connection.run_message_loop(delay_secs=1.0)
- # FIXME: implement stats.
- thread_timings = []
+ if any(worker_state.wedged for worker_state in self._worker_states.values()):
+ _log.error('')
+ _log.error('Remaining workers are wedged, bailing out.')
+ _log.error('')
+ else:
+ _log.debug('No wedged threads')
+
+ # Make sure all of the workers have shut down (if possible).
+ for worker_state in self._worker_states.values():
+ if not worker_state.wedged and worker_state.worker_connection.is_alive():
+ worker_state.worker_connection.join(0.5)
+ assert not worker_state.worker_connection.is_alive()
+
+ except KeyboardInterrupt:
+ _log.info("Interrupted, exiting")
+ self.cancel_workers()
+ keyboard_interrupted = True
+ except test_runner.TestRunInterruptedException, e:
+ _log.info(e.reason)
+ self.cancel_workers()
+ interrupted = True
+ except:
+ # Unexpected exception; don't try to clean up workers.
+ _log.info("Exception raised, exiting")
+ raise
+
+ thread_timings = [worker_state.stats for worker_state in self._worker_states.values()]
# FIXME: should this be a class instead of a tuple?
- return (keyboard_interrupted, interrupted, thread_timings,
+ return (interrupted, keyboard_interrupted, thread_timings,
self._group_stats, self._all_results)
- def _check_if_done(self):
- """Returns true iff all the workers have either completed or wedged."""
- # FIXME: implement to check for wedged workers.
- return self._done
+ def cancel_workers(self):
+ """Call cancel() on the connection of every tracked worker."""
+ for worker_state in self._worker_states.values():
+ worker_state.worker_connection.cancel()
- def handle_started_test(self, src, test_info, hang_timeout):
- # FIXME: implement
- pass
+ def handle_started_test(self, source, test_info, hang_timeout):
+ """Record that the worker named `source` has started a test.
+
+ Stores the test's port-relative filename and the time
+ (now + hang_timeout) after which the test counts as overdue."""
+ worker_state = self._worker_states[source]
+ worker_state.current_test_name = self._port.relative_test_filename(test_info.filename)
+ worker_state.next_timeout = time.time() + hang_timeout
- def handle_done(self, src):
- # FIXME: implement properly to handle multiple workers.
- self._done = True
- pass
+ def handle_done(self, source):
+ """Mark the worker named `source` as done."""
+ worker_state = self._worker_states[source]
+ worker_state.done = True
- def handle_exception(self, src, exception_info):
- raise exception_info
+ def handle_exception(self, source, exception_info):
+ """Re-raise the exception described by exception_info.
+
+ exception_info is unpacked as a (type, value, traceback) triple and
+ raised with the three-expression form of `raise`, so the supplied
+ traceback is the one attached to the raised exception."""
+ exception_type, exception_value, exception_traceback = exception_info
+ raise exception_type, exception_value, exception_traceback
- def handle_finished_list(self, src, list_name, num_tests, elapsed_time):
- # FIXME: update stats
- pass
+ def handle_finished_list(self, source, list_name, num_tests, elapsed_time):
+ """Store (num_tests, elapsed_time) for list_name in the group stats."""
+ self._group_stats[list_name] = (num_tests, elapsed_time)
- def handle_finished_test(self, src, result, elapsed_time):
- self._update_summary_with_result(self._current_result_summary, result)
+ def handle_finished_test(self, source, result, elapsed_time):
+ """Account for a test finished by the worker named `source`.
+
+ Clears the worker's hang timeout and current test, adds the test to
+ the worker's stats, then records the result in the list of all
+ results and in the current result summary."""
+ worker_state = self._worker_states[source]
+ worker_state.next_timeout = None
+ worker_state.current_test_name = None
+ worker_state.stats['total_time'] += elapsed_time
+ worker_state.stats['num_tests'] += 1
+
+ if worker_state.wedged:
+ # This shouldn't happen if we have our timeouts tuned properly.
+ # `source` is the worker's name (the key into self._worker_states).
+ _log.error("%s unwedged", source)
- # FIXME: update stats.
self._all_results.append(result)
+ self._update_summary_with_result(self._current_result_summary, result)