diff options
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker2.py')
-rw-r--r-- | Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker2.py | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker2.py b/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker2.py new file mode 100644 index 0000000..ec3c970 --- /dev/null +++ b/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker2.py @@ -0,0 +1,196 @@ +# Copyright (C) 2011 Google Inc. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Module for handling messaging for run-webkit-tests. + +This module implements a simple message broker abstraction that will be +used to coordinate messages between the main run-webkit-tests thread +(aka TestRunner) and the individual worker threads (previously known as +dump_render_tree_threads). + +The broker simply distributes messages onto topics (named queues); the actual +queues themselves are provided by the caller, as the queue's implementation +requirements varies vary depending on the desired concurrency model +(none/threads/processes). + +In order for shared-nothing messaging between processing to be possible, +Messages must be picklable. + +The module defines one interface and two classes. Callers of this package +must implement the BrokerClient interface, and most callers will create +BrokerConnections as well as Brokers. + +The classes relate to each other as: + + BrokerClient ------> BrokerConnection + ^ | + | v + \---------------- Broker + +(The BrokerClient never calls broker directly after it is created, only +BrokerConnection. BrokerConnection passes a reference to BrokerClient to +Broker, and Broker only invokes that reference, never talking directly to +BrokerConnection). +""" + +import cPickle +import logging +import Queue +import time + + +_log = logging.getLogger(__name__) + + +class BrokerClient(object): + """Abstract base class / interface that all message broker clients must + implement. In addition to the methods below, by convention clients + implement routines of the signature type + + handle_MESSAGE_NAME(self, src, ...): + + where MESSAGE_NAME matches the string passed to post_message(), and + src indicates the name of the sender. If the message contains values in + the message body, those will be provided as optparams.""" + + def __init__(self, *optargs, **kwargs): + raise NotImplementedError + + def is_done(self): + """Called from inside run_message_loop() to indicate whether to exit.""" + raise NotImplementedError + + def name(self): + """Return a name that identifies the client.""" + raise NotImplementedError + + +class Broker(object): + """Brokers provide the basic model of a set of topics. Clients can post a + message to any topic using post_message(), and can process messages on one + topic at a time using run_message_loop().""" + + def __init__(self, options, queue_maker): + """Args: + options: a runtime option class from optparse + queue_maker: a factory method that returns objects implementing a + Queue interface (put()/get()). + """ + self._options = options + self._queue_maker = queue_maker + self._topics = {} + + def add_topic(self, topic_name): + if topic_name not in self._topics: + self._topics[topic_name] = self._queue_maker() + + def _get_queue_for_topic(self, topic_name): + return self._topics[topic_name] + + def post_message(self, client, topic_name, message_name, *message_args): + """Post a message to the appropriate topic name. + + Messages have a name and a tuple of optional arguments. Both must be picklable.""" + message = _Message(client.name(), topic_name, message_name, message_args) + queue = self._get_queue_for_topic(topic_name) + queue.put(_Message.dumps(message)) + + def run_message_loop(self, topic_name, client, delay_secs=None): + """Loop processing messages until client.is_done() or delay passes. + + To run indefinitely, set delay_secs to None.""" + assert delay_secs is None or delay_secs > 0 + self._run_loop(topic_name, client, block=True, delay_secs=delay_secs) + + def run_all_pending(self, topic_name, client): + """Process messages until client.is_done() or caller would block.""" + self._run_loop(topic_name, client, block=False, delay_secs=None) + + def _run_loop(self, topic_name, client, block, delay_secs): + queue = self._get_queue_for_topic(topic_name) + while not client.is_done(): + try: + s = queue.get(block, delay_secs) + except Queue.Empty: + return + msg = _Message.loads(s) + self._dispatch_message(msg, client) + + def _dispatch_message(self, message, client): + if not hasattr(client, 'handle_' + message.name): + raise ValueError( + "%s: received message '%s' it couldn't handle" % + (client.name(), message.name)) + optargs = message.args + message_handler = getattr(client, 'handle_' + message.name) + message_handler(message.src, *optargs) + + +class _Message(object): + @staticmethod + def loads(str): + obj = cPickle.loads(str) + assert(isinstance(obj, _Message)) + return obj + + def __init__(self, src, topic_name, message_name, message_args): + self.src = src + self.topic_name = topic_name + self.name = message_name + self.args = message_args + + def dumps(self): + return cPickle.dumps(self) + + def __repr__(self): + return ("_Message(from='%s', topic_name='%s', message_name='%s')" % + (self.src, self.topic_name, self.name)) + + +class BrokerConnection(object): + """BrokerConnection provides a connection-oriented facade on top of a + Broker, so that callers don't have to repeatedly pass the same topic + names over and over.""" + + def __init__(self, broker, client, run_topic, post_topic): + """Create a BrokerConnection on top of a Broker. Note that the Broker + is passed in rather than created so that a single Broker can be used + by multiple BrokerConnections.""" + self._broker = broker + self._client = client + self._post_topic = post_topic + self._run_topic = run_topic + broker.add_topic(run_topic) + broker.add_topic(post_topic) + + def run_message_loop(self, delay_secs=None): + self._broker.run_message_loop(self._run_topic, self._client, delay_secs) + + def post_message(self, message_name, *message_args): + self._broker.post_message(self._client, self._post_topic, + message_name, *message_args) |