summaryrefslogtreecommitdiffstats
path: root/Tools/Scripts/webkitpy/layout_tests/layout_package/message_broker2.py
blob: ec3c9701b998efd2c73e729c0ac7ac20b0c2b94b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Module for handling messaging for run-webkit-tests.

This module implements a simple message broker abstraction that will be
used to coordinate messages between the main run-webkit-tests thread
(aka TestRunner) and the individual worker threads (previously known as
dump_render_tree_threads).

The broker simply distributes messages onto topics (named queues); the actual
queues themselves are provided by the caller, as the queue's implementation
requirements varies vary depending on the desired concurrency model
(none/threads/processes).

In order for shared-nothing messaging between processing to be possible,
Messages must be picklable.

The module defines one interface and two classes. Callers of this package
must implement the BrokerClient interface, and most callers will create
BrokerConnections as well as Brokers.

The classes relate to each other as:

    BrokerClient   ------>    BrokerConnection
         ^                         |
         |                         v
         \----------------      Broker

(The BrokerClient never calls broker directly after it is created, only
BrokerConnection.  BrokerConnection passes a reference to BrokerClient to
Broker, and Broker only invokes that reference, never talking directly to
BrokerConnection).
"""

import cPickle
import logging
import Queue
import time


_log = logging.getLogger(__name__)


class BrokerClient(object):
    """Abstract base class / interface that all message broker clients must
    implement. In addition to the methods below, by convention clients
    implement routines of the signature type

        handle_MESSAGE_NAME(self, src, ...):

    where MESSAGE_NAME matches the string passed to post_message(), and
    src indicates the name of the sender. If the message contains values in
    the message body, those will be provided as optparams."""

    def __init__(self, *optargs, **kwargs):
        raise NotImplementedError

    def is_done(self):
        """Called from inside run_message_loop() to indicate whether to exit."""
        raise NotImplementedError

    def name(self):
        """Return a name that identifies the client."""
        raise NotImplementedError


class Broker(object):
    """Brokers provide the basic model of a set of topics. Clients can post a
    message to any topic using post_message(), and can process messages on one
    topic at a time using run_message_loop()."""

    def __init__(self, options, queue_maker):
        """Args:
            options: a runtime option class from optparse
            queue_maker: a factory method that returns objects implementing a
                Queue interface (put()/get()).
        """
        self._options = options
        self._queue_maker = queue_maker
        self._topics = {}

    def add_topic(self, topic_name):
        if topic_name not in self._topics:
            self._topics[topic_name] = self._queue_maker()

    def _get_queue_for_topic(self, topic_name):
        return self._topics[topic_name]

    def post_message(self, client, topic_name, message_name, *message_args):
        """Post a message to the appropriate topic name.

        Messages have a name and a tuple of optional arguments. Both must be picklable."""
        message = _Message(client.name(), topic_name, message_name, message_args)
        queue = self._get_queue_for_topic(topic_name)
        queue.put(_Message.dumps(message))

    def run_message_loop(self, topic_name, client, delay_secs=None):
        """Loop processing messages until client.is_done() or delay passes.

        To run indefinitely, set delay_secs to None."""
        assert delay_secs is None or delay_secs > 0
        self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)

    def run_all_pending(self, topic_name, client):
        """Process messages until client.is_done() or caller would block."""
        self._run_loop(topic_name, client, block=False, delay_secs=None)

    def _run_loop(self, topic_name, client, block, delay_secs):
        queue = self._get_queue_for_topic(topic_name)
        while not client.is_done():
            try:
                s = queue.get(block, delay_secs)
            except Queue.Empty:
                return
            msg = _Message.loads(s)
            self._dispatch_message(msg, client)

    def _dispatch_message(self, message, client):
        if not hasattr(client, 'handle_' + message.name):
            raise ValueError(
               "%s: received message '%s' it couldn't handle" %
               (client.name(), message.name))
        optargs = message.args
        message_handler = getattr(client, 'handle_' + message.name)
        message_handler(message.src, *optargs)


class _Message(object):
    @staticmethod
    def loads(str):
        obj = cPickle.loads(str)
        assert(isinstance(obj, _Message))
        return obj

    def __init__(self, src, topic_name, message_name, message_args):
        self.src = src
        self.topic_name = topic_name
        self.name = message_name
        self.args = message_args

    def dumps(self):
        return cPickle.dumps(self)

    def __repr__(self):
        return ("_Message(from='%s', topic_name='%s', message_name='%s')" %
                (self.src, self.topic_name, self.name))


class BrokerConnection(object):
    """BrokerConnection provides a connection-oriented facade on top of a
    Broker, so that callers don't have to repeatedly pass the same topic
    names over and over."""

    def __init__(self, broker, client, run_topic, post_topic):
        """Create a BrokerConnection on top of a Broker. Note that the Broker
        is passed in rather than created so that a single Broker can be used
        by multiple BrokerConnections."""
        self._broker = broker
        self._client = client
        self._post_topic = post_topic
        self._run_topic = run_topic
        broker.add_topic(run_topic)
        broker.add_topic(post_topic)

    def run_message_loop(self, delay_secs=None):
        self._broker.run_message_loop(self._run_topic, self._client, delay_secs)

    def post_message(self, message_name, *message_args):
        self._broker.post_message(self._client, self._post_topic,
                                  message_name, *message_args)