blob: 40ec5741984b1d876fadef502324e1b1612f595f [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
#include <Thrift.h>
#include <server/TServer.h>
#include <transport/TBufferTransports.h>
#include <concurrency/ThreadManager.h>
#include <stack>
#include <string>
#include <errno.h>
#include <cstdlib>
#include <event.h>
namespace apache { namespace thrift { namespace server {
using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
// Forward declaration of class
class TConnection;
/**
* This is a non-blocking server in C++ for high performance that operates a
* single IO thread. It assumes that all incoming requests are framed with a
* 4 byte length indicator and writes out responses using the same framing.
*
* It does not use the TServerTransport framework, but rather has socket
* operations hardcoded for use with select.
*
* @author Mark Slee <mcslee@facebook.com>
*/
class TNonblockingServer : public TServer {
private:
// Listen backlog
static const int LISTEN_BACKLOG = 1024;
// Server socket file descriptor
int serverSocket_;
// Port server runs on
int port_;
// For processing via thread pool, may be NULL
boost::shared_ptr<ThreadManager> threadManager_;
// Is thread pool processing?
bool threadPoolProcessing_;
// The event base for libevent
event_base* eventBase_;
// Event struct, for use with eventBase_
struct event serverEvent_;
// Number of TConnection object we've created
size_t numTConnections_;
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
* stack so that the object can be reused later, rather than freeing the
* memory and reallocating a new object later.
*/
std::stack<TConnection*> connectionStack_;
void handleEvent(int fd, short which);
public:
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
int port) :
TServer(processor),
serverSocket_(-1),
port_(port),
threadPoolProcessing_(false),
eventBase_(NULL),
numTConnections_(0) {}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
int port,
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(-1),
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
numTConnections_(0) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
setOutputProtocolFactory(protocolFactory);
setThreadManager(threadManager);
}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TTransportFactory> inputTransportFactory,
boost::shared_ptr<TTransportFactory> outputTransportFactory,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
int port,
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(0),
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
numTConnections_(0) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
setOutputProtocolFactory(outputProtocolFactory);
setThreadManager(threadManager);
}
~TNonblockingServer() {}
void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
threadPoolProcessing_ = (threadManager != NULL);
}
boost::shared_ptr<ThreadManager> getThreadManager() {
return threadManager_;
}
bool isThreadPoolProcessing() const {
return threadPoolProcessing_;
}
void addTask(boost::shared_ptr<Runnable> task) {
threadManager_->add(task);
}
event_base* getEventBase() const {
return eventBase_;
}
void incrementNumConnections() {
++numTConnections_;
}
void decrementNumConnections() {
--numTConnections_;
}
size_t getNumConnections() {
return numTConnections_;
}
size_t getNumIdleConnections() {
return connectionStack_.size();
}
TConnection* createConnection(int socket, short flags);
void returnConnection(TConnection* connection);
static void eventHandler(int fd, short which, void* v) {
((TNonblockingServer*)v)->handleEvent(fd, which);
}
void listenSocket();
void listenSocket(int fd);
void registerEvents(event_base* base);
void serve();
};
/**
* Two states for sockets, recv and send mode
*/
enum TSocketState {
SOCKET_RECV,
SOCKET_SEND
};
/**
* Four states for the nonblocking servr:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
*/
enum TAppState {
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT
};
/**
* Represents a connection that is handled via libevent. This connection
* essentially encapsulates a socket that has some associated libevent state.
*/
class TConnection {
private:
class Task;
// Server handle
TNonblockingServer* server_;
// Socket handle
int socket_;
// Libevent object
struct event event_;
// Libevent flags
short eventFlags_;
// Socket mode
TSocketState socketState_;
// Application state
TAppState appState_;
// How much data needed to read
uint32_t readWant_;
// Where in the read buffer are we
uint32_t readBufferPos_;
// Read buffer
uint8_t* readBuffer_;
// Read buffer size
uint32_t readBufferSize_;
// Write buffer
uint8_t* writeBuffer_;
// Write buffer size
uint32_t writeBufferSize_;
// How far through writing are we?
uint32_t writeBufferPos_;
// How many times have we read since our last buffer reset?
uint32_t numReadsSinceReset_;
// How many times have we written since our last buffer reset?
uint32_t numWritesSinceReset_;
// Task handle
int taskHandle_;
// Task event
struct event taskEvent_;
// Transport to read from
boost::shared_ptr<TMemoryBuffer> inputTransport_;
// Transport that processor writes to
boost::shared_ptr<TMemoryBuffer> outputTransport_;
// extra transport generated by transport factory (e.g. BufferedRouterTransport)
boost::shared_ptr<TTransport> factoryInputTransport_;
boost::shared_ptr<TTransport> factoryOutputTransport_;
// Protocol decoder
boost::shared_ptr<TProtocol> inputProtocol_;
// Protocol encoder
boost::shared_ptr<TProtocol> outputProtocol_;
// Go into read mode
void setRead() {
setFlags(EV_READ | EV_PERSIST);
}
// Go into write mode
void setWrite() {
setFlags(EV_WRITE | EV_PERSIST);
}
// Set socket idle
void setIdle() {
setFlags(0);
}
// Set event flags
void setFlags(short eventFlags);
// Libevent handlers
void workSocket();
// Close this client and reset
void close();
public:
// Constructor
TConnection(int socket, short eventFlags, TNonblockingServer *s) {
readBuffer_ = (uint8_t*)std::malloc(1024);
if (readBuffer_ == NULL) {
throw new apache::thrift::TException("Out of memory.");
}
readBufferSize_ = 1024;
numReadsSinceReset_ = 0;
numWritesSinceReset_ = 0;
// Allocate input and output tranpsorts
// these only need to be allocated once per TConnection (they don't need to be
// reallocated on init() call)
inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
init(socket, eventFlags, s);
server_->incrementNumConnections();
}
~TConnection() {
server_->decrementNumConnections();
}
// Initialize
void init(int socket, short eventFlags, TNonblockingServer *s);
// Transition into a new state
void transition();
// Handler wrapper
static void eventHandler(int fd, short /* which */, void* v) {
assert(fd == ((TConnection*)v)->socket_);
((TConnection*)v)->workSocket();
}
// Handler wrapper for task block
static void taskHandler(int fd, short /* which */, void* v) {
assert(fd == ((TConnection*)v)->taskHandle_);
if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
}
((TConnection*)v)->transition();
}
};
}}} // apache::thrift::server
#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_