blob: 57a82531eaad1e21e2da1f82ba00c7272b0f92f4 [file] [log] [blame]
Justin Shepherd0d9bbd12011-08-11 12:57:44 -05001#!/usr/bin/env python
2# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
Justin Shepherd0d9bbd12011-08-11 12:57:44 -05004import heapq
5import os
6import unittest
7import sys
8import time
9
10from nose import config
11from nose import result
12from nose import core
13
14
15class _AnsiColorizer(object):
16 """
17 A colorizer is an object that loosely wraps around a stream, allowing
18 callers to write text to the stream in a particular color.
19
20 Colorizer classes must implement C{supported()} and C{write(text, color)}.
21 """
22 _colors = dict(black=30, red=31, green=32, yellow=33,
23 blue=34, magenta=35, cyan=36, white=37)
24
25 def __init__(self, stream):
26 self.stream = stream
27
28 def supported(cls, stream=sys.stdout):
29 """
30 A class method that returns True if the current platform supports
31 coloring terminal output using this method. Returns False otherwise.
32 """
33 if not stream.isatty():
34 return False # auto color only on TTYs
35 try:
36 import curses
37 except ImportError:
38 return False
39 else:
40 try:
41 try:
42 return curses.tigetnum("colors") > 2
43 except curses.error:
44 curses.setupterm()
45 return curses.tigetnum("colors") > 2
46 except:
47 raise
48 # guess false in case of error
49 return False
50 supported = classmethod(supported)
51
52 def write(self, text, color):
53 """
54 Write the given text to the stream in the given color.
55
56 @param text: Text to be written to the stream.
57
58 @param color: A string label for a color. e.g. 'red', 'white'.
59 """
60 color = self._colors[color]
61 self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
62
63
64class _Win32Colorizer(object):
65 """
66 See _AnsiColorizer docstring.
67 """
68 def __init__(self, stream):
69 from win32console import GetStdHandle, STD_OUT_HANDLE, \
70 FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
71 FOREGROUND_INTENSITY
72 red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
73 FOREGROUND_BLUE, FOREGROUND_INTENSITY)
74 self.stream = stream
75 self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
76 self._colors = {
77 'normal': red | green | blue,
78 'red': red | bold,
79 'green': green | bold,
80 'blue': blue | bold,
81 'yellow': red | green | bold,
82 'magenta': red | blue | bold,
83 'cyan': green | blue | bold,
84 'white': red | green | blue | bold
85 }
86
87 def supported(cls, stream=sys.stdout):
88 try:
89 import win32console
90 screenBuffer = win32console.GetStdHandle(
91 win32console.STD_OUT_HANDLE)
92 except ImportError:
93 return False
94 import pywintypes
95 try:
96 screenBuffer.SetConsoleTextAttribute(
97 win32console.FOREGROUND_RED |
98 win32console.FOREGROUND_GREEN |
99 win32console.FOREGROUND_BLUE)
100 except pywintypes.error:
101 return False
102 else:
103 return True
104 supported = classmethod(supported)
105
106 def write(self, text, color):
107 color = self._colors[color]
108 self.screenBuffer.SetConsoleTextAttribute(color)
109 self.stream.write(text)
110 self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
111
112
113class _NullColorizer(object):
114 """
115 See _AnsiColorizer docstring.
116 """
117 def __init__(self, stream):
118 self.stream = stream
119
120 def supported(cls, stream=sys.stdout):
121 return True
122 supported = classmethod(supported)
123
124 def write(self, text, color):
125 self.stream.write(text)
126
127
128def get_elapsed_time_color(elapsed_time):
129 if elapsed_time > 1.0:
130 return 'red'
131 elif elapsed_time > 0.25:
132 return 'yellow'
133 else:
134 return 'green'
135
136
137class KongTestResult(result.TextTestResult):
138 def __init__(self, *args, **kw):
139 self.show_elapsed = kw.pop('show_elapsed')
140 result.TextTestResult.__init__(self, *args, **kw)
141 self.num_slow_tests = 5
142 self.slow_tests = [] # this is a fixed-sized heap
143 self._last_case = None
144 self.colorizer = None
145 # NOTE(vish, tfukushima): reset stdout for the terminal check
146 stdout = sys.__stdout__
147 sys.stdout = sys.__stdout__
148 for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
149 if colorizer.supported():
150 self.colorizer = colorizer(self.stream)
151 break
152 sys.stdout = stdout
153
154 # NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
155 # error results in it failing to be initialized later. Otherwise,
156 # _handleElapsedTime will fail, causing the wrong error message to
157 # be outputted.
158 self.start_time = time.time()
159
160 def getDescription(self, test):
161 return str(test)
162
163 def _handleElapsedTime(self, test):
164 self.elapsed_time = time.time() - self.start_time
165 item = (self.elapsed_time, test)
166 # Record only the n-slowest tests using heap
167 if len(self.slow_tests) >= self.num_slow_tests:
168 heapq.heappushpop(self.slow_tests, item)
169 else:
170 heapq.heappush(self.slow_tests, item)
171
172 def _writeElapsedTime(self, test):
173 color = get_elapsed_time_color(self.elapsed_time)
174 self.colorizer.write(" %.2f" % self.elapsed_time, color)
175
176 def _writeResult(self, test, long_result, color, short_result, success):
177 if self.showAll:
178 self.colorizer.write(long_result, color)
179 if self.show_elapsed and success:
180 self._writeElapsedTime(test)
181 self.stream.writeln()
182 elif self.dots:
183 self.stream.write(short_result)
184 self.stream.flush()
185
186 # NOTE(vish, tfukushima): copied from unittest with edit to add color
187 def addSuccess(self, test):
188 unittest.TestResult.addSuccess(self, test)
189 self._handleElapsedTime(test)
190 self._writeResult(test, 'OK', 'green', '.', True)
191
192 # NOTE(vish, tfukushima): copied from unittest with edit to add color
193 def addFailure(self, test, err):
194 unittest.TestResult.addFailure(self, test, err)
195 self._handleElapsedTime(test)
196 self._writeResult(test, 'FAIL', 'red', 'F', False)
197
198 # NOTE(vish, tfukushima): copied from unittest with edit to add color
199 def addError(self, test, err):
200 """Overrides normal addError to add support for errorClasses.
201 If the exception is a registered class, the error will be added
202 to the list for that class, not errors.
203 """
204 self._handleElapsedTime(test)
205 stream = getattr(self, 'stream', None)
206 ec, ev, tb = err
207 try:
208 exc_info = self._exc_info_to_string(err, test)
209 except TypeError:
210 # This is for compatibility with Python 2.3.
211 exc_info = self._exc_info_to_string(err)
212 for cls, (storage, label, isfail) in self.errorClasses.items():
213 if result.isclass(ec) and issubclass(ec, cls):
214 if isfail:
215 test.passwd = False
216 storage.append((test, exc_info))
217 # Might get patched into a streamless result
218 if stream is not None:
219 if self.showAll:
220 message = [label]
221 detail = result._exception_detail(err[1])
222 if detail:
223 message.append(detail)
224 stream.writeln(": ".join(message))
225 elif self.dots:
226 stream.write(label[:1])
227 return
228 self.errors.append((test, exc_info))
229 test.passed = False
230 if stream is not None:
231 self._writeResult(test, 'ERROR', 'red', 'E', False)
232
233 def startTest(self, test):
234 unittest.TestResult.startTest(self, test)
235 self.start_time = time.time()
236 current_case = test.test.__class__.__name__
237
238 if self.showAll:
239 if current_case != self._last_case:
240 self.stream.writeln(current_case)
241 self._last_case = current_case
242
243 self.stream.write(
244 ' %s' % str(test.test._testMethodName).ljust(60))
245 self.stream.flush()
246
247
248class KongTestRunner(core.TextTestRunner):
249 def __init__(self, *args, **kwargs):
250 self.show_elapsed = kwargs.pop('show_elapsed')
251 core.TextTestRunner.__init__(self, *args, **kwargs)
252
253 def _makeResult(self):
254 return KongTestResult(self.stream,
255 self.descriptions,
256 self.verbosity,
257 self.config,
258 show_elapsed=self.show_elapsed)
259
260 def _writeSlowTests(self, result_):
261 # Pare out 'fast' tests
262 slow_tests = [item for item in result_.slow_tests
263 if get_elapsed_time_color(item[0]) != 'green']
264 if slow_tests:
265 slow_total_time = sum(item[0] for item in slow_tests)
266 self.stream.writeln("Slowest %i tests took %.2f secs:"
267 % (len(slow_tests), slow_total_time))
268 for elapsed_time, test in sorted(slow_tests, reverse=True):
269 time_str = "%.2f" % elapsed_time
270 self.stream.writeln(" %s %s" % (time_str.ljust(10), test))
271
272 def run(self, test):
273 result_ = core.TextTestRunner.run(self, test)
274 if self.show_elapsed:
275 self._writeSlowTests(result_)
276 return result_
277
278
279if __name__ == '__main__':
280 show_elapsed = True
281 argv = []
282 for x in sys.argv:
283 if x.startswith('test_'):
284 argv.append('nova.tests.%s' % x)
285 elif x.startswith('--hide-elapsed'):
286 show_elapsed = False
287 else:
288 argv.append(x)
289
290 c = config.Config(stream=sys.stdout,
291 env=os.environ,
292 verbosity=3,
293 plugins=core.DefaultPluginManager())
294
295 runner = KongTestRunner(stream=c.stream,
296 verbosity=c.verbosity,
297 config=c,
298 show_elapsed=show_elapsed)
299 sys.exit(not core.run(config=c, testRunner=runner, argv=argv))