blob: 8de6ba92fa393ff4e68c5a7f0b23a53ed792752f [file] [log] [blame]
Roger Meier41ad4342015-03-24 22:30:40 +01001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20import contextlib
21import multiprocessing
22import multiprocessing.managers
23import os
Jens Geyeraad06de2015-11-21 14:43:56 +010024import sys
Roger Meier41ad4342015-03-24 22:30:40 +010025import platform
26import random
27import socket
28import signal
29import subprocess
30import threading
31import time
32import traceback
33
34from crossrunner.test import TestEntry, domain_socket_path
35from crossrunner.report import ExecReporter, SummaryReporter
36
37RESULT_TIMEOUT = 128
38RESULT_ERROR = 64
39
40
class ExecutionContext(object):
    """Run one command as a child process, with an optional timeout.

    The child's stdout and stderr are both redirected to ``report.out``.
    On POSIX the child is started in its own session so that ``kill`` can
    signal the whole process group; on Windows it is started detached in a
    new process group.
    """

    def __init__(self, cmd, cwd, env, report):
        """Store the launch parameters; no process is started yet.

        :param cmd: command line as a sequence of arguments
        :param cwd: working directory for the child process
        :param env: environment mapping for the child process
        :param report: reporter object; this class uses its ``out``,
            ``logpath``, ``begin()``, ``end(returncode)`` and ``killed()``
        """
        self._log = multiprocessing.get_logger()
        self.report = report
        self.cmd = cmd
        self.cwd = cwd
        self.env = env
        self.timer = None
        self.expired = False
        # Assigned by start(). Initialised here so that kill() and the
        # returncode property are safe to use before a process exists.
        self.proc = None

    def _expire(self):
        """Timer callback: flag the run as timed out and kill the child."""
        self._log.info('Timeout')
        self.expired = True
        self.kill()

    def kill(self):
        """Forcefully stop the child; a no-op when nothing was started.

        Failures to kill are logged rather than raised, because the process
        may already have exited on its own.
        """
        if self.proc is None:
            self._log.debug('No process to kill')
            return
        self._log.debug('Killing process : %d' % self.proc.pid)
        if platform.system() != 'Windows':
            # The child called setsid() (see _popen_args), so its pid is
            # also the id of the process group it leads.
            try:
                os.killpg(self.proc.pid, signal.SIGKILL)
            except Exception as err:
                self._log.info('Failed to kill process group : %s' % str(err))
        try:
            self.proc.kill()
        except Exception as err:
            self._log.info('Failed to kill process : %s' % str(err))
        self.report.killed()

    def _popen_args(self):
        """Return the keyword arguments passed to ``subprocess.Popen``."""
        args = {
            'cwd': self.cwd,
            'env': self.env,
            'stdout': self.report.out,
            'stderr': subprocess.STDOUT,
        }
        # Make sure child processes don't remain after killing.
        if platform.system() == 'Windows':
            DETACHED_PROCESS = 0x00000008
            args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            args.update(preexec_fn=os.setsid)
        return args

    def start(self, timeout=0):
        """Launch the command and return a context manager bound to it.

        :param timeout: seconds after which the child is killed and
            ``expired`` is set; values <= 0 disable the timer
        :returns: a context manager that kills the process when its ``with``
            block is left. If the caller never enters it, the process keeps
            running until ``kill`` is called or the timeout fires.
        """
        encoding = sys.getfilesystemencoding()
        # Popen is given text arguments. Byte strings are decoded; values
        # that are already text are passed through (they have no usable
        # .decode() on Python 3).
        argv = [
            item.decode(encoding) if isinstance(item, bytes) else item
            for item in self.cmd
        ]
        joined = ' '.join(argv)
        if str is bytes:
            # Python 2 only: hand the logger an encoded byte string.
            joined = joined.encode(encoding)
        self._log.debug('COMMAND: %s', joined)
        self._log.debug('WORKDIR: %s', self.cwd)
        self._log.debug('LOGFILE: %s', self.report.logpath)
        self.report.begin()
        self.proc = subprocess.Popen(argv, **self._popen_args())
        if timeout > 0:
            self.timer = threading.Timer(timeout, self._expire)
            self.timer.start()
        return self._scoped()

    @contextlib.contextmanager
    def _scoped(self):
        """Yield this context and kill the process when the scope ends."""
        try:
            yield self
        finally:
            # Runs on exceptions too, so an interrupted test does not leave
            # its child process behind.
            self._log.debug('Killing scoped process')
            self.kill()

    def wait(self):
        """Block until the child exits, then report its return code."""
        try:
            self.proc.communicate()
        finally:
            # Always disarm the timeout, even if the wait was interrupted;
            # otherwise the timer thread would fire later on its own.
            if self.timer:
                self.timer.cancel()
        self.report.end(self.returncode)

    @property
    def returncode(self):
        """Exit status of the child, or None if not started or still running."""
        return self.proc.returncode if self.proc else None
