# Copyright (C) 2011 Google Inc. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """Module for handling messaging for run-webkit-tests. This module implements a simple message broker abstraction that will be used to coordinate messages between the main run-webkit-tests thread (aka TestRunner) and the individual worker threads (previously known as dump_render_tree_threads). The broker simply distributes messages onto topics (named queues); the actual queues themselves are provided by the caller, as the queue's implementation requirements varies vary depending on the desired concurrency model (none/threads/processes). In order for shared-nothing messaging between processing to be possible, Messages must be picklable. The module defines one interface and two classes. Callers of this package must implement the BrokerClient interface, and most callers will create BrokerConnections as well as Brokers. The classes relate to each other as: BrokerClient ------> BrokerConnection ^ | | v \---------------- Broker (The BrokerClient never calls broker directly after it is created, only BrokerConnection. BrokerConnection passes a reference to BrokerClient to Broker, and Broker only invokes that reference, never talking directly to BrokerConnection). """ import cPickle import logging import Queue import time _log = logging.getLogger(__name__) class BrokerClient(object): """Abstract base class / interface that all message broker clients must implement. In addition to the methods below, by convention clients implement routines of the signature type handle_MESSAGE_NAME(self, src, ...): where MESSAGE_NAME matches the string passed to post_message(), and src indicates the name of the sender. If the message contains values in the message body, those will be provided as optparams.""" def __init__(self, *optargs, **kwargs): raise NotImplementedError def is_done(self): """Called from inside run_message_loop() to indicate whether to exit.""" raise NotImplementedError def name(self): """Return a name that identifies the client.""" raise NotImplementedError class Broker(object): """Brokers provide the basic model of a set of topics. Clients can post a message to any topic using post_message(), and can process messages on one topic at a time using run_message_loop().""" def __init__(self, options, queue_maker): """Args: options: a runtime option class from optparse queue_maker: a factory method that returns objects implementing a Queue interface (put()/get()). """ self._options = options self._queue_maker = queue_maker self._topics = {} def add_topic(self, topic_name): if topic_name not in self._topics: self._topics[topic_name] = self._queue_maker() def _get_queue_for_topic(self, topic_name): return self._topics[topic_name] def post_message(self, client, topic_name, message_name, *message_args): """Post a message to the appropriate topic name. Messages have a name and a tuple of optional arguments. Both must be picklable.""" message = _Message(client.name(), topic_name, message_name, message_args) queue = self._get_queue_for_topic(topic_name) queue.put(_Message.dumps(message)) def run_message_loop(self, topic_name, client, delay_secs=None): """Loop processing messages until client.is_done() or delay passes. To run indefinitely, set delay_secs to None.""" assert delay_secs is None or delay_secs > 0 self._run_loop(topic_name, client, block=True, delay_secs=delay_secs) def run_all_pending(self, topic_name, client): """Process messages until client.is_done() or caller would block.""" self._run_loop(topic_name, client, block=False, delay_secs=None) def _run_loop(self, topic_name, client, block, delay_secs): queue = self._get_queue_for_topic(topic_name) while not client.is_done(): try: s = queue.get(block, delay_secs) except Queue.Empty: return msg = _Message.loads(s) self._dispatch_message(msg, client) def _dispatch_message(self, message, client): if not hasattr(client, 'handle_' + message.name): raise ValueError( "%s: received message '%s' it couldn't handle" % (client.name(), message.name)) optargs = message.args message_handler = getattr(client, 'handle_' + message.name) message_handler(message.src, *optargs) class _Message(object): @staticmethod def loads(str): obj = cPickle.loads(str) assert(isinstance(obj, _Message)) return obj def __init__(self, src, topic_name, message_name, message_args): self.src = src self.topic_name = topic_name self.name = message_name self.args = message_args def dumps(self): return cPickle.dumps(self) def __repr__(self): return ("_Message(from='%s', topic_name='%s', message_name='%s')" % (self.src, self.topic_name, self.name)) class BrokerConnection(object): """BrokerConnection provides a connection-oriented facade on top of a Broker, so that callers don't have to repeatedly pass the same topic names over and over.""" def __init__(self, broker, client, run_topic, post_topic): """Create a BrokerConnection on top of a Broker. Note that the Broker is passed in rather than created so that a single Broker can be used by multiple BrokerConnections.""" self._broker = broker self._client = client self._post_topic = post_topic self._run_topic = run_topic broker.add_topic(run_topic) broker.add_topic(post_topic) def run_message_loop(self, delay_secs=None): self._broker.run_message_loop(self._run_topic, self._client, delay_secs) def post_message(self, message_name, *message_args): self._broker.post_message(self._client, self._post_topic, message_name, *message_args)