Justin Shepherd | 0d9bbd1 | 2011-08-11 12:57:44 -0500 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | # vim: tabstop=4 shiftwidth=4 softtabstop=4 |
| 3 | |
Justin Shepherd | 0d9bbd1 | 2011-08-11 12:57:44 -0500 | [diff] [blame] | 4 | import heapq |
| 5 | import os |
| 6 | import unittest |
| 7 | import sys |
| 8 | import time |
| 9 | |
| 10 | from nose import config |
| 11 | from nose import result |
| 12 | from nose import core |
| 13 | |
| 14 | |
class _AnsiColorizer(object):
    """
    A colorizer is an object that loosely wraps around a stream, allowing
    callers to write text to the stream in a particular color.

    Colorizer classes must implement C{supported()} and C{write(text, color)}.
    """
    # Color label -> ANSI foreground color code used by write().
    _colors = dict(black=30, red=31, green=32, yellow=33,
                   blue=34, magenta=35, cyan=36, white=37)

    def __init__(self, stream):
        """
        @param stream: File-like object that colored text is written to.
        """
        self.stream = stream

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        A class method that returns True if the current platform supports
        coloring terminal output using this method. Returns False otherwise.

        Any failure while probing the terminal through curses is treated as
        "not supported" instead of being propagated to the caller.

        @param stream: Stream to probe; it must provide C{isatty()}.
        """
        if not stream.isatty():
            return False  # auto color only on TTYs
        try:
            import curses
        except ImportError:
            return False
        try:
            try:
                return curses.tigetnum("colors") > 2
            except curses.error:
                # The first query failed; initialise the terminal
                # description and ask again.
                curses.setupterm()
                return curses.tigetnum("colors") > 2
        except Exception:
            # guess false in case of error
            return False

    def write(self, text, color):
        """
        Write the given text to the stream in the given color.

        @param text: Text to be written to the stream.

        @param color: A string label for a color. e.g. 'red', 'white'.

        @raise KeyError: If C{color} is not one of the known labels.
        """
        color = self._colors[color]
        # Select the color with the bold attribute, emit the text, then reset.
        self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
| 62 | |
| 63 | |
class _Win32Colorizer(object):
    """
    See _AnsiColorizer docstring.
    """
    def __init__(self, stream):
        """
        @param stream: File-like object that text is written to. Colors are
            applied through the console handle obtained from
            C{GetStdHandle(STD_OUT_HANDLE)}, not through this stream.
        """
        from win32console import GetStdHandle, STD_OUT_HANDLE, \
            FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
            FOREGROUND_INTENSITY
        red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
                                  FOREGROUND_BLUE, FOREGROUND_INTENSITY)
        self.stream = stream
        self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
        # Color label -> console text attribute; 'normal' is the attribute
        # restored after every write.
        self._colors = {
            'normal': red | green | blue,
            'red': red | bold,
            'green': green | bold,
            'blue': blue | bold,
            'yellow': red | green | bold,
            'magenta': red | blue | bold,
            'cyan': green | blue | bold,
            'white': red | green | blue | bold
        }

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        Return True if the win32 console modules can be imported and the
        console accepts a text attribute change, False otherwise.

        @param stream: Accepted for parity with the other colorizers; the
            check itself does not use it.
        """
        try:
            import win32console
            screenBuffer = win32console.GetStdHandle(
                win32console.STD_OUT_HANDLE)
        except ImportError:
            return False
        import pywintypes
        try:
            screenBuffer.SetConsoleTextAttribute(
                win32console.FOREGROUND_RED |
                win32console.FOREGROUND_GREEN |
                win32console.FOREGROUND_BLUE)
        except pywintypes.error:
            return False
        else:
            return True

    def write(self, text, color):
        """
        Write the given text to the stream in the given color.

        @param text: Text to be written to the stream.

        @param color: A string label for a color. e.g. 'red', 'white'.

        @raise KeyError: If C{color} is not one of the known labels.
        """
        color = self._colors[color]
        self.screenBuffer.SetConsoleTextAttribute(color)
        try:
            self.stream.write(text)
        finally:
            # Always put the console back to the normal attribute, even if
            # the stream write fails, so later output is not left colored.
            self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