115
116
def exec_context(port, testdir, test, prog):
    """Create the ExecutionContext used to run ``prog`` for ``test``.

    :param port: port number handed to ``prog.build_command``
    :param testdir: forwarded to ``ExecReporter``
    :param test: forwarded to ``ExecReporter``
    :param prog: program description; supplies ``command``, ``workdir``
        and ``env`` for the new context
    """
    reporter = ExecReporter(testdir, test, prog)
    # Presumably this fills in prog.command for the given port, so it is
    # called before prog.command is read below.
    prog.build_command(port)
    return ExecutionContext(
        cmd=prog.command,
        cwd=prog.workdir,
        env=prog.env,
        report=reporter,
    )
121
122
def run_test(testdir, test_dict, async_mode=True, max_retry=3, **legacy_kwargs):
    """Run one server/client test pair and return its result code.

    Relies on the module globals ``stop`` and ``ports``, which
    ``TestDispatcher`` installs before any test is run.

    :param testdir: directory passed to ``TestEntry`` and the reporters
    :param test_dict: keyword arguments used to build the ``TestEntry``
    :param async_mode: when true, errors and interrupts are turned into
        return values instead of being re-raised. The third positional
        argument and the legacy ``async=`` keyword are both still accepted.
    :param max_retry: how many times to restart the pair when the server
        report looks like a false positive
    :returns: ``RESULT_TIMEOUT`` if the client timed out, otherwise the
        client's return code; ``RESULT_ERROR`` on an unexpected exception
        (async mode only); ``None`` when shutting down or interrupted
    """
    # `async` is a reserved word from Python 3.7 on, so it cannot be used as
    # a parameter name there. Keep honouring it as a keyword for old callers.
    if 'async' in legacy_kwargs:
        async_mode = legacy_kwargs.pop('async')
    if legacy_kwargs:
        raise TypeError(
            'run_test() got unexpected keyword arguments: %s'
            % ', '.join(sorted(legacy_kwargs)))

    # Bound outside the try block so the except handlers can always use it.
    logger = multiprocessing.get_logger()
    try:
        retry_count = 0
        test = TestEntry(testdir, **test_dict)
        while True:
            if stop.is_set():
                logger.debug('Skipping because shutting down')
                return None
            logger.debug('Start')
            with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                logger.debug('Start with port %d' % port)
                sv = exec_context(port, testdir, test, test.server)
                cl = exec_context(port, testdir, test, test.client)

                logger.debug('Starting server')
                with sv.start():
                    if test.delay > 0:
                        logger.debug('Delaying client for %.2f seconds' % test.delay)
                        time.sleep(test.delay)
                    cl_retry_count = 0
                    cl_max_retry = 10
                    cl_retry_wait = 0.5
                    # Re-run the client while its report suggests the
                    # failure may be a false positive, up to cl_max_retry.
                    while True:
                        logger.debug('Starting client')
                        cl.start(test.timeout)
                        logger.debug('Waiting client')
                        cl.wait()
                        if not cl.report.maybe_false_positive() or cl_retry_count >= cl_max_retry:
                            if cl_retry_count > 0 and cl_retry_count < cl_max_retry:
                                logger.warning(
                                    '[%s]: Connected after %d retry (%.2f sec each)'
                                    % (test.server.name, cl_retry_count, cl_retry_wait))
                            break
                        logger.debug('Server may not be ready, waiting %.2f second...' % cl_retry_wait)
                        time.sleep(cl_retry_wait)
                        cl_retry_count += 1

            # The server has been killed and the port released at this point.
            if not sv.report.maybe_false_positive() or retry_count >= max_retry:
                logger.debug('Finish')
                return RESULT_TIMEOUT if cl.expired else cl.proc.returncode
            logger.warning('[%s]: Detected socket bind failure, retrying...' % test.server.name)
            retry_count += 1
    except (KeyboardInterrupt, SystemExit):
        logger.info('Interrupted execution')
        if not async_mode:
            raise
        # Tell the remaining queued tests to skip themselves.
        stop.set()
        return None
    except Exception as ex:
        logger.warning('Error while executing test : %s' % str(ex))
        if not async_mode:
            raise
        # format_exc() returns the traceback text; print_exc() returns None
        # and would put the literal "None" in the log.
        logger.info(traceback.format_exc())
        return RESULT_ERROR
