blob: acba33531a199ba802a066608fa1592253dd347b [file] [log] [blame]
Roger Meier41ad4342015-03-24 22:30:40 +01001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20import contextlib
21import multiprocessing
22import multiprocessing.managers
23import os
24import platform
25import random
Roger Meier41ad4342015-03-24 22:30:40 +010026import signal
Nobuaki Sukegawaa6ab1f52015-11-28 15:04:39 +090027import socket
Roger Meier41ad4342015-03-24 22:30:40 +010028import subprocess
Nobuaki Sukegawaa6ab1f52015-11-28 15:04:39 +090029import sys
Roger Meier41ad4342015-03-24 22:30:40 +010030import threading
31import time
Roger Meier41ad4342015-03-24 22:30:40 +010032
Nobuaki Sukegawa2de27002015-11-22 01:13:48 +090033from .compat import str_join
34from .test import TestEntry, domain_socket_path
35from .report import ExecReporter, SummaryReporter
Roger Meier41ad4342015-03-24 22:30:40 +010036
37RESULT_TIMEOUT = 128
38RESULT_ERROR = 64
39
40
41class ExecutionContext(object):
42 def __init__(self, cmd, cwd, env, report):
43 self._log = multiprocessing.get_logger()
44 self.report = report
45 self.cmd = cmd
46 self.cwd = cwd
47 self.env = env
48 self.timer = None
49 self.expired = False
50
51 def _expire(self):
52 self._log.info('Timeout')
53 self.expired = True
54 self.kill()
55
56 def kill(self):
57 self._log.debug('Killing process : %d' % self.proc.pid)
58 if platform.system() != 'Windows':
59 try:
60 os.killpg(self.proc.pid, signal.SIGKILL)
61 except Exception as err:
62 self._log.info('Failed to kill process group : %s' % str(err))
63 try:
64 self.proc.kill()
65 except Exception as err:
66 self._log.info('Failed to kill process : %s' % str(err))
67 self.report.killed()
68
69 def _popen_args(self):
70 args = {
71 'cwd': self.cwd,
72 'env': self.env,
73 'stdout': self.report.out,
74 'stderr': subprocess.STDOUT,
75 }
76 # make sure child processes doesn't remain after killing
77 if platform.system() == 'Windows':
78 DETACHED_PROCESS = 0x00000008
79 args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
80 else:
81 args.update(preexec_fn=os.setsid)
82 return args
83
84 def start(self, timeout=0):
Nobuaki Sukegawa2de27002015-11-22 01:13:48 +090085 joined = str_join(' ', self.cmd)
Jens Geyeraad06de2015-11-21 14:43:56 +010086 self._log.debug('COMMAND: %s', joined)
Roger Meier41ad4342015-03-24 22:30:40 +010087 self._log.debug('WORKDIR: %s', self.cwd)
88 self._log.debug('LOGFILE: %s', self.report.logpath)
89 self.report.begin()
Nobuaki Sukegawa2de27002015-11-22 01:13:48 +090090 self.proc = subprocess.Popen(self.cmd, **self._popen_args())
Roger Meier41ad4342015-03-24 22:30:40 +010091 if timeout > 0:
92 self.timer = threading.Timer(timeout, self._expire)
93 self.timer.start()
94 return self._scoped()
95
96 @contextlib.contextmanager
97 def _scoped(self):
98 yield self
99 self._log.debug('Killing scoped process')
100 self.kill()
101
102 def wait(self):
103 self.proc.communicate()
104 if self.timer:
105 self.timer.cancel()
106 self.report.end(self.returncode)
107
108 @property
109 def returncode(self):
110 return self.proc.returncode if self.proc else None
111
112
113def exec_context(port, testdir, test, prog):
114 report = ExecReporter(testdir, test, prog)
115 prog.build_command(port)
116 return ExecutionContext(prog.command, prog.workdir, prog.env, report)
117
118
119def run_test(testdir, test_dict, async=True, max_retry=3):
120 try:
121 logger = multiprocessing.get_logger()
122 retry_count = 0
123 test = TestEntry(testdir, **test_dict)
124 while True:
125 if stop.is_set():
126 logger.debug('Skipping because shutting down')
127 return None
128 logger.debug('Start')
129 with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
130 logger.debug('Start with port %d' % port)
131 sv = exec_context(port, testdir, test, test.server)
132 cl = exec_context(port, testdir, test, test.client)
133
134 logger.debug('Starting server')
135 with sv.start():
136 if test.delay > 0:
137 logger.debug('Delaying client for %.2f seconds' % test.delay)
138 time.sleep(test.delay)
139 cl_retry_count = 0
140 cl_max_retry = 10
141 cl_retry_wait = 0.5
142 while True:
143 logger.debug('Starting client')
144 cl.start(test.timeout)
145 logger.debug('Waiting client')
146 cl.wait()
147 if not cl.report.maybe_false_positive() or cl_retry_count >= cl_max_retry:
148 if cl_retry_count > 0 and cl_retry_count < cl_max_retry:
149 logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, cl_retry_count, cl_retry_wait))
150 break
151 logger.debug('Server may not be ready, waiting %.2f second...' % cl_retry_wait)
152 time.sleep(cl_retry_wait)
153 cl_retry_count += 1
154
155 if not sv.report.maybe_false_positive() or retry_count >= max_retry:
156 logger.debug('Finish')
157 return RESULT_TIMEOUT if cl.expired else cl.proc.returncode
158 logger.warn('[%s]: Detected socket bind failure, retrying...' % test.server.name)
159 retry_count += 1
160 except (KeyboardInterrupt, SystemExit):
161 logger.info('Interrupted execution')
162 if not async:
163 raise
164 stop.set()
165 return None
166 except Exception as ex:
Nobuaki Sukegawaa6ab1f52015-11-28 15:04:39 +0900167 logger.warn('%s', ex)
Roger Meier41ad4342015-03-24 22:30:40 +0100168 if not async:
169 raise
Nobuaki Sukegawaa6ab1f52015-11-28 15:04:39 +0900170 logger.debug('Error executing [%s]', test.name, exc_info=sys.exc_info())
Roger Meier41ad4342015-03-24 22:30:40 +0100171 return RESULT_ERROR
172
173
174class PortAllocator(object):
175 def __init__(self):
176 self._log = multiprocessing.get_logger()
177 self._lock = multiprocessing.Lock()
178 self._ports = set()
179 self._dom_ports = set()
180 self._last_alloc = 0
181
182 def _get_tcp_port(self):
183 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
184 sock.bind(('127.0.0.1', 0))
185 port = sock.getsockname()[1]
186 self._lock.acquire()
187 try:
188 ok = port not in self._ports
189 if ok:
190 self._ports.add(port)
191 self._last_alloc = time.time()
192 finally:
193 self._lock.release()
194 sock.close()
195 return port if ok else self._get_tcp_port()
196
197 def _get_domain_port(self):
198 port = random.randint(1024, 65536)
199 self._lock.acquire()
200 try:
201 ok = port not in self._dom_ports
202 if ok:
203 self._dom_ports.add(port)
204 finally:
205 self._lock.release()
206 return port if ok else self._get_domain_port()
207
208 def alloc_port(self, socket_type):
pavlodd08f6e2015-10-08 16:43:56 -0400209 if socket_type in ('domain', 'abstract'):
Roger Meier41ad4342015-03-24 22:30:40 +0100210 return self._get_domain_port()
211 else:
212 return self._get_tcp_port()
213
214 # static method for inter-process invokation
215 @staticmethod
216 @contextlib.contextmanager
217 def alloc_port_scoped(allocator, socket_type):
218 port = allocator.alloc_port(socket_type)
219 yield port
220 allocator.free_port(socket_type, port)
221
222 def free_port(self, socket_type, port):
223 self._log.debug('free_port')
224 self._lock.acquire()
225 try:
226 if socket_type == 'domain':
227 self._dom_ports.remove(port)
228 path = domain_socket_path(port)
229 if os.path.exists(path):
230 os.remove(path)
pavlodd08f6e2015-10-08 16:43:56 -0400231 elif socket_type == 'abstract':
232 self._dom_ports.remove(port)
Roger Meier41ad4342015-03-24 22:30:40 +0100233 else:
234 self._ports.remove(port)
235 except IOError as err:
236 self._log.info('Error while freeing port : %s' % str(err))
237 finally:
238 self._lock.release()
239
240
241class NonAsyncResult(object):
242 def __init__(self, value):
243 self._value = value
244
245 def get(self, timeout=None):
246 return self._value
247
248 def wait(self, timeout=None):
249 pass
250
251 def ready(self):
252 return True
253
254 def successful(self):
255 return self._value == 0
256
257
258class TestDispatcher(object):
259 def __init__(self, testdir, concurrency):
260 self._log = multiprocessing.get_logger()
261 self.testdir = testdir
262 # seems needed for python 2.x to handle keyboard interrupt
263 self._stop = multiprocessing.Event()
264 self._async = concurrency > 1
265 if not self._async:
266 self._pool = None
267 global stop
268 global ports
269 stop = self._stop
270 ports = PortAllocator()
271 else:
272 self._m = multiprocessing.managers.BaseManager()
273 self._m.register('ports', PortAllocator)
274 self._m.start()
275 self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
276 self._report = SummaryReporter(testdir, concurrency > 1)
277 self._log.debug(
278 'TestDispatcher started with %d concurrent jobs' % concurrency)
279
280 def _pool_init(self, address):
281 global stop
282 global m
283 global ports
284 stop = self._stop
285 m = multiprocessing.managers.BaseManager(address)
286 m.connect()
287 ports = m.ports()
288
289 def _dispatch_sync(self, test, cont):
290 r = run_test(self.testdir, test, False)
291 cont(r)
292 return NonAsyncResult(r)
293
294 def _dispatch_async(self, test, cont):
295 return self._pool.apply_async(func=run_test, args=(self.testdir, test,), callback=cont)
296
297 def dispatch(self, test):
298 index = self._report.add_test(test)
299
300 def cont(r):
301 if not self._stop.is_set():
302 self._log.debug('freeing port')
303 self._log.debug('adding result')
304 self._report.add_result(index, r, r == RESULT_TIMEOUT)
305 self._log.debug('finish continuation')
306 fn = self._dispatch_async if self._async else self._dispatch_sync
307 return fn(test, cont)
308
309 def wait(self):
310 if self._async:
311 self._pool.close()
312 self._pool.join()
313 self._m.shutdown()
314 return self._report.end()
315
316 def terminate(self):
317 self._stop.set()
318 if self._async:
319 self._pool.terminate()
320 self._pool.join()
321 self._m.shutdown()