summaryrefslogtreecommitdiffstats
path: root/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
diff options
context:
space:
mode:
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py')
-rw-r--r--Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py197
1 files changed, 197 insertions, 0 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py b/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
new file mode 100644
index 0000000..e0ca8db
--- /dev/null
+++ b/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
@@ -0,0 +1,197 @@
+# Copyright (C) 2010 Google Inc. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Module for handling messages, threads, processes, and concurrency for run-webkit-tests.
+
+Testing is accomplished by having a manager (TestRunner) gather all of the
+tests to be run, and sending messages to a pool of workers (TestShellThreads)
+to run each test. Each worker communicates with one driver (usually
+DumpRenderTree) to run one test at a time and then compare the output against
+what we expected to get.
+
+This modules provides a message broker that connects the manager to the
+workers: it provides a messaging abstraction and message loops, and
+handles launching threads and/or processes depending on the
+requested configuration.
+"""
+
+import logging
+import sys
+import time
+import traceback
+
+import dump_render_tree_thread
+
+_log = logging.getLogger(__name__)
+
+
+def get(port, options):
+ """Return an instance of a WorkerMessageBroker."""
+ worker_model = options.worker_model
+ if worker_model == 'old-inline':
+ return InlineBroker(port, options)
+ if worker_model == 'old-threads':
+ return MultiThreadedBroker(port, options)
+ raise ValueError('unsupported value for --worker-model: %s' % worker_model)
+
+
+class _WorkerState(object):
+ def __init__(self, name):
+ self.name = name
+ self.thread = None
+
+
+class WorkerMessageBroker(object):
+ def __init__(self, port, options):
+ self._port = port
+ self._options = options
+ self._num_workers = int(self._options.child_processes)
+
+ # This maps worker names to their _WorkerState values.
+ self._workers = {}
+
+ def _threads(self):
+ return tuple([w.thread for w in self._workers.values()])
+
+ def start_workers(self, test_runner):
+ """Starts up the pool of workers for running the tests.
+
+ Args:
+ test_runner: a handle to the manager/TestRunner object
+ """
+ self._test_runner = test_runner
+ for worker_number in xrange(self._num_workers):
+ worker = _WorkerState('worker-%d' % worker_number)
+ worker.thread = self._start_worker(worker_number, worker.name)
+ self._workers[worker.name] = worker
+ return self._threads()
+
+ def _start_worker(self, worker_number, worker_name):
+ raise NotImplementedError
+
+ def run_message_loop(self):
+ """Loop processing messages until done."""
+ raise NotImplementedError
+
+ def cancel_workers(self):
+ """Cancel/interrupt any workers that are still alive."""
+ pass
+
+ def cleanup(self):
+ """Perform any necessary cleanup on shutdown."""
+ pass
+
+
+class InlineBroker(WorkerMessageBroker):
+ def _start_worker(self, worker_number, worker_name):
+ # FIXME: Replace with something that isn't a thread.
+ thread = dump_render_tree_thread.TestShellThread(self._port,
+ self._options, worker_number, worker_name,
+ self._test_runner._current_filename_queue,
+ self._test_runner._result_queue)
+ # Note: Don't start() the thread! If we did, it would actually
+ # create another thread and start executing it, and we'd no longer
+ # be single-threaded.
+ return thread
+
+ def run_message_loop(self):
+ thread = self._threads()[0]
+ thread.run_in_main_thread(self._test_runner,
+ self._test_runner._current_result_summary)
+ self._test_runner.update()
+
+
+class MultiThreadedBroker(WorkerMessageBroker):
+ def _start_worker(self, worker_number, worker_name):
+ thread = dump_render_tree_thread.TestShellThread(self._port,
+ self._options, worker_number, worker_name,
+ self._test_runner._current_filename_queue,
+ self._test_runner._result_queue)
+ thread.start()
+ return thread
+
+ def run_message_loop(self):
+ threads = self._threads()
+
+ # Loop through all the threads waiting for them to finish.
+ some_thread_is_alive = True
+ while some_thread_is_alive:
+ some_thread_is_alive = False
+ t = time.time()
+ for thread in threads:
+ exception_info = thread.exception_info()
+ if exception_info is not None:
+ # Re-raise the thread's exception here to make it
+ # clear that testing was aborted. Otherwise,
+ # the tests that did not run would be assumed
+ # to have passed.
+ raise exception_info[0], exception_info[1], exception_info[2]
+
+ if thread.isAlive():
+ some_thread_is_alive = True
+ next_timeout = thread.next_timeout()
+ if next_timeout and t > next_timeout:
+ log_wedged_worker(thread.getName(), thread.id())
+ thread.clear_next_timeout()
+
+ self._test_runner.update()
+
+ if some_thread_is_alive:
+ time.sleep(0.01)
+
+ def cancel_workers(self):
+ threads = self._threads()
+ for thread in threads:
+ thread.cancel()
+
+
+def log_wedged_worker(name, id):
+ """Log information about the given worker state."""
+ stack = _find_thread_stack(id)
+ assert(stack is not None)
+ _log.error("")
+ _log.error("%s (tid %d) is wedged" % (name, id))
+ _log_stack(stack)
+ _log.error("")
+
+
+def _find_thread_stack(id):
+ """Returns a stack object that can be used to dump a stack trace for
+ the given thread id (or None if the id is not found)."""
+ for thread_id, stack in sys._current_frames().items():
+ if thread_id == id:
+ return stack
+ return None
+
+
+def _log_stack(stack):
+ """Log a stack trace to log.error()."""
+ for filename, lineno, name, line in traceback.extract_stack(stack):
+ _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
+ if line:
+ _log.error(' %s' % line.strip())