diff options
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py')
-rw-r--r-- | Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py | 282 |
1 files changed, 282 insertions, 0 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py b/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py new file mode 100644 index 0000000..a0f252c --- /dev/null +++ b/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py @@ -0,0 +1,282 @@ +# Copyright (C) 2011 Google Inc. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Module for handling messages and concurrency for run-webkit-tests. + +This module implements a message broker that connects the manager +(TestRunner2) to the workers: it provides a messaging abstraction and +message loops (building on top of message_broker2), and handles starting +workers by launching threads and/or processes depending on the +requested configuration. + +There are a lot of classes and objects involved in a fully connected system. +They interact more or less like: + +TestRunner2 --> _InlineManager ---> _InlineWorker <-> Worker + ^ \ / ^ + | v v | + \-------------------- MessageBroker -------------/ +""" + +import logging +import optparse +import Queue +import thread +import threading +import time + + +# Handle Python < 2.6 where multiprocessing isn't available. +# +# _Multiprocessing_Process is needed so that _MultiProcessWorker +# can be defined with or without multiprocessing. +try: + import multiprocessing + _Multiprocessing_Process = multiprocessing.Process +except ImportError: + multiprocessing = None + _Multiprocessing_Process = threading.Thread + + +from webkitpy.layout_tests import port +from webkitpy.layout_tests.layout_package import message_broker2 + + +_log = logging.getLogger(__name__) + +# +# Topic names for Manager <-> Worker messaging +# +MANAGER_TOPIC = 'managers' +ANY_WORKER_TOPIC = 'workers' + + +def runtime_options(): + """Return a list of optparse.Option objects for any runtime values used + by this module.""" + options = [ + optparse.make_option("--worker-model", action="store", + help=("controls worker model. Valid values are " + "'inline', 'threads', and 'processes'.")), + ] + return options + + +def get(port, options, client, worker_class): + """Return a connection to a manager/worker message_broker + + Args: + port - handle to layout_tests/port object for port-specific stuff + options - optparse argument for command-line options + client - message_broker2.BrokerClient implementation to dispatch + replies to. + worker_class - type of workers to create. This class must implement + the methods in AbstractWorker. + Returns: + A handle to an object that will talk to a message broker configured + for the normal manager/worker communication. + """ + worker_model = options.worker_model + if worker_model == 'inline': + queue_class = Queue.Queue + manager_class = _InlineManager + elif worker_model == 'threads': + queue_class = Queue.Queue + manager_class = _ThreadedManager + elif worker_model == 'processes' and multiprocessing: + queue_class = multiprocessing.Queue + manager_class = _MultiProcessManager + else: + raise ValueError("unsupported value for --worker-model: %s" % + worker_model) + + broker = message_broker2.Broker(options, queue_class) + return manager_class(broker, port, options, client, worker_class) + + +class AbstractWorker(message_broker2.BrokerClient): + def __init__(self, broker_connection, worker_number, options): + """The constructor should be used to do any simple initialization + necessary, but should not do anything that creates data structures + that cannot be Pickled or sent across processes (like opening + files or sockets). Complex initialization should be done at the + start of the run() call. + + Args: + broker_connection - handle to the BrokerConnection object creating + the worker and that can be used for messaging. + worker_number - identifier for this particular worker + options - command-line argument object from optparse""" + + raise NotImplementedError + + def run(self, port): + """Callback for the worker to start executing. Typically does any + remaining initialization and then calls broker_connection.run_message_loop().""" + raise NotImplementedError + + def cancel(self): + """Called when possible to indicate to the worker to stop processing + messages and shut down. Note that workers may be stopped without this + method being called, so clients should not rely solely on this.""" + raise NotImplementedError + + +class _ManagerConnection(message_broker2.BrokerConnection): + def __init__(self, broker, options, client, worker_class): + """Base initialization for all Manager objects. + + Args: + broker: handle to the message_broker2 object + options: command line options object + client: callback object (the caller) + worker_class: class object to use to create workers. + """ + message_broker2.BrokerConnection.__init__(self, broker, client, + MANAGER_TOPIC, ANY_WORKER_TOPIC) + self._options = options + self._worker_class = worker_class + + def start_worker(self, worker_number): + raise NotImplementedError + + +class _InlineManager(_ManagerConnection): + def __init__(self, broker, port, options, client, worker_class): + _ManagerConnection.__init__(self, broker, options, client, worker_class) + self._port = port + self._inline_worker = None + + def start_worker(self, worker_number): + self._inline_worker = _InlineWorkerConnection(self._broker, self._port, + self._client, self._worker_class, worker_number) + return self._inline_worker + + def run_message_loop(self, delay_secs=None): + # Note that delay_secs is ignored in this case since we can't easily + # implement it. + self._inline_worker.run() + self._broker.run_all_pending(MANAGER_TOPIC, self._client) + + +class _ThreadedManager(_ManagerConnection): + def __init__(self, broker, port, options, client, worker_class): + _ManagerConnection.__init__(self, broker, options, client, worker_class) + self._port = port + + def start_worker(self, worker_number): + worker_connection = _ThreadedWorkerConnection(self._broker, self._port, + self._worker_class, worker_number) + worker_connection.start() + return worker_connection + + +class _MultiProcessManager(_ManagerConnection): + def __init__(self, broker, port, options, client, worker_class): + # Note that this class does not keep a handle to the actual port + # object, because it isn't Picklable. Instead it keeps the port + # name and recreates the port in the child process from the name + # and options. + _ManagerConnection.__init__(self, broker, options, client, worker_class) + self._platform_name = port.real_name() + + def start_worker(self, worker_number): + worker_connection = _MultiProcessWorkerConnection(self._broker, self._platform_name, + self._worker_class, worker_number, self._options) + worker_connection.start() + return worker_connection + + +class _WorkerConnection(message_broker2.BrokerConnection): + def __init__(self, broker, worker_class, worker_number, options): + self._client = worker_class(self, worker_number, options) + self.name = self._client.name() + message_broker2.BrokerConnection.__init__(self, broker, self._client, + ANY_WORKER_TOPIC, MANAGER_TOPIC) + + def yield_to_broker(self): + pass + + +class _InlineWorkerConnection(_WorkerConnection): + def __init__(self, broker, port, manager_client, worker_class, worker_number): + _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options) + self._port = port + self._manager_client = manager_client + + def run(self): + self._client.run(self._port) + + def yield_to_broker(self): + self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client) + + +class _Thread(threading.Thread): + def __init__(self, worker_connection, port, client): + threading.Thread.__init__(self) + self._worker_connection = worker_connection + self._port = port + self._client = client + + def run(self): + # FIXME: We can remove this once everyone is on 2.6. + if not hasattr(self, 'ident'): + self.ident = thread.get_ident() + self._client.run(self._port) + + +class _ThreadedWorkerConnection(_WorkerConnection): + def __init__(self, broker, port, worker_class, worker_number): + _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options) + self._thread = _Thread(self, port, self._client) + + def start(self): + self._thread.start() + + +class _Process(_Multiprocessing_Process): + def __init__(self, worker_connection, platform_name, options, client): + _Multiprocessing_Process.__init__(self) + self._worker_connection = worker_connection + self._platform_name = platform_name + self._options = options + self._client = client + + def run(self): + logging.basicConfig() + port_obj = port.get(self._platform_name, self._options) + self._client.run(port_obj) + + +class _MultiProcessWorkerConnection(_WorkerConnection): + def __init__(self, broker, platform_name, worker_class, worker_number, options): + _WorkerConnection.__init__(self, broker, worker_class, worker_number, options) + self._proc = _Process(self, platform_name, options, self._client) + + def start(self): + self._proc.start() |