blob: 129016cceb6f5667ae31cbb32b3a6331f7e8f2f3 [file] [log] [blame]
Roger Meier41ad4342015-03-24 22:30:40 +01001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20import contextlib
21import multiprocessing
22import multiprocessing.managers
23import os
Jens Geyeraad06de2015-11-21 14:43:56 +010024import sys
Roger Meier41ad4342015-03-24 22:30:40 +010025import platform
26import random
27import socket
28import signal
29import subprocess
30import threading
31import time
32import traceback
33
Nobuaki Sukegawa2de27002015-11-22 01:13:48 +090034from .compat import str_join
35from .test import TestEntry, domain_socket_path
36from .report import ExecReporter, SummaryReporter
Roger Meier41ad4342015-03-24 22:30:40 +010037
# Pseudo exit codes produced by run_test() instead of a real client exit code:
#   RESULT_TIMEOUT - the client was killed because it exceeded its timeout.
#   RESULT_ERROR   - an unexpected exception aborted the test (async mode).
# NOTE(review): presumably picked to stay clear of ordinary exit codes — confirm.
RESULT_TIMEOUT = 128
RESULT_ERROR = 64
40
41
class ExecutionContext(object):
    """Run one test program as a subprocess and report on its lifecycle.

    The child is started in a new session/process group (POSIX) or a new
    process group (Windows), so that kill() can also take down processes the
    child spawned.
    """

    def __init__(self, cmd, cwd, env, report):
        self._log = multiprocessing.get_logger()
        self.report = report
        self.cmd = cmd
        self.cwd = cwd
        self.env = env
        self.proc = None      # subprocess.Popen, set by start()
        self.timer = None     # threading.Timer enforcing the timeout, if any
        self.expired = False  # True if the most recent run hit its timeout

    def _expire(self):
        """Timer callback: flag the run as timed out and kill it."""
        self._log.info('Timeout')
        self.expired = True
        self.kill()

    def kill(self):
        """Kill the child (and, off Windows, its process group); notify report.

        Failures are logged rather than raised, because the process may
        already have exited. Calling kill() before start() is a no-op.
        """
        if self.proc is None:
            return
        self._log.debug('Killing process : %d' % self.proc.pid)
        if platform.system() != 'Windows':
            try:
                # The child is a session leader (os.setsid), so its pid is the pgid.
                os.killpg(self.proc.pid, signal.SIGKILL)
            except Exception as err:
                self._log.info('Failed to kill process group : %s' % str(err))
        try:
            self.proc.kill()
        except Exception as err:
            self._log.info('Failed to kill process : %s' % str(err))
        self.report.killed()

    def _popen_args(self):
        """Build the keyword arguments passed to subprocess.Popen."""
        args = {
            'cwd': self.cwd,
            'env': self.env,
            'stdout': self.report.out,
            'stderr': subprocess.STDOUT,
        }
        # Make sure child processes don't remain after killing.
        if platform.system() == 'Windows':
            DETACHED_PROCESS = 0x00000008
            args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            args.update(preexec_fn=os.setsid)
        return args

    def start(self, timeout=0):
        """Launch the process and return a context manager that kills it on exit.

        Args:
          timeout: seconds before the process is killed and marked as expired;
            0 or less disables the timer.
        """
        joined = str_join(' ', self.cmd)
        self._log.debug('COMMAND: %s', joined)
        self._log.debug('WORKDIR: %s', self.cwd)
        self._log.debug('LOGFILE: %s', self.report.logpath)
        self.report.begin()
        # start() may be called several times on the same context (client
        # retries), so a timeout from an earlier run must not leak into this one.
        self.expired = False
        self.proc = subprocess.Popen(self.cmd, **self._popen_args())
        if timeout > 0:
            self.timer = threading.Timer(timeout, self._expire)
            self.timer.start()
        return self._scoped()

    @contextlib.contextmanager
    def _scoped(self):
        """Yield self and always kill the process afterwards, even on error."""
        try:
            yield self
        finally:
            self._log.debug('Killing scoped process')
            self.kill()

    def wait(self):
        """Wait for the process to exit, cancel the timer, and finish the report."""
        self.proc.communicate()
        if self.timer:
            self.timer.cancel()
        self.report.end(self.returncode)

    @property
    def returncode(self):
        """Exit code of the process, or None if it never started or is still running."""
        return self.proc.returncode if self.proc else None
112
113
def exec_context(port, testdir, test, prog):
    """Create an ExecutionContext that runs ``prog`` against ``port``.

    Creates the ExecReporter for the program first, then asks ``prog`` to
    build its command line for ``port``, and wraps the resulting command,
    working directory and environment.
    """
    reporter = ExecReporter(testdir, test, prog)
    prog.build_command(port)
    command, workdir, environ = prog.command, prog.workdir, prog.env
    return ExecutionContext(command, workdir, environ, reporter)
