diff options
Diffstat (limited to 'Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py')
-rw-r--r-- | Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py | 91 |
1 files changed, 75 insertions, 16 deletions
diff --git a/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py b/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py index a0f252c..4886c30 100644 --- a/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py +++ b/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py @@ -52,17 +52,13 @@ import time # Handle Python < 2.6 where multiprocessing isn't available. -# -# _Multiprocessing_Process is needed so that _MultiProcessWorker -# can be defined with or without multiprocessing. try: import multiprocessing - _Multiprocessing_Process = multiprocessing.Process except ImportError: multiprocessing = None - _Multiprocessing_Process = threading.Thread +from webkitpy.common.system import stack_utils from webkitpy.layout_tests import port from webkitpy.layout_tests.layout_package import message_broker2 @@ -219,6 +215,18 @@ class _WorkerConnection(message_broker2.BrokerConnection): message_broker2.BrokerConnection.__init__(self, broker, self._client, ANY_WORKER_TOPIC, MANAGER_TOPIC) + def cancel(self): + raise NotImplementedError + + def is_alive(self): + raise NotImplementedError + + def join(self, timeout): + raise NotImplementedError + + def log_wedged_worker(self, test_name): + raise NotImplementedError + def yield_to_broker(self): pass @@ -226,11 +234,26 @@ class _WorkerConnection(message_broker2.BrokerConnection): class _InlineWorkerConnection(_WorkerConnection): def __init__(self, broker, port, manager_client, worker_class, worker_number): _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options) + self._alive = False self._port = port self._manager_client = manager_client + def cancel(self): + self._client.cancel() + + def is_alive(self): + return self._alive + + def join(self, timeout): + assert not self._alive + + def log_wedged_worker(self, test_name): + assert False, "_InlineWorkerConnection.log_wedged_worker() called" + def run(self): + self._alive = True self._client.run(self._port) + self._alive = False def yield_to_broker(self): self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client) @@ -243,6 +266,12 @@ class _Thread(threading.Thread): self._port = port self._client = client + def cancel(self): + return self._client.cancel() + + def log_wedged_worker(self, test_name): + stack_utils.log_thread_state(_log.error, self._client.name(), self.ident, " is wedged on test %s" % test_name) + def run(self): # FIXME: We can remove this once everyone is on 2.6. if not hasattr(self, 'ident'): @@ -255,22 +284,40 @@ class _ThreadedWorkerConnection(_WorkerConnection): _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options) self._thread = _Thread(self, port, self._client) + def cancel(self): + return self._thread.cancel() + + def is_alive(self): + # FIXME: Change this to is_alive once everyone is on 2.6. + return self._thread.isAlive() + + def join(self, timeout): + return self._thread.join(timeout) + + def log_wedged_worker(self, test_name): + return self._thread.log_wedged_worker(test_name) + def start(self): self._thread.start() -class _Process(_Multiprocessing_Process): - def __init__(self, worker_connection, platform_name, options, client): - _Multiprocessing_Process.__init__(self) - self._worker_connection = worker_connection - self._platform_name = platform_name - self._options = options - self._client = client +if multiprocessing: - def run(self): - logging.basicConfig() - port_obj = port.get(self._platform_name, self._options) - self._client.run(port_obj) + class _Process(multiprocessing.Process): + def __init__(self, worker_connection, platform_name, options, client): + multiprocessing.Process.__init__(self) + self._worker_connection = worker_connection + self._platform_name = platform_name + self._options = options + self._client = client + + def log_wedged_worker(self, test_name): + _log.error("%s (pid %d) is wedged on test %s" % (self.name, self.pid, test_name)) + + def run(self): + logging.basicConfig() + port_obj = port.get(self._platform_name, self._options) + self._client.run(port_obj) class _MultiProcessWorkerConnection(_WorkerConnection): @@ -278,5 +325,17 @@ class _MultiProcessWorkerConnection(_WorkerConnection): _WorkerConnection.__init__(self, broker, worker_class, worker_number, options) self._proc = _Process(self, platform_name, options, self._client) + def cancel(self): + return self._proc.terminate() + + def is_alive(self): + return self._proc.is_alive() + + def join(self, timeout): + return self._proc.join(timeout) + + def log_wedged_worker(self, test_name): + return self._proc.log_wedged_worker(test_name) + def start(self): self._proc.start() |