blob: 091dce487f23d4760dfa971ab2f257ecd865d834 [file] [log] [blame]
Justin Shepherd0d9bbd12011-08-11 12:57:44 -05001#!/usr/bin/env python
2# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
4import gettext
5import heapq
6import os
7import unittest
8import sys
9import time
10
11from nose import config
12from nose import result
13from nose import core
14
15
16class _AnsiColorizer(object):
17 """
18 A colorizer is an object that loosely wraps around a stream, allowing
19 callers to write text to the stream in a particular color.
20
21 Colorizer classes must implement C{supported()} and C{write(text, color)}.
22 """
23 _colors = dict(black=30, red=31, green=32, yellow=33,
24 blue=34, magenta=35, cyan=36, white=37)
25
26 def __init__(self, stream):
27 self.stream = stream
28
29 def supported(cls, stream=sys.stdout):
30 """
31 A class method that returns True if the current platform supports
32 coloring terminal output using this method. Returns False otherwise.
33 """
34 if not stream.isatty():
35 return False # auto color only on TTYs
36 try:
37 import curses
38 except ImportError:
39 return False
40 else:
41 try:
42 try:
43 return curses.tigetnum("colors") > 2
44 except curses.error:
45 curses.setupterm()
46 return curses.tigetnum("colors") > 2
47 except:
48 raise
49 # guess false in case of error
50 return False
51 supported = classmethod(supported)
52
53 def write(self, text, color):
54 """
55 Write the given text to the stream in the given color.
56
57 @param text: Text to be written to the stream.
58
59 @param color: A string label for a color. e.g. 'red', 'white'.
60 """
61 color = self._colors[color]
62 self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
63
64
65class _Win32Colorizer(object):
66 """
67 See _AnsiColorizer docstring.
68 """
69 def __init__(self, stream):
70 from win32console import GetStdHandle, STD_OUT_HANDLE, \
71 FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
72 FOREGROUND_INTENSITY
73 red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
74 FOREGROUND_BLUE, FOREGROUND_INTENSITY)
75 self.stream = stream
76 self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
77 self._colors = {
78 'normal': red | green | blue,
79 'red': red | bold,
80 'green': green | bold,
81 'blue': blue | bold,
82 'yellow': red | green | bold,
83 'magenta': red | blue | bold,
84 'cyan': green | blue | bold,
85 'white': red | green | blue | bold
86 }
87
88 def supported(cls, stream=sys.stdout):
89 try:
90 import win32console
91 screenBuffer = win32console.GetStdHandle(
92 win32console.STD_OUT_HANDLE)
93 except ImportError:
94 return False
95 import pywintypes
96 try:
97 screenBuffer.SetConsoleTextAttribute(
98 win32console.FOREGROUND_RED |
99 win32console.FOREGROUND_GREEN |
100 win32console.FOREGROUND_BLUE)
101 except pywintypes.error:
102 return False
103 else:
104 return True
105 supported = classmethod(supported)
106
107 def write(self, text, color):
108 color = self._colors[color]
109 self.screenBuffer.SetConsoleTextAttribute(color)
110 self.stream.write(text)
111 self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
112
113
114class _NullColorizer(object):
115 """
116 See _AnsiColorizer docstring.
117 """
118 def __init__(self, stream):
119 self.stream = stream
120
121 def supported(cls, stream=sys.stdout):
122 return True
123 supported = classmethod(supported)
124
125 def write(self, text, color):
126 self.stream.write(text)
127
128
129def get_elapsed_time_color(elapsed_time):
130 if elapsed_time > 1.0:
131 return 'red'
132 elif elapsed_time > 0.25:
133 return 'yellow'
134 else:
135 return 'green'
136
137
138class KongTestResult(result.TextTestResult):
139 def __init__(self, *args, **kw):
140 self.show_elapsed = kw.pop('show_elapsed')
141 result.TextTestResult.__init__(self, *args, **kw)
142 self.num_slow_tests = 5
143 self.slow_tests = [] # this is a fixed-sized heap
144 self._last_case = None
145 self.colorizer = None
146 # NOTE(vish, tfukushima): reset stdout for the terminal check
147 stdout = sys.__stdout__
148 sys.stdout = sys.__stdout__
149 for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
150 if colorizer.supported():
151 self.colorizer = colorizer(self.stream)
152 break
153 sys.stdout = stdout
154
155 # NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
156 # error results in it failing to be initialized later. Otherwise,
157 # _handleElapsedTime will fail, causing the wrong error message to
158 # be outputted.
159 self.start_time = time.time()
160
161 def getDescription(self, test):
162 return str(test)
163
164 def _handleElapsedTime(self, test):
165 self.elapsed_time = time.time() - self.start_time
166 item = (self.elapsed_time, test)
167 # Record only the n-slowest tests using heap
168 if len(self.slow_tests) >= self.num_slow_tests:
169 heapq.heappushpop(self.slow_tests, item)
170 else:
171 heapq.heappush(self.slow_tests, item)
172
173 def _writeElapsedTime(self, test):
174 color = get_elapsed_time_color(self.elapsed_time)
175 self.colorizer.write(" %.2f" % self.elapsed_time, color)
176
177 def _writeResult(self, test, long_result, color, short_result, success):
178 if self.showAll:
179 self.colorizer.write(long_result, color)
180 if self.show_elapsed and success:
181 self._writeElapsedTime(test)
182 self.stream.writeln()
183 elif self.dots:
184 self.stream.write(short_result)
185 self.stream.flush()
186
187 # NOTE(vish, tfukushima): copied from unittest with edit to add color
188 def addSuccess(self, test):
189 unittest.TestResult.addSuccess(self, test)
190 self._handleElapsedTime(test)
191 self._writeResult(test, 'OK', 'green', '.', True)
192
193 # NOTE(vish, tfukushima): copied from unittest with edit to add color
194 def addFailure(self, test, err):
195 unittest.TestResult.addFailure(self, test, err)
196 self._handleElapsedTime(test)
197 self._writeResult(test, 'FAIL', 'red', 'F', False)
198
199 # NOTE(vish, tfukushima): copied from unittest with edit to add color
200 def addError(self, test, err):
201 """Overrides normal addError to add support for errorClasses.
202 If the exception is a registered class, the error will be added
203 to the list for that class, not errors.
204 """
205 self._handleElapsedTime(test)
206 stream = getattr(self, 'stream', None)
207 ec, ev, tb = err
208 try:
209 exc_info = self._exc_info_to_string(err, test)
210 except TypeError:
211 # This is for compatibility with Python 2.3.
212 exc_info = self._exc_info_to_string(err)
213 for cls, (storage, label, isfail) in self.errorClasses.items():
214 if result.isclass(ec) and issubclass(ec, cls):
215 if isfail:
216 test.passwd = False
217 storage.append((test, exc_info))
218 # Might get patched into a streamless result
219 if stream is not None:
220 if self.showAll:
221 message = [label]
222 detail = result._exception_detail(err[1])
223 if detail:
224 message.append(detail)
225 stream.writeln(": ".join(message))
226 elif self.dots:
227 stream.write(label[:1])
228 return
229 self.errors.append((test, exc_info))
230 test.passed = False
231 if stream is not None:
232 self._writeResult(test, 'ERROR', 'red', 'E', False)
233
234 def startTest(self, test):
235 unittest.TestResult.startTest(self, test)
236 self.start_time = time.time()
237 current_case = test.test.__class__.__name__
238
239 if self.showAll:
240 if current_case != self._last_case:
241 self.stream.writeln(current_case)
242 self._last_case = current_case
243
244 self.stream.write(
245 ' %s' % str(test.test._testMethodName).ljust(60))
246 self.stream.flush()
247
248
249class KongTestRunner(core.TextTestRunner):
250 def __init__(self, *args, **kwargs):
251 self.show_elapsed = kwargs.pop('show_elapsed')
252 core.TextTestRunner.__init__(self, *args, **kwargs)
253
254 def _makeResult(self):
255 return KongTestResult(self.stream,
256 self.descriptions,
257 self.verbosity,
258 self.config,
259 show_elapsed=self.show_elapsed)
260
261 def _writeSlowTests(self, result_):
262 # Pare out 'fast' tests
263 slow_tests = [item for item in result_.slow_tests
264 if get_elapsed_time_color(item[0]) != 'green']
265 if slow_tests:
266 slow_total_time = sum(item[0] for item in slow_tests)
267 self.stream.writeln("Slowest %i tests took %.2f secs:"
268 % (len(slow_tests), slow_total_time))
269 for elapsed_time, test in sorted(slow_tests, reverse=True):
270 time_str = "%.2f" % elapsed_time
271 self.stream.writeln(" %s %s" % (time_str.ljust(10), test))
272
273 def run(self, test):
274 result_ = core.TextTestRunner.run(self, test)
275 if self.show_elapsed:
276 self._writeSlowTests(result_)
277 return result_
278
279
280if __name__ == '__main__':
281 show_elapsed = True
282 argv = []
283 for x in sys.argv:
284 if x.startswith('test_'):
285 argv.append('nova.tests.%s' % x)
286 elif x.startswith('--hide-elapsed'):
287 show_elapsed = False
288 else:
289 argv.append(x)
290
291 c = config.Config(stream=sys.stdout,
292 env=os.environ,
293 verbosity=3,
294 plugins=core.DefaultPluginManager())
295
296 runner = KongTestRunner(stream=c.stream,
297 verbosity=c.verbosity,
298 config=c,
299 show_elapsed=show_elapsed)
300 sys.exit(not core.run(config=c, testRunner=runner, argv=argv))