THRIFT-4276:Add SSL support to the C++ Nonblocking Server
Client: C++ Lib
Patch: Divya Thaluru
Github Pull Request:
This closes #1251
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 97c4cd9..d5af12a 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -209,10 +209,8 @@
class Task;
/// Constructor
- TConnection(THRIFT_SOCKET socket,
- TNonblockingIOThread* ioThread,
- const sockaddr* addr,
- socklen_t addrLen) {
+ TConnection(boost::shared_ptr<TSocket> socket,
+ TNonblockingIOThread* ioThread) {
readBuffer_ = NULL;
readBufferSize_ = 0;
@@ -224,8 +222,10 @@
inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_.reset(
new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
- tSocket_.reset(new TSocket());
- init(socket, ioThread, addr, addrLen);
+
+ tSocket_ = socket;
+
+ init(ioThread);
}
~TConnection() { std::free(readBuffer_); }
@@ -242,10 +242,10 @@
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
- void init(THRIFT_SOCKET socket,
- TNonblockingIOThread* ioThread,
- const sockaddr* addr,
- socklen_t addrLen);
+ void init(TNonblockingIOThread* ioThread);
+
+ /// set socket for connection
+ void setSocket(boost::shared_ptr<TSocket> socket);
/**
* This is called when the application transitions from one state into
@@ -367,13 +367,7 @@
void* connectionContext_;
};
-void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
- TNonblockingIOThread* ioThread,
- const sockaddr* addr,
- socklen_t addrLen) {
- tSocket_->setSocketFD(socket);
- tSocket_->setCachedAddress(addr, addrLen);
-
+void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
ioThread_ = ioThread;
server_ = ioThread->getServer();
appState_ = APP_INIT;
@@ -416,6 +410,10 @@
processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
+void TNonblockingServer::TConnection::setSocket(boost::shared_ptr<TSocket> socket) {
+ tSocket_ = socket;
+}
+
void TNonblockingServer::TConnection::workSocket() {
int got = 0, left = 0, sent = 0;
uint32_t fetch = 0;
@@ -441,10 +439,14 @@
}
readBufferPos_ += fetch;
} catch (TTransportException& te) {
- GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
- close();
+ //In Nonblocking SSLSocket some operations need to be retried again.
+ //Current approach is parsing exception message, but a better solution needs to be investigated.
+ if(!strstr(te.what(), "retry")) {
+ GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ close();
- return;
+ return;
+ }
}
if (readBufferPos_ < sizeof(framing.size)) {
@@ -481,8 +483,12 @@
fetch = readWant_ - readBufferPos_;
got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
} catch (TTransportException& te) {
- GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
- close();
+ //In Nonblocking SSLSocket some operations need to be retried again.
+ //Current approach is parsing exception message, but a better solution needs to be investigated.
+ if(!strstr(te.what(), "retry")) {
+ GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ close();
+ }
return;
}
@@ -748,7 +754,7 @@
appState_ = APP_READ_REQUEST;
// Work the socket right away
- // workSocket();
+ workSocket();
return;
@@ -883,9 +889,7 @@
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
*/
-TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket,
- const sockaddr* addr,
- socklen_t addrLen) {
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(boost::shared_ptr<TSocket> socket) {
// Check the stack
Guard g(connMutex_);
@@ -899,12 +903,13 @@
// Check the connection stack to see if we can re-use
TConnection* result = NULL;
if (connectionStack_.empty()) {
- result = new TConnection(socket, ioThread, addr, addrLen);
+ result = new TConnection(socket, ioThread);
++numTConnections_;
} else {
result = connectionStack_.top();
connectionStack_.pop();
- result->init(socket, ioThread, addr, addrLen);
+ result->setSocket(socket);
+ result->init(ioThread);
}
activeConnections_.push_back(result);
return result;
@@ -939,53 +944,35 @@
// Make sure that libevent didn't mess up the socket handles
assert(fd == serverSocket_);
- // Server socket accepted a new connection
- socklen_t addrLen;
- sockaddr_storage addrStorage;
- sockaddr* addrp = (sockaddr*)&addrStorage;
- addrLen = sizeof(addrStorage);
-
// Going to accept a new client socket
- THRIFT_SOCKET clientSocket;
+ boost::shared_ptr<TSocket> clientSocket;
- // Accept as many new clients as possible, even though libevent signaled only
- // one, this helps us to avoid having to go back into the libevent engine so
- // many times
- while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
+ clientSocket = serverTransport_->accept();
+ if (clientSocket) {
// If we're overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
Guard g(connMutex_);
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
- ::THRIFT_CLOSESOCKET(clientSocket);
+ clientSocket->close();
return;
} else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
if (!drainPendingTask()) {
// Nothing left to discard, so we drop connection instead.
- ::THRIFT_CLOSESOCKET(clientSocket);
+ clientSocket->close();
return;
}
}
}
- // Explicitly set this socket to NONBLOCK mode
- int flags;
- if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0
- || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
- GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ",
- THRIFT_GET_SOCKET_ERROR);
- ::THRIFT_CLOSESOCKET(clientSocket);
- return;
- }
-
// Create a new TConnection for this client socket.
- TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen);
+ TConnection* clientConnection = createConnection(clientSocket);
// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
- ::THRIFT_CLOSESOCKET(clientSocket);
+ clientSocket->close();
return;
}
@@ -1009,15 +996,6 @@
clientConnection->close();
}
}
-
- // addrLen is written by the accept() call, so needs to be set before the next call.
- addrLen = sizeof(addrStorage);
- }
-
- // Done looping accept, now we have to make sure the error is due to
- // blocking. Any other error is a problem
- if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
- GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
}
}
@@ -1025,130 +1003,10 @@
* Creates a socket to listen on and binds it to the local port.
*/
void TNonblockingServer::createAndListenOnSocket() {
-#ifdef _WIN32
- TWinsockSingleton::create();
-#endif // _WIN32
-
- THRIFT_SOCKET s;
-
- struct addrinfo hints, *res, *res0;
- int error;
-
- char port[sizeof("65536") + 1];
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = PF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
- sprintf(port, "%d", port_);
-
- // Wildcard address
- error = getaddrinfo(NULL, port, &hints, &res0);
- if (error) {
- throw TException("TNonblockingServer::serve() getaddrinfo "
- + string(THRIFT_GAI_STRERROR(error)));
- }
-
- // Pick the ipv6 address first since ipv4 addresses can be mapped
- // into ipv6 space.
- for (res = res0; res; res = res->ai_next) {
- if (res->ai_family == AF_INET6 || res->ai_next == NULL)
- break;
- }
-
- // Create the server socket
- s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
- if (s == -1) {
- freeaddrinfo(res0);
- throw TException("TNonblockingServer::serve() socket() -1");
- }
-
-#ifdef IPV6_V6ONLY
- if (res->ai_family == AF_INET6) {
- int zero = 0;
- if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
- GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY", THRIFT_GET_SOCKET_ERROR);
- }
- }
-#endif // #ifdef IPV6_V6ONLY
-
- int one = 1;
-
- // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
- setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));
-
- if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
- ::THRIFT_CLOSESOCKET(s);
- freeaddrinfo(res0);
- throw TTransportException(TTransportException::NOT_OPEN,
- "TNonblockingServer::serve() bind",
- THRIFT_GET_SOCKET_ERROR);
- }
-
- // Done with the addr info
- freeaddrinfo(res0);
-
- // Set up this file descriptor for listening
- listenSocket(s);
+ serverTransport_->listen();
+ serverSocket_ = serverTransport_->getSocketFD();
}
-/**
- * Takes a socket created by listenSocket() and sets various options on it
- * to prepare for use in the server.
- */
-void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
- // Set socket to nonblocking mode
- int flags;
- if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0
- || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
- ::THRIFT_CLOSESOCKET(s);
- throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
- }
-
- int one = 1;
- struct linger ling = {0, 0};
-
- // Keepalive to ensure full result flushing
- setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
-
- // Turn linger off to avoid hung sockets
- setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
-
-// Set TCP nodelay if available, MAC OS X Hack
-// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
-#ifndef TCP_NOPUSH
- setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
-#endif
-
-#ifdef TCP_LOW_MIN_RTO
- if (TSocket::getUseLowMinRto()) {
- setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
- }
-#endif
-
- if (listen(s, LISTEN_BACKLOG) == -1) {
- ::THRIFT_CLOSESOCKET(s);
- throw TTransportException(TTransportException::NOT_OPEN, "TNonblockingServer::serve() listen");
- }
-
- // Cool, this socket is good to go, set it as the serverSocket_
- serverSocket_ = s;
-
- if (!port_) {
- struct sockaddr_storage addr;
- socklen_t size = sizeof(addr);
- if (!getsockname(serverSocket_, reinterpret_cast<sockaddr*>(&addr), &size)) {
- if (addr.ss_family == AF_INET6) {
- const struct sockaddr_in6* sin = reinterpret_cast<const struct sockaddr_in6*>(&addr);
- listenPort_ = ntohs(sin->sin6_port);
- } else {
- const struct sockaddr_in* sin = reinterpret_cast<const struct sockaddr_in*>(&addr);
- listenPort_ = ntohs(sin->sin_port);
- }
- } else {
- GlobalOutput.perror("TNonblocking: failed to get listen port: ", THRIFT_GET_SOCKET_ERROR);
- }
- }
-}
void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
@@ -1205,10 +1063,7 @@
connection->forceClose();
}
-void TNonblockingServer::stop() {
- if (!port_) {
- listenPort_ = 0;
- }
+void TNonblockingServer::stop() {
// Breaks the event loop in all threads so that they end ASAP.
for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
ioThreads_[i]->stop();
@@ -1249,8 +1104,7 @@
assert(ioThreads_.size() == numIOThreads_);
assert(ioThreads_.size() > 0);
- GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
- listenPort_,
+ GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
ioThreads_.size());
// Launch all the secondary IO threads in separate threads