176
177
class PortAllocator(object):
    """Hand out port numbers that are not currently held by another test.

    TCP ports and domain/abstract socket "ports" are tracked in separate
    sets; every access to those sets is guarded by a multiprocessing lock.
    """

    def __init__(self):
        self._log = multiprocessing.get_logger()
        self._lock = multiprocessing.Lock()
        self._ports = set()
        self._dom_ports = set()
        self._last_alloc = 0

    def _get_tcp_port(self):
        """Return a free TCP port on 127.0.0.1 that is not already reserved."""
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # Binding to port 0 lets the OS choose an unused port.
                sock.bind(('127.0.0.1', 0))
                port = sock.getsockname()[1]
                with self._lock:
                    if port not in self._ports:
                        self._ports.add(port)
                        self._last_alloc = time.time()
                        return port
            finally:
                # Closed on every path, including a failed bind().
                sock.close()

    def _get_domain_port(self):
        """Return a random number not already reserved for a domain socket."""
        while True:
            # randint is inclusive at both ends; 65535 keeps the value
            # within the 16-bit range used for TCP port numbers.
            port = random.randint(1024, 65535)
            with self._lock:
                if port not in self._dom_ports:
                    self._dom_ports.add(port)
                    return port

    def alloc_port(self, socket_type):
        """Reserve and return a port for the given socket type.

        :param socket_type: ``'domain'`` or ``'abstract'`` select the
            domain-socket pool; anything else selects a TCP port
        """
        if socket_type in ('domain', 'abstract'):
            return self._get_domain_port()
        else:
            return self._get_tcp_port()

    # static method for inter-process invocation
    @staticmethod
    @contextlib.contextmanager
    def alloc_port_scoped(allocator, socket_type):
        """Context manager that reserves a port and frees it on exit.

        :param allocator: object providing ``alloc_port`` and ``free_port``
            (a ``PortAllocator`` or a manager proxy for one)
        :param socket_type: see ``alloc_port``
        """
        port = allocator.alloc_port(socket_type)
        try:
            yield port
        finally:
            # Release the reservation even when the body raised.
            allocator.free_port(socket_type, port)

    def free_port(self, socket_type, port):
        """Release a reservation made by ``alloc_port``.

        For ``'domain'`` sockets the socket file is removed too, if it
        exists. Raises ``KeyError`` if the port is not currently reserved.
        """
        self._log.debug('free_port')
        with self._lock:
            try:
                if socket_type == 'domain':
                    self._dom_ports.remove(port)
                    path = domain_socket_path(port)
                    if os.path.exists(path):
                        os.remove(path)
                elif socket_type == 'abstract':
                    self._dom_ports.remove(port)
                else:
                    self._ports.remove(port)
            except (IOError, OSError) as err:
                # os.remove raises OSError, which is a different class from
                # IOError on Python 2, so both are listed.
                self._log.info('Error while freeing port : %s' % str(err))
