summaryrefslogtreecommitdiffstats
path: root/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py
diff options
context:
space:
mode:
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py')
-rw-r--r--Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py91
1 files changed, 75 insertions, 16 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py b/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py
index a0f252c..4886c30 100644
--- a/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py
+++ b/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py
@@ -52,17 +52,13 @@ import time
# Handle Python < 2.6 where multiprocessing isn't available.
-#
-# _Multiprocessing_Process is needed so that _MultiProcessWorker
-# can be defined with or without multiprocessing.
try:
import multiprocessing
- _Multiprocessing_Process = multiprocessing.Process
except ImportError:
multiprocessing = None
- _Multiprocessing_Process = threading.Thread
+from webkitpy.common.system import stack_utils
from webkitpy.layout_tests import port
from webkitpy.layout_tests.layout_package import message_broker2
@@ -219,6 +215,18 @@ class _WorkerConnection(message_broker2.BrokerConnection):
message_broker2.BrokerConnection.__init__(self, broker, self._client,
ANY_WORKER_TOPIC, MANAGER_TOPIC)
+ def cancel(self):
+ raise NotImplementedError
+
+ def is_alive(self):
+ raise NotImplementedError
+
+ def join(self, timeout):
+ raise NotImplementedError
+
+ def log_wedged_worker(self, test_name):
+ raise NotImplementedError
+
def yield_to_broker(self):
pass
@@ -226,11 +234,26 @@ class _WorkerConnection(message_broker2.BrokerConnection):
class _InlineWorkerConnection(_WorkerConnection):
def __init__(self, broker, port, manager_client, worker_class, worker_number):
_WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
+ self._alive = False
self._port = port
self._manager_client = manager_client
+ def cancel(self):
+ self._client.cancel()
+
+ def is_alive(self):
+ return self._alive
+
+ def join(self, timeout):
+ assert not self._alive
+
+ def log_wedged_worker(self, test_name):
+ assert False, "_InlineWorkerConnection.log_wedged_worker() called"
+
def run(self):
+ self._alive = True
self._client.run(self._port)
+ self._alive = False
def yield_to_broker(self):
self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
@@ -243,6 +266,12 @@ class _Thread(threading.Thread):
self._port = port
self._client = client
+ def cancel(self):
+ return self._client.cancel()
+
+ def log_wedged_worker(self, test_name):
+ stack_utils.log_thread_state(_log.error, self._client.name(), self.ident, " is wedged on test %s" % test_name)
+
def run(self):
# FIXME: We can remove this once everyone is on 2.6.
if not hasattr(self, 'ident'):
@@ -255,22 +284,40 @@ class _ThreadedWorkerConnection(_WorkerConnection):
_WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
self._thread = _Thread(self, port, self._client)
+ def cancel(self):
+ return self._thread.cancel()
+
+ def is_alive(self):
+ # FIXME: Change this to is_alive once everyone is on 2.6.
+ return self._thread.isAlive()
+
+ def join(self, timeout):
+ return self._thread.join(timeout)
+
+ def log_wedged_worker(self, test_name):
+ return self._thread.log_wedged_worker(test_name)
+
def start(self):
self._thread.start()
-class _Process(_Multiprocessing_Process):
- def __init__(self, worker_connection, platform_name, options, client):
- _Multiprocessing_Process.__init__(self)
- self._worker_connection = worker_connection
- self._platform_name = platform_name
- self._options = options
- self._client = client
+if multiprocessing:
- def run(self):
- logging.basicConfig()
- port_obj = port.get(self._platform_name, self._options)
- self._client.run(port_obj)
+ class _Process(multiprocessing.Process):
+ def __init__(self, worker_connection, platform_name, options, client):
+ multiprocessing.Process.__init__(self)
+ self._worker_connection = worker_connection
+ self._platform_name = platform_name
+ self._options = options
+ self._client = client
+
+ def log_wedged_worker(self, test_name):
+ _log.error("%s (pid %d) is wedged on test %s" % (self.name, self.pid, test_name))
+
+ def run(self):
+ logging.basicConfig()
+ port_obj = port.get(self._platform_name, self._options)
+ self._client.run(port_obj)
class _MultiProcessWorkerConnection(_WorkerConnection):
@@ -278,5 +325,17 @@ class _MultiProcessWorkerConnection(_WorkerConnection):
_WorkerConnection.__init__(self, broker, worker_class, worker_number, options)
self._proc = _Process(self, platform_name, options, self._client)
+ def cancel(self):
+ return self._proc.terminate()
+
+ def is_alive(self):
+ return self._proc.is_alive()
+
+ def join(self, timeout):
+ return self._proc.join(timeout)
+
+ def log_wedged_worker(self, test_name):
+ return self._proc.log_wedged_worker(test_name)
+
def start(self):
self._proc.start()