118
119
def _run_client_with_retry(logger, test, cl, max_retry=10, retry_wait=0.5):
    """Run the client, restarting it while its report looks like a false positive.

    The server may not be accepting connections yet, so the client is retried
    up to ``max_retry`` times, sleeping ``retry_wait`` seconds between attempts.
    """
    retry_count = 0
    while True:
        logger.debug('Starting client')
        cl.start(test.timeout)
        logger.debug('Waiting client')
        cl.wait()
        if not cl.report.maybe_false_positive() or retry_count >= max_retry:
            if 0 < retry_count < max_retry:
                logger.warning('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, retry_count, retry_wait))
            break
        logger.debug('Server may not be ready, waiting %.2f second...' % retry_wait)
        time.sleep(retry_wait)
        retry_count += 1


def run_test(testdir, test_dict, async_mode=True, max_retry=3, **legacy_kwargs):
    """Run one cross test (a server/client pair) and return its result code.

    Args:
      testdir: test directory passed to TestEntry and the reporters.
      test_dict: keyword arguments used to construct the TestEntry.
      async_mode: True inside a worker pool. Unexpected exceptions are then
        logged and turned into RESULT_ERROR, and interrupts set the shared
        stop event; otherwise both are re-raised. This parameter was formerly
        named ``async``, which is a reserved keyword since Python 3.7. The old
        name is still accepted, e.g. ``run_test(d, t, **{'async': False})``.
      max_retry: how many times the whole run is repeated when the server
        report looks like a socket bind failure.

    Returns:
      The client's exit code, RESULT_TIMEOUT if the client timed out,
      RESULT_ERROR on an unexpected exception (async mode), or None when
      shutting down or interrupted.

    Raises:
      TypeError: for unknown keyword arguments.
    """
    if 'async' in legacy_kwargs:
        async_mode = legacy_kwargs.pop('async')
    if legacy_kwargs:
        raise TypeError('run_test() got unexpected keyword argument(s): %s'
                        % ', '.join(sorted(legacy_kwargs)))
    # Bound before the try so that the except handlers can always log.
    logger = multiprocessing.get_logger()
    try:
        retry_count = 0
        test = TestEntry(testdir, **test_dict)
        while True:
            if stop.is_set():
                logger.debug('Skipping because shutting down')
                return None
            logger.debug('Start')
            with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                logger.debug('Start with port %d' % port)
                sv = exec_context(port, testdir, test, test.server)
                cl = exec_context(port, testdir, test, test.client)

                logger.debug('Starting server')
                with sv.start():
                    if test.delay > 0:
                        logger.debug('Delaying client for %.2f seconds' % test.delay)
                        time.sleep(test.delay)
                    _run_client_with_retry(logger, test, cl)

                # The server has been killed at this point; inspect its report.
                if not sv.report.maybe_false_positive() or retry_count >= max_retry:
                    logger.debug('Finish')
                    return RESULT_TIMEOUT if cl.expired else cl.proc.returncode
                logger.warning('[%s]: Detected socket bind failure, retrying...' % test.server.name)
                retry_count += 1
    except (KeyboardInterrupt, SystemExit):
        logger.info('Interrupted execution')
        if not async_mode:
            raise
        stop.set()
        return None
    except Exception as ex:
        logger.warning('Error while executing test : %s' % str(ex))
        if not async_mode:
            raise
        # traceback.print_exc() writes to stderr and returns None; format_exc()
        # returns the text so the traceback actually reaches the log.
        logger.info(traceback.format_exc())
        return RESULT_ERROR
173
174
class PortAllocator(object):
    """Hand out unique TCP ports and domain-socket ids to concurrent tests.

    In concurrent mode, one instance is served through a
    multiprocessing.managers.BaseManager and used by pool workers. All
    bookkeeping is therefore guarded by a lock.
    """

    def __init__(self):
        self._log = multiprocessing.get_logger()
        self._lock = multiprocessing.Lock()
        self._ports = set()      # TCP ports currently handed out
        self._dom_ports = set()  # domain/abstract socket ids currently handed out
        self._last_alloc = 0     # time.time() of the most recent TCP allocation

    def _get_tcp_port(self):
        """Return a loopback TCP port chosen by the OS and not already handed out."""
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # Binding to port 0 lets the OS pick a currently unused port.
                sock.bind(('127.0.0.1', 0))
                port = sock.getsockname()[1]
                with self._lock:
                    if port not in self._ports:
                        self._ports.add(port)
                        self._last_alloc = time.time()
                        return port
            finally:
                # Always release the probe socket, even if bind() fails.
                sock.close()

    def _get_domain_port(self):
        """Return a random id in [1024, 65536] not already handed out."""
        while True:
            port = random.randint(1024, 65536)
            with self._lock:
                if port not in self._dom_ports:
                    self._dom_ports.add(port)
                    return port

    def alloc_port(self, socket_type):
        """Allocate a port/id for ``socket_type`` ('domain', 'abstract' or TCP)."""
        if socket_type in ('domain', 'abstract'):
            return self._get_domain_port()
        else:
            return self._get_tcp_port()

    # static method for inter-process invocation: ``allocator`` may be a
    # manager proxy rather than a real PortAllocator.
    @staticmethod
    @contextlib.contextmanager
    def alloc_port_scoped(allocator, socket_type):
        """Allocate a port for the duration of a with-block; free it even on error."""
        port = allocator.alloc_port(socket_type)
        try:
            yield port
        finally:
            allocator.free_port(socket_type, port)

    def free_port(self, socket_type, port):
        """Release ``port``; for 'domain' sockets also remove the socket file.

        Freeing a port that is not currently allocated is a no-op. Filesystem
        errors are logged rather than raised.
        """
        self._log.debug('free_port')
        with self._lock:
            try:
                if socket_type in ('domain', 'abstract'):
                    self._dom_ports.discard(port)
                    if socket_type == 'domain':
                        path = domain_socket_path(port)
                        if os.path.exists(path):
                            os.remove(path)
                else:
                    self._ports.discard(port)
            except (IOError, OSError) as err:
                self._log.info('Error while freeing port : %s' % str(err))
240
241
class NonAsyncResult(object):
    """Already-completed result returned by synchronous dispatch.

    Exposes the same get/wait/ready/successful methods as
    multiprocessing.pool.AsyncResult, so callers can treat sync and async
    dispatch the same way.
    """

    def __init__(self, value):
        self._value = value

    def ready(self):
        """Always True: the value was computed before this object existed."""
        return True

    def wait(self, timeout=None):
        """Nothing to wait for; ``timeout`` is ignored."""
        return None

    def get(self, timeout=None):
        """Return the stored value immediately; ``timeout`` is ignored."""
        value = self._value
        return value

    def successful(self):
        """True when the stored result code is 0."""
        is_zero = self._value == 0
        return is_zero
257
258
class TestDispatcher(object):
    """Dispatch cross tests either inline or through a multiprocessing pool.

    With ``concurrency > 1`` tests run in a multiprocessing.Pool. The workers
    share a PortAllocator served by a BaseManager. Otherwise tests run
    synchronously in this process, and the module-level ``stop``/``ports``
    globals are set directly.
    """

    def __init__(self, testdir, concurrency):
        global stop
        global ports
        self._log = multiprocessing.get_logger()
        self.testdir = testdir
        # seems needed for python 2.x to handle keyboard interrupt
        self._stop = multiprocessing.Event()
        self._async = concurrency > 1
        if self._async:
            manager = multiprocessing.managers.BaseManager()
            self._m = manager
            manager.register('ports', PortAllocator)
            manager.start()
            self._pool = multiprocessing.Pool(concurrency, self._pool_init, (manager.address,))
        else:
            self._pool = None
            stop = self._stop
            ports = PortAllocator()
        self._report = SummaryReporter(testdir, self._async)
        self._log.debug(
            'TestDispatcher started with %d concurrent jobs' % concurrency)

    def _pool_init(self, address):
        """Pool initializer: bind each worker's ``stop``/``ports`` globals."""
        global stop
        global m
        global ports
        stop = self._stop
        m = multiprocessing.managers.BaseManager(address)
        m.connect()
        ports = m.ports()

    def _dispatch_sync(self, test, cont):
        """Run ``test`` inline, feed the result to ``cont`` and wrap it."""
        result = run_test(self.testdir, test, False)
        cont(result)
        return NonAsyncResult(result)

    def _dispatch_async(self, test, cont):
        """Submit ``test`` to the pool; ``cont`` runs when it completes."""
        return self._pool.apply_async(func=run_test, args=(self.testdir, test,), callback=cont)

    def dispatch(self, test):
        """Register ``test`` with the summary report and start running it."""
        index = self._report.add_test(test)

        def on_result(r):
            # Results that arrive after a stop request are dropped.
            if self._stop.is_set():
                return
            self._log.debug('freeing port')
            self._log.debug('adding result')
            self._report.add_result(index, r, r == RESULT_TIMEOUT)
            self._log.debug('finish continuation')

        if self._async:
            runner = self._dispatch_async
        else:
            runner = self._dispatch_sync
        return runner(test, on_result)

    def wait(self):
        """Wait for every dispatched test, then return the summary report result."""
        if self._async:
            pool = self._pool
            pool.close()
            pool.join()
            self._m.shutdown()
        return self._report.end()

    def terminate(self):
        """Signal stop and, in pool mode, kill the workers and the manager."""
        self._stop.set()
        if not self._async:
            return
        self._pool.terminate()
        self._pool.join()
        self._m.shutdown()