summaryrefslogtreecommitdiffstats
path: root/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
blob: 481c6170494856add5556eab5572b1e712b500bc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# Copyright (C) 2010 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Module for handling messages, threads, processes, and concurrency for run-webkit-tests.

Testing is accomplished by having a manager (TestRunner) gather all of the
tests to be run, and sending messages to a pool of workers (TestShellThreads)
to run each test. Each worker communicates with one driver (usually
DumpRenderTree) to run one test at a time and then compare the output against
what we expected to get.

This modules provides a message broker that connects the manager to the
workers: it provides a messaging abstraction and message loops, and
handles launching threads and/or processes depending on the
requested configuration.
"""

import logging
import sys
import time
import traceback

import dump_render_tree_thread

_log = logging.getLogger(__name__)


def get(port, options):
    """Return an instance of a WorkerMessageBroker."""
    worker_model = options.worker_model
    if worker_model == 'old-inline':
        return InlineBroker(port, options)
    if worker_model == 'old-threads':
        return MultiThreadedBroker(port, options)
    raise ValueError('unsupported value for --worker-model: %s' % worker_model)


class _WorkerState(object):
    def __init__(self, name):
        self.name = name
        self.thread = None


class WorkerMessageBroker(object):
    def __init__(self, port, options):
        self._port = port
        self._options = options
        self._num_workers = int(self._options.child_processes)

        # This maps worker names to their _WorkerState values.
        self._workers = {}

    def _threads(self):
        return tuple([w.thread for w in self._workers.values()])

    def start_workers(self, test_runner):
        """Starts up the pool of workers for running the tests.

        Args:
            test_runner: a handle to the manager/TestRunner object
        """
        self._test_runner = test_runner
        for worker_number in xrange(self._num_workers):
            worker = _WorkerState('worker-%d' % worker_number)
            worker.thread = self._start_worker(worker_number, worker.name)
            self._workers[worker.name] = worker
        return self._threads()

    def _start_worker(self, worker_number, worker_name):
        raise NotImplementedError

    def run_message_loop(self):
        """Loop processing messages until done."""
        raise NotImplementedError

    def cancel_workers(self):
        """Cancel/interrupt any workers that are still alive."""
        pass

    def cleanup(self):
        """Perform any necessary cleanup on shutdown."""
        pass


class InlineBroker(WorkerMessageBroker):
    def _start_worker(self, worker_number, worker_name):
        # FIXME: Replace with something that isn't a thread.
        thread = dump_render_tree_thread.TestShellThread(self._port,
            self._options, worker_number, worker_name,
            self._test_runner._current_filename_queue,
            self._test_runner._result_queue)
        # Note: Don't start() the thread! If we did, it would actually
        # create another thread and start executing it, and we'd no longer
        # be single-threaded.
        return thread

    def run_message_loop(self):
        thread = self._threads()[0]
        thread.run_in_main_thread(self._test_runner,
                                  self._test_runner._current_result_summary)
        self._test_runner.update()


class MultiThreadedBroker(WorkerMessageBroker):
    def _start_worker(self, worker_number, worker_name):
        thread = dump_render_tree_thread.TestShellThread(self._port,
            self._options, worker_number, worker_name,
            self._test_runner._current_filename_queue,
            self._test_runner._result_queue)
        thread.start()
        return thread

    def run_message_loop(self):
        threads = self._threads()

        # Loop through all the threads waiting for them to finish.
        some_thread_is_alive = True
        while some_thread_is_alive:
            some_thread_is_alive = False
            t = time.time()
            for thread in threads:
                if thread.isAlive():
                    some_thread_is_alive = True
                    next_timeout = thread.next_timeout()
                    if next_timeout and t > next_timeout:
                        log_wedged_worker(thread.getName(), thread.id())
                        thread.clear_next_timeout()

                exception_info = thread.exception_info()
                if exception_info is not None:
                    # Re-raise the thread's exception here to make it
                    # clear that testing was aborted. Otherwise,
                    # the tests that did not run would be assumed
                    # to have passed.
                    raise exception_info[0], exception_info[1], exception_info[2]

            self._test_runner.update()

            if some_thread_is_alive:
                time.sleep(0.01)

    def cancel_workers(self):
        threads = self._threads()
        for thread in threads:
            thread.cancel()


def log_wedged_worker(name, id):
    """Log information about the given worker state."""
    stack = _find_thread_stack(id)
    assert(stack is not None)
    _log.error("")
    _log.error("%s (tid %d) is wedged" % (name, id))
    _log_stack(stack)
    _log.error("")


def _find_thread_stack(id):
    """Returns a stack object that can be used to dump a stack trace for
    the given thread id (or None if the id is not found)."""
    for thread_id, stack in sys._current_frames().items():
        if thread_id == id:
            return stack
    return None


def _log_stack(stack):
    """Log a stack trace to log.error()."""
    for filename, lineno, name, line in traceback.extract_stack(stack):
        _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
        if line:
            _log.error('  %s' % line.strip())