THRIFT-1305. cpp: make TConnection a private inner class of
TNonblockingServer
Patch: Adam Simpkins
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1162987 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index f43a1c9..ed1001f 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -54,7 +54,277 @@
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
-class TConnection::Task: public Runnable {
+/// Three states for sockets: recv frame size, recv data, and send mode
+enum TSocketState {
+ SOCKET_RECV_FRAMING,
+ SOCKET_RECV,
+ SOCKET_SEND
+};
+
+/**
+ * Five states for the nonblocking server:
+ * 1) initialize
+ * 2) read 4 byte frame size
+ * 3) read frame of data
+ * 4) send back data (if any)
+ * 5) force immediate connection close
+ */
+enum TAppState {
+ APP_INIT,
+ APP_READ_FRAME_SIZE,
+ APP_READ_REQUEST,
+ APP_WAIT_TASK,
+ APP_SEND_RESULT,
+ APP_CLOSE_CONNECTION
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TNonblockingServer::TConnection {
+ private:
+
+ /// Server handle
+ TNonblockingServer* server_;
+
+ /// Object wrapping network socket
+ boost::shared_ptr<TSocket> tSocket_;
+
+ /// Libevent object
+ struct event event_;
+
+ /// Libevent flags
+ short eventFlags_;
+
+ /// Socket mode
+ TSocketState socketState_;
+
+ /// Application state
+ TAppState appState_;
+
+ /// How much data needed to read
+ uint32_t readWant_;
+
+ /// Where in the read buffer are we
+ uint32_t readBufferPos_;
+
+ /// Read buffer
+ uint8_t* readBuffer_;
+
+ /// Read buffer size
+ uint32_t readBufferSize_;
+
+ /// Write buffer
+ uint8_t* writeBuffer_;
+
+ /// Write buffer size
+ uint32_t writeBufferSize_;
+
+ /// How far through writing are we?
+ uint32_t writeBufferPos_;
+
+ /// Largest size of write buffer seen since buffer was constructed
+ size_t largestWriteBufferSize_;
+
+ /// Count of the number of calls for use with getResizeBufferEveryN().
+ int32_t callsForResize_;
+
+ /// Task handle
+ int taskHandle_;
+
+ /// Task event
+ struct event taskEvent_;
+
+ /// Transport to read from
+ boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+ /// Transport that processor writes to
+ boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+ /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
+ boost::shared_ptr<TTransport> factoryInputTransport_;
+ boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+ /// Protocol decoder
+ boost::shared_ptr<TProtocol> inputProtocol_;
+
+ /// Protocol encoder
+ boost::shared_ptr<TProtocol> outputProtocol_;
+
+ /// Server event handler, if any
+ boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+
+ /// Thrift call context, if any
+ void *connectionContext_;
+
+ /// Go into read mode
+ void setRead() {
+ setFlags(EV_READ | EV_PERSIST);
+ }
+
+ /// Go into write mode
+ void setWrite() {
+ setFlags(EV_WRITE | EV_PERSIST);
+ }
+
+ /// Set socket idle
+ void setIdle() {
+ setFlags(0);
+ }
+
+ /**
+ * Set event flags for this connection.
+ *
+ * @param eventFlags flags we pass to libevent for the connection.
+ */
+ void setFlags(short eventFlags);
+
+ /**
+ * Libevent handler called (via our static wrapper) when the connection
+ * socket had something happen. Rather than use the flags libevent passed,
+ * we use the connection state to determine whether we need to read or
+ * write the socket.
+ */
+ void workSocket();
+
+ /// Close this connection and free or reset its resources.
+ void close();
+
+ public:
+
+ class Task;
+
+ /// Constructor
+ TConnection(int socket, short eventFlags, TNonblockingServer *s,
+ const sockaddr* addr, socklen_t addrLen) {
+ readBuffer_ = NULL;
+ readBufferSize_ = 0;
+
+ // Allocate input and output transports
+ // these only need to be allocated once per TConnection (they don't need to be
+ // reallocated on init() call)
+ inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
+ outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
+ tSocket_.reset(new TSocket());
+
+ init(socket, eventFlags, s, addr, addrLen);
+ server_->incrementNumConnections();
+ }
+
+ ~TConnection() {
+ std::free(readBuffer_);
+ server_->decrementNumConnections();
+ }
+
+ /**
+ * Check buffers against any size limits and shrink it if exceeded.
+ *
+ * @param readLimit we reduce read buffer size to this (if nonzero).
+ * @param writeLimit if nonzero and write buffer is larger, replace it.
+ */
+ void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
+
+ /// Initialize
+ void init(int socket, short eventFlags, TNonblockingServer *s,
+ const sockaddr* addr, socklen_t addrLen);
+
+ /**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+ void transition();
+
+ /**
+ * C-callable event handler for connection events. Provides a callback
+ * that libevent can understand which invokes connection_->workSocket().
+ *
+ * @param fd the descriptor the event occurred on.
+ * @param which the flags associated with the event.
+ * @param v void* callback arg where we placed TConnection's "this".
+ */
+ static void eventHandler(int fd, short /* which */, void* v) {
+ assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
+ ((TConnection*)v)->workSocket();
+ }
+
+ /**
+ * C-callable event handler for signaling task completion. Provides a
+ * callback that libevent can understand that will read a connection
+ * object's address from a pipe and call connection->transition() for
+ * that object.
+ *
+ * @param fd the descriptor the event occurred on.
+ */
+ static void taskHandler(int fd, short /* which */, void* /* v */) {
+ TConnection* connection;
+ ssize_t nBytes;
+ while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
+ == sizeof(TConnection*)) {
+ connection->transition();
+ }
+ if (nBytes > 0) {
+ throw TException("TConnection::taskHandler unexpected partial read");
+ }
+ if (errno != EWOULDBLOCK && errno != EAGAIN) {
+ GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
+ }
+ }
+
+ /**
+ * Notification to server that processing has ended on this request.
+ * Can be called either when processing is completed or when a waiting
+ * task has been preemptively terminated (on overload).
+ *
+ * @return true if successful, false if unable to notify (check errno).
+ */
+ bool notifyServer() {
+ TConnection* connection = this;
+ if (write(server_->getNotificationSendFD(), (const void*)&connection,
+ sizeof(TConnection*)) != sizeof(TConnection*)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /// Force connection shutdown for this connection.
+ void forceClose() {
+ appState_ = APP_CLOSE_CONNECTION;
+ if (!notifyServer()) {
+ throw TException("TConnection::forceClose: failed write on notify pipe");
+ }
+ }
+
+ /// return the server this connection was initialized for.
+ TNonblockingServer* getServer() {
+ return server_;
+ }
+
+ /// get state of connection.
+ TAppState getState() {
+ return appState_;
+ }
+
+ /// return the TSocket transport wrapping this network connection
+ boost::shared_ptr<TSocket> getTSocket() const {
+ return tSocket_;
+ }
+
+ /// return the server event handler if any
+ boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
+ return serverEventHandler_;
+ }
+
+ /// return the Thrift connection context if any
+ void* getConnectionContext() {
+ return connectionContext_;
+ }
+
+};
+
+class TNonblockingServer::TConnection::Task: public Runnable {
public:
Task(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocol> input,
@@ -109,8 +379,10 @@
void* connectionContext_;
};
-void TConnection::init(int socket, short eventFlags, TNonblockingServer* s,
- const sockaddr* addr, socklen_t addrLen) {
+void TNonblockingServer::TConnection::init(int socket, short eventFlags,
+ TNonblockingServer* s,
+ const sockaddr* addr,
+ socklen_t addrLen) {
tSocket_->setSocketFD(socket);
tSocket_->setCachedAddress(addr, addrLen);
@@ -150,7 +422,7 @@
}
}
-void TConnection::workSocket() {
+void TNonblockingServer::TConnection::workSocket() {
int got=0, left=0, sent=0;
uint32_t fetch = 0;
@@ -276,7 +548,10 @@
* another. This means that it has finished writing the data that it needed
* to, or finished receiving the data that it needed to.
*/
-void TConnection::transition() {
+void TNonblockingServer::TConnection::transition() {
+
+ int sz = 0;
+
// Switch upon the state that we are currently in and move to a new state
switch (appState_) {
@@ -460,7 +735,7 @@
}
}
-void TConnection::setFlags(short eventFlags) {
+void TNonblockingServer::TConnection::setFlags(short eventFlags) {
// Catch the do nothing case
if (eventFlags_ == eventFlags) {
return;
@@ -522,7 +797,7 @@
/**
* Closes a connection
*/
-void TConnection::close() {
+void TNonblockingServer::TConnection::close() {
// Delete the registered libevent
if (event_del(&event_) == -1) {
GlobalOutput.perror("TConnection::close() event_del", errno);
@@ -543,8 +818,9 @@
server_->returnConnection(this);
}
-void TConnection::checkIdleBufferMemLimit(size_t readLimit,
- size_t writeLimit) {
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
+ size_t readLimit,
+ size_t writeLimit) {
if (readLimit > 0 && readBufferSize_ > readLimit) {
free(readBuffer_);
readBuffer_ = NULL;
@@ -585,9 +861,10 @@
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
*/
-TConnection* TNonblockingServer::createConnection(int socket, short flags,
- const sockaddr* addr,
- socklen_t addrLen) {
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(
+ int socket, short flags,
+ const sockaddr* addr,
+ socklen_t addrLen) {
// Check the stack
if (connectionStack_.empty()) {
return new TConnection(socket, flags, this, addr, addrLen);
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 21b8d95..e4e0e64 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -41,9 +41,6 @@
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
-// Forward declaration of class
-class TConnection;
-
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
@@ -94,6 +91,8 @@
class TNonblockingServer : public TServer {
private:
+ class TConnection;
+
/// Listen backlog
static const int LISTEN_BACKLOG = 1024;
@@ -698,276 +697,6 @@
void stop();
};
-/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState {
- SOCKET_RECV_FRAMING,
- SOCKET_RECV,
- SOCKET_SEND
-};
-
-/**
- * Five states for the nonblocking servr:
- * 1) initialize
- * 2) read 4 byte frame size
- * 3) read frame of data
- * 4) send back data (if any)
- * 5) force immediate connection close
- */
-enum TAppState {
- APP_INIT,
- APP_READ_FRAME_SIZE,
- APP_READ_REQUEST,
- APP_WAIT_TASK,
- APP_SEND_RESULT,
- APP_CLOSE_CONNECTION
-};
-
-/**
- * Represents a connection that is handled via libevent. This connection
- * essentially encapsulates a socket that has some associated libevent state.
- */
-class TConnection {
- private:
-
- /// Server handle
- TNonblockingServer* server_;
-
- /// Object wrapping network socket
- boost::shared_ptr<TSocket> tSocket_;
-
- /// Libevent object
- struct event event_;
-
- /// Libevent flags
- short eventFlags_;
-
- /// Socket mode
- TSocketState socketState_;
-
- /// Application state
- TAppState appState_;
-
- /// How much data needed to read
- uint32_t readWant_;
-
- /// Where in the read buffer are we
- uint32_t readBufferPos_;
-
- /// Read buffer
- uint8_t* readBuffer_;
-
- /// Read buffer size
- uint32_t readBufferSize_;
-
- /// Write buffer
- uint8_t* writeBuffer_;
-
- /// Write buffer size
- uint32_t writeBufferSize_;
-
- /// How far through writing are we?
- uint32_t writeBufferPos_;
-
- /// Largest size of write buffer seen since buffer was constructed
- size_t largestWriteBufferSize_;
-
- /// Count of the number of calls for use with getResizeBufferEveryN().
- int32_t callsForResize_;
-
- /// Task handle
- int taskHandle_;
-
- /// Task event
- struct event taskEvent_;
-
- /// Transport to read from
- boost::shared_ptr<TMemoryBuffer> inputTransport_;
-
- /// Transport that processor writes to
- boost::shared_ptr<TMemoryBuffer> outputTransport_;
-
- /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
- boost::shared_ptr<TTransport> factoryInputTransport_;
- boost::shared_ptr<TTransport> factoryOutputTransport_;
-
- /// Protocol decoder
- boost::shared_ptr<TProtocol> inputProtocol_;
-
- /// Protocol encoder
- boost::shared_ptr<TProtocol> outputProtocol_;
-
- /// Server event handler, if any
- boost::shared_ptr<TServerEventHandler> serverEventHandler_;
-
- /// Thrift call context, if any
- void *connectionContext_;
-
- /// Go into read mode
- void setRead() {
- setFlags(EV_READ | EV_PERSIST);
- }
-
- /// Go into write mode
- void setWrite() {
- setFlags(EV_WRITE | EV_PERSIST);
- }
-
- /// Set socket idle
- void setIdle() {
- setFlags(0);
- }
-
- /**
- * Set event flags for this connection.
- *
- * @param eventFlags flags we pass to libevent for the connection.
- */
- void setFlags(short eventFlags);
-
- /**
- * Libevent handler called (via our static wrapper) when the connection
- * socket had something happen. Rather than use the flags libevent passed,
- * we use the connection state to determine whether we need to read or
- * write the socket.
- */
- void workSocket();
-
- /// Close this connection and free or reset its resources.
- void close();
-
- public:
-
- class Task;
-
- /// Constructor
- TConnection(int socket, short eventFlags, TNonblockingServer *s,
- const sockaddr* addr, socklen_t addrLen) {
- readBuffer_ = NULL;
- readBufferSize_ = 0;
-
- // Allocate input and output tranpsorts
- // these only need to be allocated once per TConnection (they don't need to be
- // reallocated on init() call)
- inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
- outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
- tSocket_.reset(new TSocket());
-
- init(socket, eventFlags, s, addr, addrLen);
- server_->incrementNumConnections();
- }
-
- ~TConnection() {
- std::free(readBuffer_);
- server_->decrementNumConnections();
- }
-
- /**
- * Check buffers against any size limits and shrink it if exceeded.
- *
- * @param readLimit we reduce read buffer size to this (if nonzero).
- * @param writeLimit if nonzero and write buffer is larger, replace it.
- */
- void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
-
- /// Initialize
- void init(int socket, short eventFlags, TNonblockingServer *s,
- const sockaddr* addr, socklen_t addrLen);
-
- /**
- * This is called when the application transitions from one state into
- * another. This means that it has finished writing the data that it needed
- * to, or finished receiving the data that it needed to.
- */
- void transition();
-
- /**
- * C-callable event handler for connection events. Provides a callback
- * that libevent can understand which invokes connection_->workSocket().
- *
- * @param fd the descriptor the event occured on.
- * @param which the flags associated with the event.
- * @param v void* callback arg where we placed TConnection's "this".
- */
- static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
- assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
- ((TConnection*)v)->workSocket();
- }
-
- /**
- * C-callable event handler for signaling task completion. Provides a
- * callback that libevent can understand that will read a connection
- * object's address from a pipe and call connection->transition() for
- * that object.
- *
- * @param fd the descriptor the event occured on.
- */
- static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
- TConnection* connection;
- ssize_t nBytes;
- while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
- == sizeof(TConnection*)) {
- connection->transition();
- }
- if (nBytes > 0) {
- throw TException("TConnection::taskHandler unexpected partial read");
- }
- if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
- GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
- }
- }
-
- /**
- * Notification to server that processing has ended on this request.
- * Can be called either when processing is completed or when a waiting
- * task has been preemptively terminated (on overload).
- *
- * @return true if successful, false if unable to notify (check errno).
- */
- bool notifyServer() {
- TConnection* connection = this;
- if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
- sizeof(TConnection*), 0) != sizeof(TConnection*)) {
- return false;
- }
-
- return true;
- }
-
- /// Force connection shutdown for this connection.
- void forceClose() {
- appState_ = APP_CLOSE_CONNECTION;
- if (!notifyServer()) {
- throw TException("TConnection::forceClose: failed write on notify pipe");
- }
- }
-
- /// return the server this connection was initialized for.
- TNonblockingServer* getServer() {
- return server_;
- }
-
- /// get state of connection.
- TAppState getState() {
- return appState_;
- }
-
- /// return the TSocket transport wrapping this network connection
- boost::shared_ptr<TSocket> getTSocket() const {
- return tSocket_;
- }
-
- /// return the server event handler if any
- boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
- return serverEventHandler_;
- }
-
- /// return the Thrift connection context if any
- void* getConnectionContext() {
- return connectionContext_;
- }
-
-};
-
}}} // apache::thrift::server
#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_