THRIFT-4515: cross server test improvement: graceful test server shutdown
This closes #1509
diff --git a/lib/cpp/src/thrift/TOutput.cpp b/lib/cpp/src/thrift/TOutput.cpp
index bb46263..ae3a9e2 100644
--- a/lib/cpp/src/thrift/TOutput.cpp
+++ b/lib/cpp/src/thrift/TOutput.cpp
@@ -94,7 +94,7 @@
}
void TOutput::perror(const char* message, int errno_copy) {
- std::string out = message + strerror_s(errno_copy);
+ std::string out = message + std::string(": ") + strerror_s(errno_copy); // "<message>: <strerror_s text>"
f_(out.c_str());
}
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index e60bffc..f89b5f7 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -659,6 +659,7 @@
return;
}
}
+ // fallthrough
// Intentionally fall through here, the call to process has written into
// the writeBuffer_
@@ -707,6 +708,7 @@
server_->getIdleWriteBufferLimit());
callsForResize_ = 0;
}
+ // fallthrough
// N.B.: We also intentionally fall through here into the INIT state!
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
index 7bdacb0..7071698 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
@@ -304,6 +304,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
// in the case of SSL_ERROR_SYSCALL we want to wait for an read event again
@@ -350,6 +351,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
// in the case of SSL_ERROR_SYSCALL we want to wait for an write/read event again
@@ -415,6 +417,8 @@
// a certain number
break;
}
+ // fallthrough
+
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
@@ -471,6 +475,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
@@ -515,6 +520,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
@@ -602,6 +608,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
@@ -634,6 +641,7 @@
&& (errno_copy != THRIFT_EAGAIN)) {
break;
}
+ // fallthrough
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
if (isLibeventSafe()) {
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index c90593d..18cadbc 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -809,11 +809,15 @@
string TSocket::getSocketInfo() {
std::ostringstream oss;
- if (host_.empty() || port_ == 0) {
- oss << "<Host: " << getPeerAddress();
- oss << " Port: " << getPeerPort() << ">";
+ if (path_.empty()) { // no path_ set: describe the socket by host and port
+ if (host_.empty() || port_ == 0) { // host_/port_ not set: fall back to the peer address and port
+ oss << "<Host: " << getPeerAddress();
+ oss << " Port: " << getPeerPort() << ">";
+ } else {
+ oss << "<Host: " << host_ << " Port: " << port_ << ">";
+ }
} else {
- oss << "<Host: " << host_ << " Port: " << port_ << ">";
+ oss << "<Path: " << path_ << ">"; // path_ set: report the path instead of host/port
}
return oss.str();
}
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
index 3779b0d..1c52c47 100644
--- a/lib/cpp/test/concurrency/TimerManagerTests.h
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -79,14 +79,13 @@
= shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
{
-
TimerManager timerManager;
-
timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
-
timerManager.start();
-
- assert(timerManager.state() == TimerManager::STARTED);
+ if (timerManager.state() != TimerManager::STARTED) {
+ std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl;
+ return false;
+ }
// Don't create task yet, because its constructor sets the expected completion time, and we
// need to delay between inserting the two tasks into the run queue.
@@ -94,34 +93,27 @@
{
Synchronized s(_monitor);
-
timerManager.add(orphanTask, 10 * timeout);
- try {
- // Wait for 1 second in order to give timerManager a chance to start sleeping in response
- // to adding orphanTask. We need to do this so we can verify that adding the second task
- // kicks the dispatcher out of the current wait and starts the new 1 second wait.
- _monitor.wait(1000);
- assert(
- 0 == "ERROR: This wait should time out. TimerManager dispatcher may have a problem.");
- } catch (TimedOutException&) {
- }
+ THRIFT_SLEEP_USEC(timeout * 1000);
task.reset(new TimerManagerTests::Task(_monitor, timeout));
-
timerManager.add(task, timeout);
-
_monitor.wait();
}
- assert(task->_done);
+ if (!task->_done) {
+ std::cerr << "task is not done, but it should have executed" << std::endl;
+ return false;
+ }
std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
}
- // timerManager.stop(); This is where it happens via destructor
-
- assert(!orphanTask->_done);
+ if (orphanTask->_done) {
+ std::cerr << "orphan task is done, but it should not have executed" << std::endl;
+ return false;
+ }
return true;
}
diff --git a/lib/d/test/thrift_test_server.d b/lib/d/test/thrift_test_server.d
index 71ab917..b582253 100644
--- a/lib/d/test/thrift_test_server.d
+++ b/lib/d/test/thrift_test_server.d
@@ -16,8 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
module thrift_test_server;
+import core.stdc.errno : errno;
+import core.stdc.signal : signal, sigfn_t, SIGINT, SIG_DFL, SIG_ERR;
import core.thread : dur, Thread;
import std.algorithm;
import std.exception : enforce;
@@ -40,6 +43,7 @@
import thrift.transport.framed;
import thrift.transport.http;
import thrift.transport.ssl;
+import thrift.util.cancellation;
import thrift.util.hashset;
import test_utils;
@@ -205,14 +209,44 @@
bool trace_;
}
+shared(bool) gShutdown = false;
+
+nothrow @nogc extern(C) void handleSignal(int sig) { // installed for SIGINT via signal() in main
+ gShutdown = true; // only raises the flag; ShutdownThread polls it and triggers cancellation
+}
+
+// Runs a thread that waits for shutdown to be
+// signaled and then triggers cancellation,
+// causing the server to stop. While we could
+// use a signalfd for this purpose, we are instead
+// opting for a polling scheme (sleeping between checks)
+// for maximum portability, since signalfd is Linux-specific.
+
+class ShutdownThread : Thread {
+ this(TCancellationOrigin cancellation) { // cancellation: origin to trigger once gShutdown is set
+ cancellation_ = cancellation;
+ super(&run); // run() is the thread body
+ }
+
+private:
+ void run() {
+ while (!gShutdown) { // poll the flag set by handleSignal
+ Thread.sleep(dur!("msecs")(25)); // re-check every 25 ms
+ }
+ cancellation_.trigger(); // flag observed: trigger the cancellation origin
+ }
+
+ TCancellationOrigin cancellation_;
+}
+
void main(string[] args) {
ushort port = 9090;
ServerType serverType;
ProtocolType protocolType;
size_t numIOThreads = 1;
TransportType transportType;
- bool ssl;
- bool trace;
+ bool ssl = false;
+ bool trace = true;
size_t taskPoolSize = totalCPUs;
getopt(args, "port", &port, "protocol", &protocolType, "server-type",
@@ -279,8 +313,26 @@
auto server = createServer(serverType, numIOThreads, taskPoolSize,
processor, serverSocket, transportFactory, protocolFactory);
+ // Set up SIGINT signal handling; enforce() throws if signal() returns SIG_ERR
+ sigfn_t oldHandler = signal(SIGINT, &handleSignal);
+ enforce(oldHandler != SIG_ERR,
+ "Could not replace the SIGINT signal handler: errno %s".format(errno())); // D format takes %s, not {0}
+
+ // Set up a server cancellation trigger
+ auto cancel = new TCancellationOrigin();
+
+ // Set up a listener for the shutdown condition - this will
+ // wake up when the signal occurs and trigger cancellation.
+ auto shutdown = new ShutdownThread(cancel);
+ shutdown.start();
+
+ // Serve from this thread; the signal will stop the server
+ // and control will return here
writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType,
transportType, serverType, ssl ? "(using SSL) ": "", port);
- server.serve();
+ server.serve(cancel);
+ shutdown.join();
+ signal(SIGINT, SIG_DFL);
+
writeln("done.");
}
diff --git a/lib/perl/lib/Thrift/Server.pm b/lib/perl/lib/Thrift/Server.pm
index fc9ca30..f265d45 100644
--- a/lib/perl/lib/Thrift/Server.pm
+++ b/lib/perl/lib/Thrift/Server.pm
@@ -150,27 +150,31 @@
sub serve
{
my $self = shift;
-
+ my $stop = 0; # set to 1 once accept() returns undef
+
$self->{serverTransport}->listen();
- while (1)
- {
+ while (!$stop) { # keep accepting clients until accept() yields undef
my $client = $self->{serverTransport}->accept();
- my $itrans = $self->{inputTransportFactory}->getTransport($client);
- my $otrans = $self->{outputTransportFactory}->getTransport($client);
- my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans);
- my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans);
- eval {
- $self->_clientBegin($iprot, $oprot);
- while (1)
- {
- $self->{processor}->process($iprot, $oprot);
+ if (defined $client) { # got a client: build transports/protocols and process it
+ my $itrans = $self->{inputTransportFactory}->getTransport($client);
+ my $otrans = $self->{outputTransportFactory}->getTransport($client);
+ my $iprot = $self->{inputProtocolFactory}->getProtocol($itrans);
+ my $oprot = $self->{outputProtocolFactory}->getProtocol($otrans);
+ eval {
+ $self->_clientBegin($iprot, $oprot);
+ while (1)
+ {
+ $self->{processor}->process($iprot, $oprot);
+ }
+ }; if($@) {
+ $self->_handleException($@);
}
- }; if($@) {
- $self->_handleException($@);
- }
- $itrans->close();
- $otrans->close();
+ $itrans->close();
+ $otrans->close();
+ } else {
+ $stop = 1; # accept() returned undef: leave the serve loop
+ }
}
}
diff --git a/lib/perl/lib/Thrift/ServerSocket.pm b/lib/perl/lib/Thrift/ServerSocket.pm
index 51f83b4..2c4d906 100644
--- a/lib/perl/lib/Thrift/ServerSocket.pm
+++ b/lib/perl/lib/Thrift/ServerSocket.pm
@@ -81,15 +81,24 @@
{
my $self = shift;
- if ( exists $self->{handle} and defined $self->{handle} )
- {
+ if ( exists $self->{handle} and defined $self->{handle} ) {
my $client = $self->{handle}->accept();
my $result = $self->__client();
$result->{handle} = new IO::Select($client);
return $result;
}
- return 0;
+ return undef;
+}
+
+sub close # closes the server's handle when one is present
+{
+ my $self = shift;
+
+ if ( exists $self->{handle} and defined $self->{handle} ) # nothing to do when no handle is set
+ {
+ $self->{handle}->close();
+ }
+}
###