| 111 | |
| 112 | |
class _NullColorizer(object):
    """
    See _AnsiColorizer docstring.

    This colorizer applies no color at all: text goes to the stream as is.
    """
    def __init__(self, stream):
        """
        @param stream: File-like object that text is written to.
        """
        self.stream = stream

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        Always returns True; the C{stream} argument is not inspected.
        """
        return True

    def write(self, text, color):
        """
        Write the given text to the stream, ignoring C{color}.

        @param text: Text to be written to the stream.

        @param color: Accepted for interface compatibility and unused.
        """
        target = self.stream
        target.write(text)
| 126 | |
| 127 | |
def get_elapsed_time_color(elapsed_time):
    """Return the color label used to display an elapsed time.

    Values strictly greater than 1.0 map to 'red', values strictly greater
    than 0.25 map to 'yellow', and everything else maps to 'green'.
    """
    # Ordered from the largest threshold down; the first one exceeded wins.
    thresholds = ((1.0, 'red'), (0.25, 'yellow'))
    for limit, label in thresholds:
        if elapsed_time > limit:
            return label
    return 'green'
| 135 | |
| 136 | |
class KongTestResult(result.TextTestResult):
    """Text test result that colorizes outcomes and records test durations.

    Each finished test has its elapsed time measured from C{startTest};
    up to C{num_slow_tests} of the longest-running ones are kept in
    C{slow_tests}.
    """
    def __init__(self, *args, **kw):
        """
        @param show_elapsed: Required keyword argument. When true, the
            elapsed time is written after each successful test in
            C{showAll} mode.

        All other arguments are passed on to
        C{result.TextTestResult.__init__}.

        @raise KeyError: If C{show_elapsed} is not supplied.
        """
        self.show_elapsed = kw.pop('show_elapsed')
        result.TextTestResult.__init__(self, *args, **kw)
        self.num_slow_tests = 5
        self.slow_tests = []  # this is a fixed-sized heap
        self._last_case = None
        self.colorizer = None
        # NOTE(vish, tfukushima): reset stdout for the terminal check
        # Remember whatever sys.stdout currently is (it may have been
        # replaced) so that exactly that object is put back afterwards.
        stdout = sys.stdout
        sys.stdout = sys.__stdout__
        try:
            for colorizer in [_Win32Colorizer, _AnsiColorizer,
                              _NullColorizer]:
                if colorizer.supported():
                    self.colorizer = colorizer(self.stream)
                    break
        finally:
            sys.stdout = stdout

        # NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
        # error results in it failing to be initialized later. Otherwise,
        # _handleElapsedTime will fail, causing the wrong error message to
        # be outputted.
        self.start_time = time.time()

    def getDescription(self, test):
        """Return C{str(test)} as the description of the test."""
        return str(test)

    def _handleElapsedTime(self, test):
        """Store the time elapsed since C{start_time} in C{elapsed_time}
        and record the test among the slowest ones."""
        self.elapsed_time = time.time() - self.start_time
        item = (self.elapsed_time, test)
        # Record only the n-slowest tests using heap
        if len(self.slow_tests) >= self.num_slow_tests:
            # Heap is full: push the new item and drop the smallest one.
            heapq.heappushpop(self.slow_tests, item)
        else:
            heapq.heappush(self.slow_tests, item)

    def _writeElapsedTime(self, test):
        """Write the last measured elapsed time, colored by its duration."""
        color = get_elapsed_time_color(self.elapsed_time)
        self.colorizer.write(" %.2f" % self.elapsed_time, color)

    def _writeResult(self, test, long_result, color, short_result, success):
        """Write the outcome of a test.

        In C{showAll} mode C{long_result} is written in C{color}, followed
        by the elapsed time when C{show_elapsed} and C{success} are both
        true. In C{dots} mode only C{short_result} is written.
        """
        if self.showAll:
            self.colorizer.write(long_result, color)
            if self.show_elapsed and success:
                self._writeElapsedTime(test)
            self.stream.writeln()
        elif self.dots:
            self.stream.write(short_result)
            self.stream.flush()

    # NOTE(vish, tfukushima): copied from unittest with edit to add color
    def addSuccess(self, test):
        """Record a passing test and write 'OK' (or '.') for it."""
        unittest.TestResult.addSuccess(self, test)
        self._handleElapsedTime(test)
        self._writeResult(test, 'OK', 'green', '.', True)

    # NOTE(vish, tfukushima): copied from unittest with edit to add color
    def addFailure(self, test, err):
        """Record a failing test and write 'FAIL' (or 'F') for it."""
        unittest.TestResult.addFailure(self, test, err)
        self._handleElapsedTime(test)
        self._writeResult(test, 'FAIL', 'red', 'F', False)

    # NOTE(vish, tfukushima): copied from unittest with edit to add color
    def addError(self, test, err):
        """Overrides normal addError to add support for errorClasses.
        If the exception is a registered class, the error will be added
        to the list for that class, not errors.

        @param test: The test that raised the error.

        @param err: A C{(class, value, traceback)} triple.
        """
        self._handleElapsedTime(test)
        stream = getattr(self, 'stream', None)
        ec, ev, tb = err
        try:
            exc_info = self._exc_info_to_string(err, test)
        except TypeError:
            # This is for compatibility with Python 2.3.
            exc_info = self._exc_info_to_string(err)
        for cls, (storage, label, isfail) in self.errorClasses.items():
            if result.isclass(ec) and issubclass(ec, cls):
                if isfail:
                    # Same attribute the generic error path below sets.
                    test.passed = False
                storage.append((test, exc_info))
                # Might get patched into a streamless result
                if stream is not None:
                    if self.showAll:
                        message = [label]
                        detail = result._exception_detail(err[1])
                        if detail:
                            message.append(detail)
                        stream.writeln(": ".join(message))
                    elif self.dots:
                        stream.write(label[:1])
                return
        self.errors.append((test, exc_info))
        test.passed = False
        if stream is not None:
            self._writeResult(test, 'ERROR', 'red', 'E', False)

    def startTest(self, test):
        """Start timing a test and, in C{showAll} mode, write its name.

        The test case class name is written once, each time it differs
        from the previous test's class; the method name follows, padded
        to 60 columns.
        """
        unittest.TestResult.startTest(self, test)
        self.start_time = time.time()
        current_case = test.test.__class__.__name__

        if self.showAll:
            if current_case != self._last_case:
                self.stream.writeln(current_case)
                self._last_case = current_case

            self.stream.write(
                ' %s' % str(test.test._testMethodName).ljust(60))
            self.stream.flush()
| 246 | |
| 247 | |
class KongTestRunner(core.TextTestRunner):
    """Text test runner that produces C{KongTestResult} objects and can
    list the slowest tests once the run has finished."""

    def __init__(self, *args, **kwargs):
        """
        @param show_elapsed: Required keyword argument controlling whether
            elapsed times and the slow-test summary are shown.

        All other arguments are passed on to
        C{core.TextTestRunner.__init__}.
        """
        show_elapsed = kwargs.pop('show_elapsed')
        self.show_elapsed = show_elapsed
        core.TextTestRunner.__init__(self, *args, **kwargs)

    def _makeResult(self):
        """Build the result object used for this run."""
        positional = (self.stream, self.descriptions,
                      self.verbosity, self.config)
        return KongTestResult(*positional, show_elapsed=self.show_elapsed)

    def _writeSlowTests(self, result_):
        """Write the recorded slow tests, longest first, leaving out
        those whose elapsed time is rated 'green'."""
        # Pare out 'fast' tests
        slow_tests = []
        for entry in result_.slow_tests:
            if get_elapsed_time_color(entry[0]) != 'green':
                slow_tests.append(entry)
        if not slow_tests:
            return

        slow_total_time = sum([entry[0] for entry in slow_tests])
        header = "Slowest %i tests took %.2f secs:" % (len(slow_tests),
                                                       slow_total_time)
        self.stream.writeln(header)
        slow_tests.sort(reverse=True)
        for elapsed_time, test in slow_tests:
            time_str = "%.2f" % elapsed_time
            self.stream.writeln(" %s %s" % (time_str.ljust(10), test))

    def run(self, test):
        """Run the tests, then write the slow-test summary if
        C{show_elapsed} is set. Returns the result object."""
        result_ = core.TextTestRunner.run(self, test)
        if not self.show_elapsed:
            return result_
        self._writeSlowTests(result_)
        return result_
| 277 | |
| 278 | |
if __name__ == '__main__':
    # Build the argument list handed to nose: arguments starting with
    # 'test_' get a 'nova.tests.' prefix, '--hide-elapsed' is consumed
    # here to turn timing output off, everything else passes through.
    show_elapsed = True
    argv = []
    for arg in sys.argv:
        if arg.startswith('test_'):
            argv.append('nova.tests.%s' % arg)
            continue
        if arg.startswith('--hide-elapsed'):
            show_elapsed = False
            continue
        argv.append(arg)

    c = config.Config(stream=sys.stdout,
                      env=os.environ,
                      verbosity=3,
                      plugins=core.DefaultPluginManager())

    runner = KongTestRunner(stream=c.stream,
                            verbosity=c.verbosity,
                            config=c,
                            show_elapsed=show_elapsed)
    # Exit status is the negation of what core.run returns.
    outcome = core.run(config=c, testRunner=runner, argv=argv)
    sys.exit(not outcome)