Diffstat (limited to 'utils/lit/lit/main.py')
-rwxr-xr-x  utils/lit/lit/main.py  292
1 file changed, 117 insertions(+), 175 deletions(-)
diff --git a/utils/lit/lit/main.py b/utils/lit/lit/main.py
index acf6101..6f672a0 100755
--- a/utils/lit/lit/main.py
+++ b/utils/lit/lit/main.py
@@ -7,37 +7,23 @@ See lit.pod for more information.
"""
from __future__ import absolute_import
-import math, os, platform, random, re, sys, time, threading, traceback
+import math, os, platform, random, re, sys, time
import lit.ProgressBar
import lit.LitConfig
import lit.Test
-import lit.Util
-
+import lit.run
+import lit.util
import lit.discovery
-class TestingProgressDisplay:
+class TestingProgressDisplay(object):
def __init__(self, opts, numTests, progressBar=None):
self.opts = opts
self.numTests = numTests
self.current = None
- self.lock = threading.Lock()
self.progressBar = progressBar
self.completed = 0
- def update(self, test):
- # Avoid locking overhead in quiet mode
- if self.opts.quiet and not test.result.isFailure:
- self.completed += 1
- return
-
- # Output lock.
- self.lock.acquire()
- try:
- self.handleUpdate(test)
- finally:
- self.lock.release()
-
def finish(self):
if self.progressBar:
self.progressBar.clear()
@@ -46,123 +32,86 @@ class TestingProgressDisplay:
elif self.opts.succinct:
sys.stdout.write('\n')
- def handleUpdate(self, test):
+ def update(self, test):
self.completed += 1
if self.progressBar:
self.progressBar.update(float(self.completed)/self.numTests,
test.getFullName())
- if self.opts.succinct and not test.result.isFailure:
+ if not test.result.code.isFailure and \
+ (self.opts.quiet or self.opts.succinct):
return
if self.progressBar:
self.progressBar.clear()
- print('%s: %s (%d of %d)' % (test.result.name, test.getFullName(),
+ # Show the test result line.
+ test_name = test.getFullName()
+ print('%s: %s (%d of %d)' % (test.result.code.name, test_name,
self.completed, self.numTests))
- if test.result.isFailure and self.opts.showOutput:
+ # Show the test failure output, if requested.
+ if test.result.code.isFailure and self.opts.showOutput:
print("%s TEST '%s' FAILED %s" % ('*'*20, test.getFullName(),
'*'*20))
- print(test.output)
+ print(test.result.output)
print("*" * 20)
+ # Report test metrics, if present.
+ if test.result.metrics:
+ print("%s TEST '%s' RESULTS %s" % ('*'*10, test.getFullName(),
+ '*'*10))
+ items = sorted(test.result.metrics.items())
+ for metric_name, value in items:
+ print('%s: %s ' % (metric_name, value.format()))
+ print("*" * 10)
+
+ # Ensure the output is flushed.
sys.stdout.flush()
-class TestProvider:
- def __init__(self, tests, maxTime):
- self.maxTime = maxTime
- self.iter = iter(tests)
- self.lock = threading.Lock()
- self.startTime = time.time()
- self.canceled = False
-
- def cancel(self):
- self.lock.acquire()
- self.canceled = True
- self.lock.release()
-
- def get(self):
- # Check if we have run out of time.
- if self.maxTime is not None:
- if time.time() - self.startTime > self.maxTime:
- return None
-
- # Otherwise take the next test.
- self.lock.acquire()
- if self.canceled:
- self.lock.release()
- return None
- for item in self.iter:
- break
- else:
- item = None
- self.lock.release()
- return item
-
-class Tester(threading.Thread):
- def __init__(self, litConfig, provider, display):
- threading.Thread.__init__(self)
- self.litConfig = litConfig
- self.provider = provider
- self.display = display
-
- def run(self):
- while 1:
- item = self.provider.get()
- if item is None:
- break
- self.runTest(item)
-
- def runTest(self, test):
- result = None
- startTime = time.time()
- try:
- result, output = test.config.test_format.execute(test,
- self.litConfig)
- except KeyboardInterrupt:
- # This is a sad hack. Unfortunately subprocess goes
- # bonkers with ctrl-c and we start forking merrily.
- print('\nCtrl-C detected, goodbye.')
- os.kill(0,9)
- except:
- if self.litConfig.debug:
- raise
- result = lit.Test.UNRESOLVED
- output = 'Exception during script execution:\n'
- output += traceback.format_exc()
- output += '\n'
- elapsed = time.time() - startTime
-
- test.setResult(result, output, elapsed)
- self.display.update(test)
-
-def runTests(numThreads, litConfig, provider, display):
- # If only using one testing thread, don't use threads at all; this lets us
- # profile, among other things.
- if numThreads == 1:
- t = Tester(litConfig, provider, display)
- t.run()
- return
-
- # Otherwise spin up the testing threads and wait for them to finish.
- testers = [Tester(litConfig, provider, display)
- for i in range(numThreads)]
- for t in testers:
- t.start()
+def write_test_results(run, lit_config, testing_time, output_path):
try:
- for t in testers:
- t.join()
- except KeyboardInterrupt:
- sys.exit(2)
+ import json
+ except ImportError:
+ lit_config.fatal('test output unsupported with Python 2.5')
+
+ # Construct the data we will write.
+ data = {}
+ # Encode the current lit version as a schema version.
+ data['__version__'] = lit.__versioninfo__
+ data['elapsed'] = testing_time
+ # FIXME: Record some information on the lit configuration used?
+ # FIXME: Record information from the individual test suites?
+
+ # Encode the tests.
+ data['tests'] = tests_data = []
+ for test in run.tests:
+ test_data = {
+ 'name' : test.getFullName(),
+ 'code' : test.result.code.name,
+ 'output' : test.result.output,
+ 'elapsed' : test.result.elapsed }
+
+ # Add test metrics, if present.
+ if test.result.metrics:
+ test_data['metrics'] = metrics_data = {}
+ for key, value in test.result.metrics.items():
+ metrics_data[key] = value.todata()
+
+ tests_data.append(test_data)
+
+ # Write the output.
+ f = open(output_path, 'w')
+ try:
+ json.dump(data, f, indent=2, sort_keys=True)
+ f.write('\n')
+ finally:
+ f.close()
def main(builtinParameters = {}):
- # Bump the GIL check interval, its more important to get any one thread to a
- # blocking operation (hopefully exec) than to try and unblock other threads.
- #
- # FIXME: This is a hack.
- import sys
- sys.setcheckinterval(1000)
+ # Use processes by default on Unix platforms.
+ isWindows = platform.system() == 'Windows'
+ useProcessesIsDefault = not isWindows
global options
from optparse import OptionParser, OptionGroup
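
For reference, write_test_results above emits a JSON document shaped roughly like the following (an illustrative sketch; the values and the metric name are made up, and __version__ serializes the lit.__versioninfo__ tuple as a list):

    {
      "__version__": [0, 3, 0],
      "elapsed": 12.34,
      "tests": [
        {
          "code": "PASS",
          "elapsed": 0.21,
          "metrics": {"exec_time": 0.18},
          "name": "example-suite :: basic.txt",
          "output": ""
        }
      ]
    }

Keys appear alphabetically because the function passes sort_keys=True to json.dump.
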
@@ -191,6 +140,9 @@ def main(builtinParameters = {}):
group.add_option("-v", "--verbose", dest="showOutput",
help="Show all test output",
action="store_true", default=False)
+ group.add_option("-o", "--output", dest="output_path",
+ help="Write test results to the provided path",
+ action="store", type=str, metavar="PATH")
group.add_option("", "--no-progress-bar", dest="useProgressBar",
help="Do not use curses based progress bar",
action="store_false", default=True)
@@ -243,9 +195,12 @@ def main(builtinParameters = {}):
group.add_option("", "--show-tests", dest="showTests",
help="Show all discovered tests",
action="store_true", default=False)
- group.add_option("", "--repeat", dest="repeatTests", metavar="N",
- help="Repeat tests N times (for timing)",
- action="store", default=None, type=int)
+ group.add_option("", "--use-processes", dest="useProcesses",
+ help="Run tests in parallel with processes (not threads)",
+ action="store_true", default=useProcessesIsDefault)
+ group.add_option("", "--use-threads", dest="useProcesses",
+ help="Run tests in parallel with threads (not processes)",
+ action="store_false", default=useProcessesIsDefault)
parser.add_option_group(group)
(opts, args) = parser.parse_args()
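
With the new options in place, a hypothetical invocation that writes machine-readable results while forcing the thread-based executor might look like:

    lit -sv --use-threads -o results.json llvm/test

(-s and -v are lit's existing succinct/verbose flags; the test-suite path is illustrative.)
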
@@ -259,7 +214,7 @@ def main(builtinParameters = {}):
# I haven't seen this bug occur with 2.5.2 and later, so only enable multiple
# threads by default there.
if sys.hexversion >= 0x2050200:
- opts.numThreads = lit.Util.detectCPUs()
+ opts.numThreads = lit.util.detectCPUs()
else:
opts.numThreads = 1
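
For context, sys.hexversion packs the interpreter version as 0xMMmmppLS (major, minor, micro, release level, serial), so the constant above is a compact "running 2.5.2 or newer" check:

    >>> import sys
    >>> hex(sys.hexversion)   # e.g. on CPython 2.7.5 final
    '0x20705f0'
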
@@ -284,20 +239,22 @@ def main(builtinParameters = {}):
valgrindArgs = opts.valgrindArgs,
noExecute = opts.noExecute,
debug = opts.debug,
- isWindows = (platform.system()=='Windows'),
+ isWindows = isWindows,
params = userParams,
config_prefix = opts.configPrefix)
- tests = lit.discovery.find_tests_for_inputs(litConfig, inputs)
+ # Perform test discovery.
+ run = lit.run.Run(litConfig,
+ lit.discovery.find_tests_for_inputs(litConfig, inputs))
if opts.showSuites or opts.showTests:
# Aggregate the tests by suite.
suitesAndTests = {}
- for t in tests:
+ for t in run.tests:
if t.suite not in suitesAndTests:
suitesAndTests[t.suite] = []
suitesAndTests[t.suite].append(t)
- suitesAndTests = suitesAndTests.items()
+ suitesAndTests = list(suitesAndTests.items())
suitesAndTests.sort(key = lambda item: item[0].name)
# Show the suites, if requested.
@@ -315,9 +272,12 @@ def main(builtinParameters = {}):
ts_tests.sort(key = lambda test: test.path_in_suite)
for test in ts_tests:
print(' %s' % (test.getFullName(),))
-
+
+ # Exit.
+ sys.exit(0)
+
# Select and order the tests.
- numTotalTests = len(tests)
+ numTotalTests = len(run.tests)
# First, select based on the filter expression if given.
if opts.filter:
@@ -326,33 +286,28 @@ def main(builtinParameters = {}):
except:
parser.error("invalid regular expression for --filter: %r" % (
opts.filter))
- tests = [t for t in tests
- if rex.search(t.getFullName())]
+ run.tests = [t for t in run.tests
+ if rex.search(t.getFullName())]
# Then select the order.
if opts.shuffle:
- random.shuffle(tests)
+ random.shuffle(run.tests)
else:
- tests.sort(key = lambda t: t.getFullName())
+ run.tests.sort(key = lambda t: t.getFullName())
# Finally limit the number of tests, if desired.
if opts.maxTests is not None:
- tests = tests[:opts.maxTests]
+ run.tests = run.tests[:opts.maxTests]
# Don't create more threads than tests.
- opts.numThreads = min(len(tests), opts.numThreads)
+ opts.numThreads = min(len(run.tests), opts.numThreads)
extra = ''
- if len(tests) != numTotalTests:
+ if len(run.tests) != numTotalTests:
extra = ' of %d' % numTotalTests
- header = '-- Testing: %d%s tests, %d threads --'%(len(tests),extra,
+ header = '-- Testing: %d%s tests, %d threads --'%(len(run.tests), extra,
opts.numThreads)
- if opts.repeatTests:
- tests = [t.copyWithIndex(i)
- for t in tests
- for i in range(opts.repeatTests)]
-
progressBar = None
if not opts.quiet:
if opts.succinct and opts.useProgressBar:
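
These selection steps map directly onto command-line flags; a hypothetical run that filters, shuffles, and caps the test list might look like:

    lit --filter 'CodeGen/X86' --shuffle --max-tests 100 llvm/test
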
@@ -366,63 +321,50 @@ def main(builtinParameters = {}):
print(header)
startTime = time.time()
- display = TestingProgressDisplay(opts, len(tests), progressBar)
- provider = TestProvider(tests, opts.maxTime)
-
+ display = TestingProgressDisplay(opts, len(run.tests), progressBar)
try:
- import win32api
- except ImportError:
- pass
- else:
- def console_ctrl_handler(type):
- provider.cancel()
- return True
- win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
-
- runTests(opts.numThreads, litConfig, provider, display)
+ run.execute_tests(display, opts.numThreads, opts.maxTime,
+ opts.useProcesses)
+ except KeyboardInterrupt:
+ sys.exit(2)
display.finish()
+ testing_time = time.time() - startTime
if not opts.quiet:
- print('Testing Time: %.2fs'%(time.time() - startTime))
+ print('Testing Time: %.2fs' % (testing_time,))
- # Update results for any tests which weren't run.
- for t in tests:
- if t.result is None:
- t.setResult(lit.Test.UNRESOLVED, '', 0.0)
+ # Write out the test data, if requested.
+ if opts.output_path is not None:
+ write_test_results(run, litConfig, testing_time, opts.output_path)
# List test results organized by kind.
hasFailures = False
byCode = {}
- for t in tests:
- if t.result not in byCode:
- byCode[t.result] = []
- byCode[t.result].append(t)
- if t.result.isFailure:
+ for test in run.tests:
+ if test.result.code not in byCode:
+ byCode[test.result.code] = []
+ byCode[test.result.code].append(test)
+ if test.result.code.isFailure:
hasFailures = True
- # FIXME: Show unresolved and (optionally) unsupported tests.
+ # Print each test in any of the failing groups.
for title,code in (('Unexpected Passing Tests', lit.Test.XPASS),
- ('Failing Tests', lit.Test.FAIL)):
+ ('Failing Tests', lit.Test.FAIL),
+ ('Unresolved Tests', lit.Test.UNRESOLVED)):
elts = byCode.get(code)
if not elts:
continue
print('*'*20)
print('%s (%d):' % (title, len(elts)))
- for t in elts:
- print(' %s' % t.getFullName())
+ for test in elts:
+ print(' %s' % test.getFullName())
sys.stdout.write('\n')
- if opts.timeTests:
- # Collate, in case we repeated tests.
- times = {}
- for t in tests:
- key = t.getFullName()
- times[key] = times.get(key, 0.) + t.elapsed
-
- byTime = list(times.items())
- byTime.sort(key = lambda item: item[1])
- if byTime:
- lit.Util.printHistogram(byTime, title='Tests')
+ if opts.timeTests and run.tests:
+ # Order by time.
+ test_times = [(test.getFullName(), test.result.elapsed)
+ for test in run.tests]
+ lit.util.printHistogram(test_times, title='Tests')
for name,code in (('Expected Passes ', lit.Test.PASS),
('Expected Failures ', lit.Test.XFAIL),