#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
19
20import contextlib
21import multiprocessing
22import multiprocessing.managers
23import os
24import platform
25import random
26import socket
27import signal
28import subprocess
29import threading
30import time
31import traceback
32
33from crossrunner.test import TestEntry, domain_socket_path
34from crossrunner.report import ExecReporter, SummaryReporter
35
# Result code run_test() returns when the client process hit its timeout.
RESULT_TIMEOUT = 128
# Result code run_test() returns when an unexpected exception is swallowed
# in async mode.
RESULT_ERROR = 64
38
39
class ExecutionContext(object):
    """Run one external command with an optional timeout and a report sink.

    ``report`` must provide ``out`` (used as the child's stdout), ``logpath``,
    ``begin()``, ``end(returncode)`` and ``killed()``.

    Attributes:
        proc: the ``subprocess.Popen`` object, or None until start() is called.
        timer: the ``threading.Timer`` enforcing the timeout, or None.
        expired: True once the timeout fired.
    """

    def __init__(self, cmd, cwd, env, report):
        self._log = multiprocessing.get_logger()
        self.report = report
        self.cmd = cmd
        self.cwd = cwd
        self.env = env
        # Defined up front so that kill() and returncode are safe to use
        # before start() has been called.
        self.proc = None
        self.timer = None
        self.expired = False

    def _expire(self):
        """Timer callback: flag the run as timed out and kill the process."""
        self._log.info('Timeout')
        self.expired = True
        self.kill()

    def kill(self):
        """Kill the started process, best effort, and notify the report.

        On non-Windows platforms the whole process group is killed first.
        Failures are logged, never raised. Does nothing when no process has
        been started yet.
        """
        if self.proc is None:
            self._log.debug('Nothing to kill : process not started')
            return
        self._log.debug('Killing process : %d' % self.proc.pid)
        if platform.system() != 'Windows':
            try:
                os.killpg(self.proc.pid, signal.SIGKILL)
            except Exception as err:
                self._log.info('Failed to kill process group : %s' % str(err))
        try:
            self.proc.kill()
        except Exception as err:
            self._log.info('Failed to kill process : %s' % str(err))
        self.report.killed()

    def _popen_args(self):
        """Return the keyword arguments passed to ``subprocess.Popen``."""
        args = {
            'cwd': self.cwd,
            'env': self.env,
            'stdout': self.report.out,
            'stderr': subprocess.STDOUT,
        }
        # make sure child processes do not remain after killing
        if platform.system() == 'Windows':
            DETACHED_PROCESS = 0x00000008
            args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            # New session, so that kill() can signal the whole process group.
            args.update(preexec_fn=os.setsid)
        return args

    def start(self, timeout=0):
        """Launch the command and return a context manager that kills it.

        Args:
            timeout: seconds after which the process is killed and ``expired``
                is set; no timer is armed when it is not positive.

        Returns:
            A context manager yielding this object; leaving the ``with`` block
            kills the process. The process is started even if the returned
            object is never entered.
        """
        self._log.debug('COMMAND: %s', ' '.join(self.cmd))
        self._log.debug('WORKDIR: %s', self.cwd)
        self._log.debug('LOGFILE: %s', self.report.logpath)
        self.report.begin()
        self.proc = subprocess.Popen(self.cmd, **self._popen_args())
        if timeout > 0:
            self.timer = threading.Timer(timeout, self._expire)
            self.timer.start()
        return self._scoped()

    @contextlib.contextmanager
    def _scoped(self):
        """Yield self and kill the process when the ``with`` block is left."""
        try:
            yield self
        finally:
            # Runs on errors too, so a failing body cannot leave the child
            # process running.
            self._log.debug('Killing scoped process')
            self.kill()

    def wait(self):
        """Block until the process exits, stop the timer and close the report."""
        self.proc.communicate()
        if self.timer:
            self.timer.cancel()
        self.report.end(self.returncode)

    @property
    def returncode(self):
        """Exit code of the process; None if not started or still running."""
        return None if self.proc is None else self.proc.returncode
109
110
def exec_context(port, testdir, test, prog):
    """Build the ExecutionContext that runs ``prog`` for ``test`` on ``port``.

    A fresh ExecReporter is created for the program and handed to the
    context as its report sink.
    """
    reporter = ExecReporter(testdir, test, prog)
    # Called before prog.command is read; presumably this is what fills it
    # in for the given port -- confirm against the program class.
    prog.build_command(port)
    return ExecutionContext(
        cmd=prog.command,
        cwd=prog.workdir,
        env=prog.env,
        report=reporter,
    )