243
244
class NonAsyncResult(object):
    """Result wrapper for a test that has already finished.

    Offers ``get``, ``wait``, ``ready`` and ``successful`` so that a
    synchronously computed value can be handled like a pool result.
    """

    def __init__(self, value):
        # The finished result; nothing is computed later.
        self._value = value

    def ready(self):
        """Always true: the value was available at construction."""
        return True

    def successful(self):
        """True only when the stored value equals 0."""
        return self._value == 0

    def wait(self, timeout=None):
        """Return immediately; ``timeout`` is ignored."""
        return None

    def get(self, timeout=None):
        """Return the stored value; ``timeout`` is ignored."""
        return self._value
260
261
class TestDispatcher(object):
    """Run tests in the current process or on a multiprocessing pool.

    With ``concurrency`` of 1 or less, tests run inline. Otherwise a worker
    pool is created and a manager process hosts one shared
    ``PortAllocator`` for all workers.
    """

    def __init__(self, testdir, concurrency):
        """
        :param testdir: directory handed to ``run_test`` and the reporter
        :param concurrency: number of worker processes; values above 1
            enable the pool
        """
        self._log = multiprocessing.get_logger()
        self.testdir = testdir
        # seems needed for python 2.x to handle keyboard interrupt
        self._stop = multiprocessing.Event()
        self._async = concurrency > 1
        if self._async:
            self._start_pool(concurrency)
        else:
            self._use_current_process()
        self._report = SummaryReporter(testdir, self._async)
        self._log.debug(
            'TestDispatcher started with %d concurrent jobs' % concurrency)

    def _use_current_process(self):
        """Install the module globals read by ``run_test`` for inline runs."""
        global stop, ports
        self._pool = None
        stop = self._stop
        ports = PortAllocator()

    def _start_pool(self, concurrency):
        """Start the manager hosting the allocator, then the worker pool."""
        manager = self._m = multiprocessing.managers.BaseManager()
        manager.register('ports', PortAllocator)
        manager.start()
        self._pool = multiprocessing.Pool(
            concurrency, self._pool_init, (manager.address,))

    def _pool_init(self, address):
        """Pool initializer: connect to the manager and set the globals."""
        global stop, m, ports
        stop = self._stop
        m = multiprocessing.managers.BaseManager(address)
        m.connect()
        ports = m.ports()

    def _dispatch_sync(self, test, cont):
        """Run ``test`` inline, feed the result to ``cont`` and wrap it."""
        result = run_test(self.testdir, test, False)
        cont(result)
        return NonAsyncResult(result)

    def _dispatch_async(self, test, cont):
        """Queue ``test`` on the pool with ``cont`` as completion callback."""
        job_args = (self.testdir, test)
        return self._pool.apply_async(func=run_test, args=job_args, callback=cont)

    def dispatch(self, test):
        """Register ``test`` with the report and schedule it.

        :returns: the pool's async result, or a ``NonAsyncResult`` when
            running inline
        """
        index = self._report.add_test(test)

        def cont(r):
            # Results that arrive after a stop request are not recorded.
            if self._stop.is_set():
                return
            self._log.debug('freeing port')
            self._log.debug('adding result')
            self._report.add_result(index, r, r == RESULT_TIMEOUT)
            self._log.debug('finish continuation')

        if self._async:
            return self._dispatch_async(test, cont)
        return self._dispatch_sync(test, cont)

    def wait(self):
        """Wait for queued tests to finish and return ``report.end()``."""
        if self._async:
            pool = self._pool
            pool.close()
            pool.join()
            self._m.shutdown()
        return self._report.end()

    def terminate(self):
        """Set the stop flag and, in pool mode, terminate the workers."""
        self._stop.set()
        if not self._async:
            return
        pool = self._pool
        pool.terminate()
        pool.join()
        self._m.shutdown()
325 self._m.shutdown()