blob: 0d617c0e3d9d51dbd420c927a2df7f4dd518857d [file] [log] [blame]
Roger Meier41ad4342015-03-24 22:30:40 +01001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20import contextlib
21import multiprocessing
22import multiprocessing.managers
23import os
24import platform
25import random
Roger Meier41ad4342015-03-24 22:30:40 +010026import signal
Nobuaki Sukegawaa6ab1f52015-11-28 15:04:39 +090027import socket
Roger Meier41ad4342015-03-24 22:30:40 +010028import subprocess
Nobuaki Sukegawaa6ab1f52015-11-28 15:04:39 +090029import sys
Roger Meier41ad4342015-03-24 22:30:40 +010030import threading
31import time
Roger Meier41ad4342015-03-24 22:30:40 +010032
Nobuaki Sukegawa2de27002015-11-22 01:13:48 +090033from .compat import str_join
34from .test import TestEntry, domain_socket_path
35from .report import ExecReporter, SummaryReporter
Roger Meier41ad4342015-03-24 22:30:40 +010036
37RESULT_TIMEOUT = 128
38RESULT_ERROR = 64
39
40
41class ExecutionContext(object):
42 def __init__(self, cmd, cwd, env, report):
43 self._log = multiprocessing.get_logger()
44 self.report = report
45 self.cmd = cmd
46 self.cwd = cwd
47 self.env = env
48 self.timer = None
49 self.expired = False
Nobuaki Sukegawa2ba79442016-01-12 19:37:55 +090050 self.killed = False
Roger Meier41ad4342015-03-24 22:30:40 +010051
52 def _expire(self):
53 self._log.info('Timeout')
54 self.expired = True
55 self.kill()
56
57 def kill(self):
58 self._log.debug('Killing process : %d' % self.proc.pid)
Nobuaki Sukegawa2ba79442016-01-12 19:37:55 +090059 self.killed = True
Roger Meier41ad4342015-03-24 22:30:40 +010060 if platform.system() != 'Windows':
61 try:
62 os.killpg(self.proc.pid, signal.SIGKILL)
63 except Exception as err:
64 self._log.info('Failed to kill process group : %s' % str(err))
65 try:
66 self.proc.kill()
67 except Exception as err:
68 self._log.info('Failed to kill process : %s' % str(err))
Roger Meier41ad4342015-03-24 22:30:40 +010069
70 def _popen_args(self):
71 args = {
72 'cwd': self.cwd,
73 'env': self.env,
74 'stdout': self.report.out,
75 'stderr': subprocess.STDOUT,
76 }
77 # make sure child processes doesn't remain after killing
78 if platform.system() == 'Windows':
79 DETACHED_PROCESS = 0x00000008
80 args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
81 else:
82 args.update(preexec_fn=os.setsid)
83 return args
84
85 def start(self, timeout=0):
Nobuaki Sukegawa2de27002015-11-22 01:13:48 +090086 joined = str_join(' ', self.cmd)
Jens Geyeraad06de2015-11-21 14:43:56 +010087 self._log.debug('COMMAND: %s', joined)
Roger Meier41ad4342015-03-24 22:30:40 +010088 self._log.debug('WORKDIR: %s', self.cwd)
89 self._log.debug('LOGFILE: %s', self.report.logpath)
90 self.report.begin()
Nobuaki Sukegawa2de27002015-11-22 01:13:48 +090091 self.proc = subprocess.Popen(self.cmd, **self._popen_args())
Roger Meier41ad4342015-03-24 22:30:40 +010092 if timeout > 0:
93 self.timer = threading.Timer(timeout, self._expire)
94 self.timer.start()
95 return self._scoped()
96
97 @contextlib.contextmanager
98 def _scoped(self):
99 yield self
100 self._log.debug('Killing scoped process')
Nobuaki Sukegawa2ba79442016-01-12 19:37:55 +0900101 if self.proc.poll() is None:
102 self.kill()
103 self.report.killed()
104 else:
105 self._log.debug('Process died unexpectedly')
106 self.report.died()
Roger Meier41ad4342015-03-24 22:30:40 +0100107
108 def wait(self):
109 self.proc.communicate()
110 if self.timer:
111 self.timer.cancel()
112 self.report.end(self.returncode)
113
114 @property
115 def returncode(self):
116 return self.proc.returncode if self.proc else None
117
118
Nobuaki Sukegawa378b7272016-01-03 17:04:50 +0900119def exec_context(port, logdir, test, prog):
120 report = ExecReporter(logdir, test, prog)
Roger Meier41ad4342015-03-24 22:30:40 +0100121 prog.build_command(port)
122 return ExecutionContext(prog.command, prog.workdir, prog.env, report)
123
124
Nobuaki Sukegawa378b7272016-01-03 17:04:50 +0900125def run_test(testdir, logdir, test_dict, async=True, max_retry=3):
Roger Meier41ad4342015-03-24 22:30:40 +0100126 try:
127 logger = multiprocessing.get_logger()
128 retry_count = 0
129 test = TestEntry(testdir, **test_dict)
130 while True:
131 if stop.is_set():
132 logger.debug('Skipping because shutting down')
133 return None
134 logger.debug('Start')
135 with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
136 logger.debug('Start with port %d' % port)
Nobuaki Sukegawa378b7272016-01-03 17:04:50 +0900137 sv = exec_context(port, logdir, test, test.server)
138 cl = exec_context(port, logdir, test, test.client)
Roger Meier41ad4342015-03-24 22:30:40 +0100139
140 logger.debug('Starting server')
141 with sv.start():
142 if test.delay > 0:
143 logger.debug('Delaying client for %.2f seconds' % test.delay)
144 time.sleep(test.delay)
145 cl_retry_count = 0
146 cl_max_retry = 10
147 cl_retry_wait = 0.5
148 while True:
149 logger.debug('Starting client')
150 cl.start(test.timeout)
151 logger.debug('Waiting client')
152 cl.wait()
153 if not cl.report.maybe_false_positive() or cl_retry_count >= cl_max_retry:
154 if cl_retry_count > 0 and cl_retry_count < cl_max_retry:
155 logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, cl_retry_count, cl_retry_wait))
Nobuaki Sukegawa2ba79442016-01-12 19:37:55 +0900156 # Wait for 50 ms to see if server does not die at the end.
157 time.sleep(0.05)
Roger Meier41ad4342015-03-24 22:30:40 +0100158 break
159 logger.debug('Server may not be ready, waiting %.2f second...' % cl_retry_wait)
160 time.sleep(cl_retry_wait)
161 cl_retry_count += 1
162
163 if not sv.report.maybe_false_positive() or retry_count >= max_retry:
164 logger.debug('Finish')
Nobuaki Sukegawa2ba79442016-01-12 19:37:55 +0900165 if cl.expired:
166 return RESULT_TIMEOUT
167 elif not sv.killed and cl.proc.returncode == 0:
168 # Server should be alive at the end.
169 return RESULT_ERROR
170 else:
171 return cl.proc.returncode
Roger Meier41ad4342015-03-24 22:30:40 +0100172 logger.warn('[%s]: Detected socket bind failure, retrying...' % test.server.name)
173 retry_count += 1
174 except (KeyboardInterrupt, SystemExit):
175 logger.info('Interrupted execution')
176 if not async:
177 raise
178 stop.set()
179 return None
Nobuaki Sukegawa2ba79442016-01-12 19:37:55 +0900180 except:
Roger Meier41ad4342015-03-24 22:30:40 +0100181 if not async:
182 raise
Nobuaki Sukegawa2ba79442016-01-12 19:37:55 +0900183 logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info())
Roger Meier41ad4342015-03-24 22:30:40 +0100184 return RESULT_ERROR
185
186
187class PortAllocator(object):
188 def __init__(self):
189 self._log = multiprocessing.get_logger()
190 self._lock = multiprocessing.Lock()
191 self._ports = set()
192 self._dom_ports = set()
193 self._last_alloc = 0
194
195 def _get_tcp_port(self):
196 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
197 sock.bind(('127.0.0.1', 0))
198 port = sock.getsockname()[1]
199 self._lock.acquire()
200 try:
201 ok = port not in self._ports
202 if ok:
203 self._ports.add(port)
204 self._last_alloc = time.time()
205 finally:
206 self._lock.release()
207 sock.close()
208 return port if ok else self._get_tcp_port()
209
210 def _get_domain_port(self):
211 port = random.randint(1024, 65536)
212 self._lock.acquire()
213 try:
214 ok = port not in self._dom_ports
215 if ok:
216 self._dom_ports.add(port)
217 finally:
218 self._lock.release()
219 return port if ok else self._get_domain_port()
220
221 def alloc_port(self, socket_type):
pavlodd08f6e2015-10-08 16:43:56 -0400222 if socket_type in ('domain', 'abstract'):
Roger Meier41ad4342015-03-24 22:30:40 +0100223 return self._get_domain_port()
224 else:
225 return self._get_tcp_port()
226
227 # static method for inter-process invokation
228 @staticmethod
229 @contextlib.contextmanager
230 def alloc_port_scoped(allocator, socket_type):
231 port = allocator.alloc_port(socket_type)
232 yield port
233 allocator.free_port(socket_type, port)
234
235 def free_port(self, socket_type, port):
236 self._log.debug('free_port')
237 self._lock.acquire()
238 try:
239 if socket_type == 'domain':
240 self._dom_ports.remove(port)
241 path = domain_socket_path(port)
242 if os.path.exists(path):
243 os.remove(path)
pavlodd08f6e2015-10-08 16:43:56 -0400244 elif socket_type == 'abstract':
245 self._dom_ports.remove(port)
Roger Meier41ad4342015-03-24 22:30:40 +0100246 else:
247 self._ports.remove(port)
248 except IOError as err:
249 self._log.info('Error while freeing port : %s' % str(err))
250 finally:
251 self._lock.release()
252
253
254class NonAsyncResult(object):
255 def __init__(self, value):
256 self._value = value
257
258 def get(self, timeout=None):
259 return self._value
260
261 def wait(self, timeout=None):
262 pass
263
264 def ready(self):
265 return True
266
267 def successful(self):
268 return self._value == 0
269
270
271class TestDispatcher(object):
Nobuaki Sukegawabd165302016-01-19 11:10:07 +0900272 def __init__(self, testdir, basedir, logdir_rel, concurrency):
Roger Meier41ad4342015-03-24 22:30:40 +0100273 self._log = multiprocessing.get_logger()
274 self.testdir = testdir
Nobuaki Sukegawabd165302016-01-19 11:10:07 +0900275 self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1)
276 self.logdir = self._report.testdir
Roger Meier41ad4342015-03-24 22:30:40 +0100277 # seems needed for python 2.x to handle keyboard interrupt
278 self._stop = multiprocessing.Event()
279 self._async = concurrency > 1
280 if not self._async:
281 self._pool = None
282 global stop
283 global ports
284 stop = self._stop
285 ports = PortAllocator()
286 else:
287 self._m = multiprocessing.managers.BaseManager()
288 self._m.register('ports', PortAllocator)
289 self._m.start()
290 self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
Roger Meier41ad4342015-03-24 22:30:40 +0100291 self._log.debug(
292 'TestDispatcher started with %d concurrent jobs' % concurrency)
293
294 def _pool_init(self, address):
295 global stop
296 global m
297 global ports
298 stop = self._stop
299 m = multiprocessing.managers.BaseManager(address)
300 m.connect()
301 ports = m.ports()
302
303 def _dispatch_sync(self, test, cont):
Nobuaki Sukegawa378b7272016-01-03 17:04:50 +0900304 r = run_test(self.testdir, self.logdir, test, False)
Roger Meier41ad4342015-03-24 22:30:40 +0100305 cont(r)
306 return NonAsyncResult(r)
307
308 def _dispatch_async(self, test, cont):
Nobuaki Sukegawa378b7272016-01-03 17:04:50 +0900309 self._log.debug('_dispatch_async')
310 return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test,), callback=cont)
Roger Meier41ad4342015-03-24 22:30:40 +0100311
312 def dispatch(self, test):
313 index = self._report.add_test(test)
314
315 def cont(r):
316 if not self._stop.is_set():
317 self._log.debug('freeing port')
318 self._log.debug('adding result')
319 self._report.add_result(index, r, r == RESULT_TIMEOUT)
320 self._log.debug('finish continuation')
321 fn = self._dispatch_async if self._async else self._dispatch_sync
322 return fn(test, cont)
323
324 def wait(self):
325 if self._async:
326 self._pool.close()
327 self._pool.join()
328 self._m.shutdown()
329 return self._report.end()
330
331 def terminate(self):
332 self._stop.set()
333 if self._async:
334 self._pool.terminate()
335 self._pool.join()
336 self._m.shutdown()