115
116
def run_test(testdir, test_dict, async_mode=True, max_retry=3):
    """Run one server/client test pair and return the client's result code.

    The third parameter used to be called ``async``. That word is a reserved
    keyword from Python 3.7 on, which made this module fail to compile; the
    callers in this module pass the argument positionally.

    Relies on the module globals ``stop`` and ``ports`` that TestDispatcher
    installs before any test is run.

    Args:
        testdir: directory handed to TestEntry and to the reporters.
        test_dict: keyword arguments used to build the TestEntry.
        async_mode: when true, interruptions and errors are turned into
            return values instead of being re-raised.
        max_retry: how many times the whole server/client run is repeated
            while the server report says the failure may be a false positive.

    Returns:
        None when shutting down or (in async mode) interrupted,
        RESULT_TIMEOUT when the client hit its timeout, RESULT_ERROR when an
        unexpected exception occurred in async mode, otherwise the client
        process' return code.
    """
    # Bound before the try block so that the handlers below can always log.
    logger = multiprocessing.get_logger()
    try:
        retry_count = 0
        test = TestEntry(testdir, **test_dict)
        while True:
            if stop.is_set():
                logger.debug('Skipping because shutting down')
                return None
            logger.debug('Start')
            with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                logger.debug('Start with port %d' % port)
                sv = exec_context(port, testdir, test, test.server)
                cl = exec_context(port, testdir, test, test.client)

                logger.debug('Starting server')
                with sv.start():
                    if test.delay > 0:
                        logger.debug('Delaying client for %.2f seconds' % test.delay)
                        time.sleep(test.delay)
                    cl_retry_count = 0
                    cl_max_retry = 10
                    cl_retry_wait = 0.5
                    # Re-run the client while its report says the failure may
                    # be a false positive (judging by the log message below:
                    # the server was not ready yet).
                    while True:
                        logger.debug('Starting client')
                        cl.start(test.timeout)
                        logger.debug('Waiting client')
                        cl.wait()
                        if not cl.report.maybe_false_positive() or cl_retry_count >= cl_max_retry:
                            if 0 < cl_retry_count < cl_max_retry:
                                logger.warning(
                                    '[%s]: Connected after %d retry (%.2f sec each)'
                                    % (test.server.name, cl_retry_count, cl_retry_wait))
                            break
                        logger.debug('Server may not be ready, waiting %.2f second...' % cl_retry_wait)
                        time.sleep(cl_retry_wait)
                        cl_retry_count += 1

            # Evaluated after the server has been killed and the port freed.
            if not sv.report.maybe_false_positive() or retry_count >= max_retry:
                logger.debug('Finish')
                return RESULT_TIMEOUT if cl.expired else cl.proc.returncode
            logger.warning('[%s]: Detected socket bind failure, retrying...' % test.server.name)
            retry_count += 1
    except (KeyboardInterrupt, SystemExit):
        logger.info('Interrupted execution')
        if not async_mode:
            raise
        # Tell the other queued tests to skip themselves.
        stop.set()
        return None
    except Exception as ex:
        logger.warning('Error while executing test : %s' % str(ex))
        if not async_mode:
            raise
        # format_exc() returns the traceback text; print_exc() returns None.
        logger.info(traceback.format_exc())
        return RESULT_ERROR
170
171
class PortAllocator(object):
    """Hand out TCP ports and domain-socket ids that are not already in use.

    Allocated values are remembered until free_port() is called, so the same
    value is never handed out twice at the same time by one allocator.
    """

    def __init__(self):
        self._log = multiprocessing.get_logger()
        self._lock = multiprocessing.Lock()
        self._ports = set()
        self._dom_ports = set()
        self._last_alloc = 0

    def _get_tcp_port(self):
        """Return a TCP port picked by the OS that this allocator has not
        already handed out."""
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # Port 0 lets the operating system choose a free port.
                sock.bind(('127.0.0.1', 0))
                port = sock.getsockname()[1]
                with self._lock:
                    if port not in self._ports:
                        self._ports.add(port)
                        self._last_alloc = time.time()
                        return port
            finally:
                # Always release the probe socket, also when bind() fails.
                sock.close()

    def _get_domain_port(self):
        """Return a random id for a domain socket that is not in use."""
        while True:
            port = random.randint(1024, 65536)
            with self._lock:
                if port not in self._dom_ports:
                    self._dom_ports.add(port)
                    return port

    def alloc_port(self, socket_type):
        """Allocate a domain-socket id for 'domain', a TCP port otherwise."""
        if socket_type == 'domain':
            return self._get_domain_port()
        return self._get_tcp_port()

    # static method for inter-process invocation
    @staticmethod
    @contextlib.contextmanager
    def alloc_port_scoped(allocator, socket_type):
        """Context manager yielding a port from ``allocator``.

        The port is given back through ``allocator.free_port`` when the
        ``with`` block is left, including when the block raises.
        """
        port = allocator.alloc_port(socket_type)
        try:
            yield port
        finally:
            allocator.free_port(socket_type, port)

    def free_port(self, socket_type, port):
        """Forget ``port``; for 'domain' also delete the socket file if any.

        Raises:
            KeyError: if ``port`` is not currently allocated.
        """
        self._log.debug('free_port')
        with self._lock:
            try:
                if socket_type == 'domain':
                    self._dom_ports.remove(port)
                    path = domain_socket_path(port)
                    if os.path.exists(path):
                        os.remove(path)
                else:
                    self._ports.remove(port)
            except (IOError, OSError) as err:
                # os.remove() raises OSError, which is not an IOError on
                # Python 2.
                self._log.info('Error while freeing port : %s' % str(err))
