THRIFT-4515: cross server test improvement: graceful test server shutdown
This closes #1509
diff --git a/build/cmake/ConfigureChecks.cmake b/build/cmake/ConfigureChecks.cmake
index e4793d4..6b9c6a3 100644
--- a/build/cmake/ConfigureChecks.cmake
+++ b/build/cmake/ConfigureChecks.cmake
@@ -34,6 +34,7 @@
check_include_file(inttypes.h HAVE_INTTYPES_H)
check_include_file(netdb.h HAVE_NETDB_H)
check_include_file(netinet/in.h HAVE_NETINET_IN_H)
+check_include_file(signal.h HAVE_SIGNAL_H)
check_include_file(stdint.h HAVE_STDINT_H)
check_include_file(unistd.h HAVE_UNISTD_H)
check_include_file(pthread.h HAVE_PTHREAD_H)
diff --git a/build/cmake/config.h.in b/build/cmake/config.h.in
index 21561b2..c5d4d30 100644
--- a/build/cmake/config.h.in
+++ b/build/cmake/config.h.in
@@ -91,6 +91,9 @@
/* Define to 1 if you have the <netinet/in.h> header file. */
#cmakedefine HAVE_NETINET_IN_H 1
+/* Define to 1 if you have the <signal.h> header file. */
+#cmakedefine HAVE_SIGNAL_H 1
+
/* Define to 1 if you have the <stdint.h> header file. */
#cmakedefine HAVE_STDINT_H 1
diff --git a/build/docker/scripts/autotools.sh b/build/docker/scripts/autotools.sh
index 67e4f2d..8388f72 100755
--- a/build/docker/scripts/autotools.sh
+++ b/build/docker/scripts/autotools.sh
@@ -1,10 +1,6 @@
#!/bin/sh
set -ev
-# haxe hxcpp > 3.4.188 will enable c++11 by default, and break the
-# build when compiling C files with clang++ by adding -std=c++11
-export HXCPP_NO_CPP11=1
-
./bootstrap.sh
./configure $*
make check -j3
diff --git a/configure.ac b/configure.ac
index 9efc28b..917f6fa 100755
--- a/configure.ac
+++ b/configure.ac
@@ -633,6 +633,7 @@
AC_CHECK_HEADERS([netdb.h])
AC_CHECK_HEADERS([netinet/in.h])
AC_CHECK_HEADERS([pthread.h])
+AC_CHECK_HEADERS([signal.h])
AC_CHECK_HEADERS([stddef.h])
AC_CHECK_HEADERS([stdlib.h])
AC_CHECK_HEADERS([sys/ioctl.h])
diff --git a/lib/cpp/src/thrift/TOutput.cpp b/lib/cpp/src/thrift/TOutput.cpp
index bb46263..ae3a9e2 100644
--- a/lib/cpp/src/thrift/TOutput.cpp
+++ b/lib/cpp/src/thrift/TOutput.cpp
@@ -94,7 +94,7 @@
}
void TOutput::perror(const char* message, int errno_copy) {
- std::string out = message + strerror_s(errno_copy);
+ std::string out = message + std::string(": ") + strerror_s(errno_copy);
f_(out.c_str());
}
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index e60bffc..f89b5f7 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -659,6 +659,7 @@
return;
}
}
+ // fallthrough
// Intentionally fall through here, the call to process has written into
// the writeBuffer_
@@ -707,6 +708,7 @@
server_->getIdleWriteBufferLimit());
callsForResize_ = 0;
}
+ // fallthrough
// N.B.: We also intentionally fall through here into the INIT state!
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
index 7bdacb0..7071698 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
@@ -304,6 +304,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
// in the case of SSL_ERROR_SYSCALL we want to wait for an read event again
@@ -350,6 +351,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
// in the case of SSL_ERROR_SYSCALL we want to wait for an write/read event again
@@ -415,6 +417,8 @@
// a certain number
break;
}
+ // fallthrough
+
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
@@ -471,6 +475,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
@@ -515,6 +520,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
@@ -602,6 +608,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
@@ -634,6 +641,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index c90593d..18cadbc 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -809,11 +809,15 @@
string TSocket::getSocketInfo() {
std::ostringstream oss;
- if (host_.empty() || port_ == 0) {
- oss << "<Host: " << getPeerAddress();
- oss << " Port: " << getPeerPort() << ">";
+ if (path_.empty()) {
+ if (host_.empty() || port_ == 0) {
+ oss << "<Host: " << getPeerAddress();
+ oss << " Port: " << getPeerPort() << ">";
+ } else {
+ oss << "<Host: " << host_ << " Port: " << port_ << ">";
+ }
} else {
- oss << "<Host: " << host_ << " Port: " << port_ << ">";
+ oss << "<Path: " << path_ << ">";
}
return oss.str();
}
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
index 3779b0d..1c52c47 100644
--- a/lib/cpp/test/concurrency/TimerManagerTests.h
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -79,14 +79,13 @@
= shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
{
-
TimerManager timerManager;
-
timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
-
timerManager.start();
-
- assert(timerManager.state() == TimerManager::STARTED);
+ if (timerManager.state() != TimerManager::STARTED) {
+ std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl;
+ return false;
+ }
// Don't create task yet, because its constructor sets the expected completion time, and we
// need to delay between inserting the two tasks into the run queue.
@@ -94,34 +93,27 @@
{
Synchronized s(_monitor);
-
timerManager.add(orphanTask, 10 * timeout);
- try {
- // Wait for 1 second in order to give timerManager a chance to start sleeping in response
- // to adding orphanTask. We need to do this so we can verify that adding the second task
- // kicks the dispatcher out of the current wait and starts the new 1 second wait.
- _monitor.wait(1000);
- assert(
- 0 == "ERROR: This wait should time out. TimerManager dispatcher may have a problem.");
- } catch (TimedOutException&) {
- }
+ THRIFT_SLEEP_USEC(timeout * 1000);
task.reset(new TimerManagerTests::Task(_monitor, timeout));
-
timerManager.add(task, timeout);
-
_monitor.wait();
}
- assert(task->_done);
+ if (!task->_done) {
+ std::cerr << "task is not done, but it should have executed" << std::endl;
+ return false;
+ }
std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
}
- // timerManager.stop(); This is where it happens via destructor
-
- assert(!orphanTask->_done);
+ if (orphanTask->_done) {
+ std::cerr << "orphan task is done, but it should not have executed" << std::endl;
+ return false;
+ }
return true;
}
diff --git a/lib/d/test/thrift_test_server.d b/lib/d/test/thrift_test_server.d
index 71ab917..b582253 100644
--- a/lib/d/test/thrift_test_server.d
+++ b/lib/d/test/thrift_test_server.d
@@ -16,8 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
module thrift_test_server;
+import core.stdc.errno : errno;
+import core.stdc.signal : signal, sigfn_t, SIGINT, SIG_DFL, SIG_ERR;
import core.thread : dur, Thread;
import std.algorithm;
import std.exception : enforce;
@@ -40,6 +43,7 @@
import thrift.transport.framed;
import thrift.transport.http;
import thrift.transport.ssl;
+import thrift.util.cancellation;
import thrift.util.hashset;
import test_utils;
@@ -205,14 +209,44 @@
bool trace_;
}
+shared(bool) gShutdown = false;
+
+nothrow @nogc extern(C) void handleSignal(int sig) {
+ gShutdown = true;
+}
+
+// Runs a thread that waits for shutdown to be
+// signaled and then triggers cancellation,
+// causing the server to stop. While we could
+// use a signalfd for this purpose, we are instead
+// opting for a busy waiting scheme for maximum
+// portability since signalfd is a linux thing.
+
+class ShutdownThread : Thread {
+ this(TCancellationOrigin cancellation) {
+ cancellation_ = cancellation;
+ super(&run);
+ }
+
+private:
+ void run() {
+ while (!gShutdown) {
+ Thread.sleep(dur!("msecs")(25));
+ }
+ cancellation_.trigger();
+ }
+
+ TCancellationOrigin cancellation_;
+}
+
void main(string[] args) {
ushort port = 9090;
ServerType serverType;
ProtocolType protocolType;
size_t numIOThreads = 1;
TransportType transportType;
- bool ssl;
- bool trace;
+ bool ssl = false;
+ bool trace = true;
size_t taskPoolSize = totalCPUs;
getopt(args, "port", &port, "protocol", &protocolType, "server-type",
@@ -279,8 +313,26 @@
auto server = createServer(serverType, numIOThreads, taskPoolSize,
processor, serverSocket, transportFactory, protocolFactory);
+ // Set up SIGINT signal handling
+ sigfn_t oldHandler = signal(SIGINT, &handleSignal);
+ enforce(oldHandler != SIG_ERR,
+ "Could not replace the SIGINT signal handler: errno {0}".format(errno()));
+
+ // Set up a server cancellation trigger
+ auto cancel = new TCancellationOrigin();
+
+ // Set up a listener for the shutdown condition - this will
+ // wake up when the signal occurs and trigger cancellation.
+ auto shutdown = new ShutdownThread(cancel);
+ shutdown.start();
+
+ // Serve from this thread; the signal will stop the server
+ // and control will return here
writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType,
transportType, serverType, ssl ? "(using SSL) ": "", port);
- server.serve();
+ server.serve(cancel);
+ shutdown.join();
+ signal(SIGINT, SIG_DFL);
+
writeln("done.");
}
diff --git a/lib/perl/lib/Thrift/Server.pm b/lib/perl/lib/Thrift/Server.pm
index fc9ca30..f265d45 100644
--- a/lib/perl/lib/Thrift/Server.pm
+++ b/lib/perl/lib/Thrift/Server.pm
@@ -150,27 +150,31 @@
sub serve
{
my $self = shift;
-
+ my $stop = 0;
+
$self->{serverTransport}->listen();
- while (1)
- {
+ while (!$stop) {
my $client = $self->{serverTransport}->accept();
- my $itrans = $self->{inputTransportFactory}->getTransport($client);
- my $otrans = $self->{outputTransportFactory}->getTransport($client);
- my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans);
- my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans);
- eval {
- $self->_clientBegin($iprot, $oprot);
- while (1)
- {
- $self->{processor}->process($iprot, $oprot);
+ if (defined $client) {
+ my $itrans = $self->{inputTransportFactory}->getTransport($client);
+ my $otrans = $self->{outputTransportFactory}->getTransport($client);
+ my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans);
+ my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans);
+ eval {
+ $self->_clientBegin($iprot, $oprot);
+ while (1)
+ {
+ $self->{processor}->process($iprot, $oprot);
+ }
+ }; if($@) {
+ $self->_handleException($@);
}
- }; if($@) {
- $self->_handleException($@);
- }
- $itrans->close();
- $otrans->close();
+ $itrans->close();
+ $otrans->close();
+ } else {
+ $stop = 1;
+ }
}
}
diff --git a/lib/perl/lib/Thrift/ServerSocket.pm b/lib/perl/lib/Thrift/ServerSocket.pm
index 51f83b4..2c4d906 100644
--- a/lib/perl/lib/Thrift/ServerSocket.pm
+++ b/lib/perl/lib/Thrift/ServerSocket.pm
@@ -81,15 +81,24 @@
{
my $self = shift;
- if ( exists $self->{handle} and defined $self->{handle} )
- {
+ if ( exists $self->{handle} and defined $self->{handle} ) {
my $client = $self->{handle}->accept();
my $result = $self->__client();
$result->{handle} = new IO::Select($client);
return $result;
}
- return 0;
+ return undef;
+}
+
+sub close
+{
+ my $self = shift;
+
+ if ( exists $self->{handle} and defined $self->{handle} )
+ {
+ $self->{handle}->close();
+ }
}
###
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index b93e5ea..1c38124 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -49,6 +49,9 @@
#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif
+#ifdef HAVE_SIGNAL_H
+#include <signal.h>
+#endif
#include <iostream>
#include <stdexcept>
@@ -59,7 +62,6 @@
#include <boost/filesystem.hpp>
#include <thrift/stdcxx.h>
-#include <signal.h>
#if _WIN32
#include <thrift/windows/TWinsockSingleton.h>
#endif
@@ -75,6 +77,17 @@
using namespace thrift::test;
+// to handle a controlled shutdown, signal handling is mandatory
+#ifdef HAVE_SIGNAL_H
+apache::thrift::concurrency::Monitor gMonitor;
+void signal_handler(int signum)
+{
+ if (signum == SIGINT) {
+ gMonitor.notifyAll();
+ }
+}
+#endif
+
class TestHandler : public ThriftTestIf {
public:
TestHandler() {}
@@ -635,6 +648,12 @@
ssl = true;
}
+#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
+ if (ssl) {
+ signal(SIGPIPE, SIG_IGN); // for OpenSSL, otherwise we end abruptly
+ }
+#endif
+
if (vm.count("abstract-namespace")) {
abstract_namespace = true;
}
@@ -770,14 +789,14 @@
TEvhttpServer nonblockingServer(testBufferProcessor, port);
nonblockingServer.serve();
} else if (transport_type == "framed") {
- stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket;
- nbSocket.reset(
- ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory)
- : new transport::TNonblockingServerSocket(port));
+ stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket;
+ nbSocket.reset(
+ ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory)
+ : new transport::TNonblockingServerSocket(port));
server.reset(new TNonblockingServer(testProcessor, protocolFactory, nbSocket));
} else {
- cerr << "server-type nonblocking requires transport of http or framed" << endl;
- exit(1);
+ cerr << "server-type nonblocking requires transport of http or framed" << endl;
+ exit(1);
}
}
@@ -787,18 +806,23 @@
// if using header
server->setOutputProtocolFactory(stdcxx::shared_ptr<TProtocolFactory>());
}
+
apache::thrift::concurrency::PlatformThreadFactory factory;
factory.setDetached(false);
stdcxx::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server);
stdcxx::shared_ptr<apache::thrift::concurrency::Thread> thread
= factory.newThread(serverThreadRunner);
- thread->start();
- // THRIFT-4515: this needs to be improved
- while (1) {
- THRIFT_SLEEP_SEC(1); // do something other than chew up CPU like crazy
- }
- // NOTREACHED
+#ifdef HAVE_SIGNAL_H
+ signal(SIGINT, signal_handler);
+#endif
+
+ thread->start();
+ gMonitor.waitForever(); // wait for a shutdown signal
+
+#ifdef HAVE_SIGNAL_H
+ signal(SIGINT, SIG_DFL);
+#endif
server->stop();
thread->join();
@@ -808,3 +832,4 @@
cout << "done." << endl;
return 0;
}
+
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py
index 03b0c36..e2d8978 100644
--- a/test/crossrunner/collect.py
+++ b/test/crossrunner/collect.py
@@ -51,6 +51,7 @@
]
DEFAULT_MAX_DELAY = 5
+DEFAULT_SIGNAL = 1
DEFAULT_TIMEOUT = 5
@@ -112,7 +113,7 @@
yield name, impl1, impl2
def maybe_max(key, o1, o2, default):
- """maximum of two if present, otherwise defult value"""
+ """maximum of two if present, otherwise default value"""
v1 = o1.get(key)
v2 = o2.get(key)
return max(v1, v2) if v1 and v2 else v1 or v2 or default
@@ -138,6 +139,7 @@
'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}),
'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}),
'delay': maybe_max('delay', sv, cl, DEFAULT_MAX_DELAY),
+ 'stop_signal': maybe_max('stop_signal', sv, cl, DEFAULT_SIGNAL),
'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT),
'protocol': proto,
'transport': trans,
diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py
index 76324ed..75f36db 100644
--- a/test/crossrunner/report.py
+++ b/test/crossrunner/report.py
@@ -157,8 +157,10 @@
])),
'client': list(map(re.compile, [
'[Cc]onnection refused',
- 'Could not connect to localhost',
+ 'Could not connect to',
'ECONNREFUSED',
+ 'econnrefused', # erl
+ 'CONNECTION-REFUSED-ERROR', # cl
'No such file or directory', # domain socket
])),
}
@@ -174,6 +176,7 @@
def match(line):
for expr in exprs:
if expr.search(line):
+ self._log.info("maybe false positive: %s" % line)
return True
with logfile_open(self.logpath, 'r') as fp:
@@ -204,7 +207,7 @@
def _print_footer(self, returncode=None):
self._print_bar()
if returncode is not None:
- print('Return code: %d' % returncode, file=self.out)
+ print('Return code: %d (negative values indicate kill by signal)' % returncode, file=self.out)
else:
print('Process is killed.', file=self.out)
self._print_exec_time()
@@ -261,7 +264,8 @@
if not with_result:
return '{:24s}{:18s}{:25s}'.format(name[:23], test.protocol[:17], trans[:24])
else:
- return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17], trans[:24], self._result_string(test))
+ return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17],
+ trans[:24], self._result_string(test))
def _print_test_header(self):
self._print_bar()
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index f522bb1..25c58ce 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -23,19 +23,20 @@
import os
import platform
import random
-import signal
import socket
import subprocess
import sys
-import threading
import time
from .compat import str_join
-from .test import TestEntry, domain_socket_path
from .report import ExecReporter, SummaryReporter
+from .test import TestEntry
+from .util import domain_socket_path
-RESULT_TIMEOUT = 128
RESULT_ERROR = 64
+RESULT_TIMEOUT = 128
+SIGNONE = 0
+SIGKILL = 15
# globals
ports = None
@@ -43,35 +44,18 @@
class ExecutionContext(object):
- def __init__(self, cmd, cwd, env, report):
+ def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
self._log = multiprocessing.get_logger()
- self.report = report
self.cmd = cmd
self.cwd = cwd
self.env = env
- self.timer = None
+ self.stop_signal = stop_signal
+ self.is_server = is_server
+ self.report = report
self.expired = False
self.killed = False
self.proc = None
- def _expire(self):
- self._log.info('Timeout')
- self.expired = True
- self.kill()
-
- def kill(self):
- self._log.debug('Killing process : %d' % self.proc.pid)
- self.killed = True
- if platform.system() != 'Windows':
- try:
- os.killpg(self.proc.pid, signal.SIGKILL)
- except Exception:
- self._log.info('Failed to kill process group', exc_info=sys.exc_info())
- try:
- self.proc.kill()
- except Exception:
- self._log.info('Failed to kill process', exc_info=sys.exc_info())
-
def _popen_args(self):
args = {
'cwd': self.cwd,
@@ -87,75 +71,125 @@
args.update(preexec_fn=os.setsid)
return args
- def start(self, timeout=0):
+ def start(self):
joined = str_join(' ', self.cmd)
self._log.debug('COMMAND: %s', joined)
self._log.debug('WORKDIR: %s', self.cwd)
self._log.debug('LOGFILE: %s', self.report.logpath)
self.report.begin()
self.proc = subprocess.Popen(self.cmd, **self._popen_args())
- if timeout > 0:
- self.timer = threading.Timer(timeout, self._expire)
- self.timer.start()
+ self._log.debug(' PID: %d', self.proc.pid)
+ self._log.debug(' PGID: %d', os.getpgid(self.proc.pid))
return self._scoped()
@contextlib.contextmanager
def _scoped(self):
yield self
- self._log.debug('Killing scoped process')
- if self.proc.poll() is None:
- self.kill()
- self.report.killed()
+ if self.is_server:
+ # the server is supposed to run until we stop it
+ if self.returncode is not None:
+ self.report.died()
+ else:
+ if self.stop_signal != SIGNONE:
+ if self.sigwait(self.stop_signal):
+ self.report.end(self.returncode)
+ else:
+ self.report.killed()
+ else:
+ self.sigwait(SIGKILL)
else:
- self._log.debug('Process died unexpectedly')
- self.report.died()
+ # the client is supposed to exit normally
+ if self.returncode is not None:
+ self.report.end(self.returncode)
+ else:
+ self.sigwait(SIGKILL)
+ self.report.killed()
+ self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))
- def wait(self):
- self.proc.communicate()
- if self.timer:
- self.timer.cancel()
- self.report.end(self.returncode)
+ # Send a signal to the process and then wait for it to end
+ # If the signal requested is SIGNONE, no signal is sent, and
+ # instead we just wait for the process to end; further if it
+ # does not end normally with SIGNONE, we mark it as expired.
+ # If the process fails to end and the signal is not SIGKILL,
+ # it re-runs with SIGKILL so that a real process kill occurs
+ # returns True if the process ended, False if it may not have
+ def sigwait(self, sig=SIGKILL, timeout=2):
+ try:
+ if sig != SIGNONE:
+ self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
+ if sig == SIGKILL:
+ self.killed = True
+ try:
+ if platform.system() != 'Windows':
+ os.killpg(os.getpgid(self.proc.pid), sig)
+ else:
+ self.proc.send_signal(sig)
+ except Exception:
+ self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
+ self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
+ self.proc.communicate(timeout=timeout)
+ self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
+ self.report.end(self.returncode)
+ return True
+ except subprocess.TimeoutExpired:
+ self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
+ if sig == SIGNONE:
+ self.expired = True
+ return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)
+
+ # called on the client process to wait for it to end naturally
+ def wait(self, timeout):
+ self.sigwait(SIGNONE, timeout)
@property
def returncode(self):
return self.proc.returncode if self.proc else None
-def exec_context(port, logdir, test, prog):
+def exec_context(port, logdir, test, prog, is_server):
report = ExecReporter(logdir, test, prog)
prog.build_command(port)
- return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+ return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report)
def run_test(testdir, logdir, test_dict, max_retry, async=True):
logger = multiprocessing.get_logger()
- def ensure_socket_open(proc, port, max_delay):
- sleeped = 0.1
- time.sleep(sleeped)
- sleep_step = 0.2
+ def ensure_socket_open(sv, port, test):
+ slept = 0.1
+ time.sleep(slept)
+ sleep_step = 0.1
while True:
- # Create sockets every iteration because refused sockets cannot be
- # reused on some systems.
- sock4 = socket.socket()
- sock6 = socket.socket(family=socket.AF_INET6)
- try:
- if sock4.connect_ex(('127.0.0.1', port)) == 0 \
- or sock6.connect_ex(('::1', port)) == 0:
- return True
- if proc.poll() is not None:
- logger.warn('server process is exited')
- return False
- if sleeped > max_delay:
- logger.warn('sleeped for %f seconds but server port is not open' % sleeped)
- return False
- time.sleep(sleep_step)
- sleeped += sleep_step
- finally:
- sock4.close()
- sock6.close()
- logger.debug('waited %f sec for server port open' % sleeped)
- return True
+ if slept > test.delay:
+ logger.warn('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
+ return False
+ if test.socket == 'domain':
+ if not os.path.exists(domain_socket_path(port)):
+ logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+ time.sleep(sleep_step)
+ slept += sleep_step
+ elif test.socket == 'abstract':
+ return True
+ else:
+ # Create sockets every iteration because refused sockets cannot be
+ # reused on some systems.
+ sock4 = socket.socket()
+ sock6 = socket.socket(family=socket.AF_INET6)
+ try:
+ if sock4.connect_ex(('127.0.0.1', port)) == 0 \
+ or sock6.connect_ex(('::1', port)) == 0:
+ return True
+ if sv.proc.poll() is not None:
+ logger.warn('[{0}] server process is exited'.format(sv.proc.pid))
+ return False
+ logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+ time.sleep(sleep_step)
+ slept += sleep_step
+ finally:
+ sock4.close()
+ sock6.close()
+ logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
+ return True
try:
max_bind_retry = 3
@@ -169,31 +203,27 @@
logger.debug('Start')
with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
logger.debug('Start with port %d' % port)
- sv = exec_context(port, logdir, test, test.server)
- cl = exec_context(port, logdir, test, test.client)
+ sv = exec_context(port, logdir, test, test.server, True)
+ cl = exec_context(port, logdir, test, test.client, False)
logger.debug('Starting server')
with sv.start():
- if test.socket in ('domain', 'abstract'):
- time.sleep(0.1)
- port_ok = True
- else:
- port_ok = ensure_socket_open(sv.proc, port, test.delay)
+ port_ok = ensure_socket_open(sv, port, test)
if port_ok:
connect_retry_count = 0
- max_connect_retry = 3
- connect_retry_wait = 0.5
+ max_connect_retry = 12
+ connect_retry_wait = 0.25
while True:
if sv.proc.poll() is not None:
logger.info('not starting client because server process is absent')
break
logger.debug('Starting client')
- cl.start(test.timeout)
- logger.debug('Waiting client')
- cl.wait()
+ cl.start()
+ logger.debug('Waiting client (up to %d secs)' % test.timeout)
+ cl.wait(test.timeout)
if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
- logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+ logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
# Wait for 50ms to see if server does not die at the end.
time.sleep(0.05)
break
@@ -205,12 +235,18 @@
logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
bind_retry_count += 1
else:
- if cl.expired:
- result = RESULT_TIMEOUT
+ result = RESULT_TIMEOUT if cl.expired else cl.returncode if cl.proc.poll() is not None else RESULT_ERROR
+
+ # For servers that handle a controlled shutdown by signal
+ # if they are killed, or return an error code, that is a
+ # problem. For servers that are not signal-aware, we simply
+ # kill them off; if we didn't kill them off, something else
+ # happened (crashed?)
+ if test.server.stop_signal != 0:
+ if sv.killed or sv.returncode > 0:
+ result |= RESULT_ERROR
else:
- result = cl.proc.returncode if cl.proc else RESULT_ERROR
if not sv.killed:
- # Server died without being killed.
result |= RESULT_ERROR
if result == 0 or retry_count >= max_retry:
diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py
index 74fd916..633e926 100644
--- a/test/crossrunner/test.py
+++ b/test/crossrunner/test.py
@@ -22,22 +22,20 @@
import os
import sys
from .compat import path_join
-from .util import merge_dict
-
-
-def domain_socket_path(port):
- return '/tmp/ThriftTest.thrift.%d' % port
+from .util import merge_dict, domain_socket_path
class TestProgram(object):
- def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None,
+ def __init__(self, kind, name, protocol, transport, socket, workdir, stop_signal, command, env=None,
extra_args=[], extra_args2=[], join_args=False, **kwargs):
+
self.kind = kind
self.name = name
self.protocol = protocol
self.transport = transport
self.socket = socket
self.workdir = workdir
+ self.stop_signal = stop_signal
self.command = None
self._base_command = self._fix_cmd_path(command)
if env:
diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py
index e2d195a..c214df8 100644
--- a/test/crossrunner/util.py
+++ b/test/crossrunner/util.py
@@ -20,6 +20,10 @@
import copy
+def domain_socket_path(port):
+ return '/tmp/ThriftTest.thrift.%d' % port
+
+
def merge_dict(base, update):
"""Update dict concatenating list values"""
res = copy.deepcopy(base)
diff --git a/test/perl/TestServer.pl b/test/perl/TestServer.pl
index 7d8f929..e8c1cfa 100644
--- a/test/perl/TestServer.pl
+++ b/test/perl/TestServer.pl
@@ -26,6 +26,8 @@
use Getopt::Long qw(GetOptions);
use Time::HiRes qw(gettimeofday);
+$SIG{INT} = \&sigint_handler;
+
use lib '../../lib/perl/lib';
use lib 'gen-perl';
@@ -146,6 +148,12 @@
my $server = new Thrift::SimpleServer($processor, $serversocket, $transport, $protocol);
print "Starting \"simple\" server ($opts{transport}/$opts{protocol}) listen on: $listening_on\n";
$server->serve();
+print "done.\n";
+
+sub sigint_handler {
+ print "received SIGINT, stopping...\n";
+ $server->stop();
+}
###
### Test server implementation
diff --git a/test/test.py b/test/test.py
index 5a015ea..24e7c4e 100755
--- a/test/test.py
+++ b/test/test.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -18,12 +18,13 @@
# under the License.
#
-# Apache Thrift - integration test suite
+#
+# Apache Thrift - integration (cross) test suite
#
# tests different server-client, protocol and transport combinations
#
-# This script supports python 2.7 and later.
-# python 3.x is recommended for better stability.
+# This script requires python 3.x due to the improvements in
+# subprocess management that are needed for reliability.
#
from __future__ import print_function
@@ -38,6 +39,12 @@
import crossrunner
from crossrunner.compat import path_join
+# 3.3 introduced subprocess timeouts on waiting for child
+req_version = (3, 3)
+cur_version = sys.version_info
+assert (cur_version >= req_version), "Python 3.3 or later is required for proper operation."
+
+
ROOT_DIR = os.path.dirname(os.path.realpath(os.path.dirname(__file__)))
TEST_DIR_RELATIVE = 'test'
TEST_DIR = path_join(ROOT_DIR, TEST_DIR_RELATIVE)
@@ -161,9 +168,11 @@
options.update_failures, options.print_failures)
elif options.features is not None:
features = options.features or ['.*']
- res = run_feature_tests(server_match, features, options.jobs, options.skip_known_failures, options.retry_count, options.regex)
+ res = run_feature_tests(server_match, features, options.jobs,
+ options.skip_known_failures, options.retry_count, options.regex)
else:
- res = run_cross_tests(server_match, client_match, options.jobs, options.skip_known_failures, options.retry_count, options.regex)
+ res = run_cross_tests(server_match, client_match, options.jobs,
+ options.skip_known_failures, options.retry_count, options.regex)
return 0 if res else 1
diff --git a/test/tests.json b/test/tests.json
index 671c667..9c7668d 100644
--- a/test/tests.json
+++ b/test/tests.json
@@ -63,7 +63,8 @@
"name": "d",
"server": {
"command": [
- "thrift_test_server"
+ "thrift_test_server",
+ "--trace"
]
},
"client": {
@@ -438,12 +439,12 @@
"compact",
"json"
],
- "server": {
+ "server": {
"command": [
"dotnet",
"run",
- "--no-build",
- "--project=Server/Server.csproj",
+ "--no-build",
+ "--project=Server/Server.csproj",
"server"
]
},
@@ -452,8 +453,8 @@
"command": [
"dotnet",
"run",
- "--no-build",
- "--project=Client/Client.csproj",
+ "--no-build",
+ "--project=Client/Client.csproj",
"client"
]
},