235
236
class NonAsyncResult(object):
    """Holder for a value that is already known.

    Offers get/wait/ready/successful so that it can be returned where the
    asynchronous dispatch path returns the pool's result object.
    """

    def __init__(self, value):
        self._value = value

    def ready(self):
        """Always True: the value exists from construction on."""
        return True

    def wait(self, timeout=None):
        """Return immediately; ``timeout`` is ignored."""
        return None

    def get(self, timeout=None):
        """Return the stored value; ``timeout`` is ignored."""
        return self._value

    def successful(self):
        """Tell whether the stored value equals zero."""
        stored = self._value
        return stored == 0
252
253
class TestDispatcher(object):
    """Run tests inline or on a process pool and collect their results.

    With a concurrency of at most 1, tests run in the calling process and
    the module globals ``stop`` and ``ports`` are set directly. Otherwise a
    manager process serves a shared PortAllocator and each pool worker sets
    those globals in its initializer.
    """

    def __init__(self, testdir, concurrency):
        global stop
        global ports
        self._log = multiprocessing.get_logger()
        self.testdir = testdir
        # seems needed for python 2.x to handle keyboard interrupt
        self._stop = multiprocessing.Event()
        self._async = concurrency > 1
        if self._async:
            self._m = multiprocessing.managers.BaseManager()
            self._m.register('ports', PortAllocator)
            self._m.start()
            self._pool = multiprocessing.Pool(
                concurrency, self._pool_init, (self._m.address,))
        else:
            self._pool = None
            stop = self._stop
            ports = PortAllocator()
        self._report = SummaryReporter(testdir, self._async)
        self._log.debug(
            'TestDispatcher started with %d concurrent jobs' % concurrency)

    def _pool_init(self, address):
        """Pool initializer: connect to the manager at ``address`` and set
        the module globals ``stop``, ``m`` and ``ports``."""
        global stop, m, ports
        stop = self._stop
        m = multiprocessing.managers.BaseManager(address)
        m.connect()
        ports = m.ports()

    def _dispatch_sync(self, test, cont):
        """Run ``test`` right now, pass the result to ``cont`` and wrap it."""
        result = run_test(self.testdir, test, False)
        cont(result)
        return NonAsyncResult(result)

    def _dispatch_async(self, test, cont):
        """Queue ``test`` on the pool with ``cont`` as completion callback."""
        return self._pool.apply_async(
            func=run_test, args=(self.testdir, test,), callback=cont)

    def dispatch(self, test):
        """Register ``test`` with the report, run or queue it, and return
        the result object of the chosen dispatch path."""
        index = self._report.add_test(test)

        def cont(r):
            # Results arriving after a stop request are dropped.
            if self._stop.is_set():
                return
            self._log.debug('freeing port')
            self._log.debug('adding result')
            self._report.add_result(index, r, r == RESULT_TIMEOUT)
            self._log.debug('finish continuation')

        if self._async:
            return self._dispatch_async(test, cont)
        return self._dispatch_sync(test, cont)

    def _join_and_shutdown(self):
        """Join the pool workers, then shut the manager process down."""
        self._pool.join()
        self._m.shutdown()

    def wait(self):
        """Wait for all queued tests and return what the report's end()
        returns."""
        if self._async:
            self._pool.close()
            self._join_and_shutdown()
        return self._report.end()

    def terminate(self):
        """Set the stop event and, in async mode, terminate the pool."""
        self._stop.set()
        if self._async:
            self._pool.terminate()
            self._join_and_shutdown()
317 self._m.shutdown()