blob: 20a334096d679300bc86a48110a4179a90553dcc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
#include <Thrift.h>
#include <server/TServer.h>
#include <transport/TBufferTransports.h>
#include <transport/TSocket.h>
#include <concurrency/ThreadManager.h>
#include <climits>
#include <stack>
#include <string>
#include <errno.h>
#include <cstdlib>
#include <unistd.h>
#include <event.h>
namespace apache { namespace thrift { namespace server {
using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::transport::TSocket;
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
// Forward declaration of class
class TConnection;
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#else
// assume latest version 1 series
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#endif
#if LIBEVENT_VERSION_NUMBER < 0x02000000
typedef int evutil_socket_t;
#endif
#ifndef SOCKOPT_CAST_T
#define SOCKOPT_CAST_T void
#endif
template<class T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
}
template<class T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
return reinterpret_cast<SOCKOPT_CAST_T*>(v);
}
/**
* This is a non-blocking server in C++ for high performance that operates a
* single IO thread. It assumes that all incoming requests are framed with a
* 4 byte length indicator and writes out responses using the same framing.
*
* It does not use the TServerTransport framework, but rather has socket
* operations hardcoded for use with select.
*
*/
/// Overload condition actions.
enum TOverloadAction {
T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
};
class TNonblockingServer : public TServer {
private:
/// Listen backlog
static const int LISTEN_BACKLOG = 1024;
/// Default limit on size of idle connection pool
static const size_t CONNECTION_STACK_LIMIT = 1024;
/// Default limit on total number of connected sockets
static const int MAX_CONNECTIONS = INT_MAX;
/// Default limit on connections in handler/task processing
static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
/// Default size of write buffer
static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
/// Maximum size of read buffer allocated to idle connection (0 = unlimited)
static const int IDLE_READ_BUFFER_LIMIT = 1024;
/// Maximum size of write buffer allocated to idle connection (0 = unlimited)
static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
/// # of calls before resizing oversized buffers (0 = check only on close)
static const int RESIZE_BUFFER_EVERY_N = 512;
/// Server socket file descriptor
int serverSocket_;
/// Port server runs on
int port_;
/// For processing via thread pool, may be NULL
boost::shared_ptr<ThreadManager> threadManager_;
/// Is thread pool processing?
bool threadPoolProcessing_;
/// The event base for libevent
event_base* eventBase_;
bool ownEventBase_;
/// Event struct, used with eventBase_ for connection events
struct event serverEvent_;
/// Event struct, used with eventBase_ for task completion notification
struct event notificationEvent_;
/// Number of TConnection object we've created
size_t numTConnections_;
/// Number of Connections processing or waiting to process
size_t numActiveProcessors_;
/// Limit for how many TConnection objects to cache
size_t connectionStackLimit_;
/// Limit for number of connections processing or waiting to process
size_t maxActiveProcessors_;
/// Limit for number of open connections
size_t maxConnections_;
/// Time in milliseconds before an unperformed task expires (0 == infinite).
int64_t taskExpireTime_;
/**
* Hysteresis for overload state. This is the fraction of the overload
* value that needs to be reached before the overload state is cleared;
* must be <= 1.0.
*/
double overloadHysteresis_;
/// Action to take when we're overloaded.
TOverloadAction overloadAction_;
/**
* The write buffer is initialized (and when idleWriteBufferLimit_ is checked
* and found to be exceeded, reinitialized) to this size.
*/
size_t writeBufferDefaultSize_;
/**
* Max read buffer size for an idle TConnection. When we place an idle
* TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
* we will free the buffer (such that it will be reinitialized by the next
* received frame) if it has exceeded this limit. 0 disables this check.
*/
size_t idleReadBufferLimit_;
/**
* Max write buffer size for an idle connection. When we place an idle
* TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
* we insure that its write buffer is <= to this size; otherwise we
* replace it with a new one of writeBufferDefaultSize_ bytes to insure that
* idle connections don't hog memory. 0 disables this check.
*/
size_t idleWriteBufferLimit_;
/**
* Every N calls we check the buffer size limits on a connected TConnection.
* 0 disables (i.e. the checks are only done when a connection closes).
*/
int32_t resizeBufferEveryN_;
/// Set if we are currently in an overloaded state.
bool overloaded_;
/// Count of connections dropped since overload started
uint32_t nConnectionsDropped_;
/// Count of connections dropped on overload since server started
uint64_t nTotalConnectionsDropped_;
/// File descriptors for pipe used for task completion notification.
evutil_socket_t notificationPipeFDs_[2];
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
* stack so that the object can be reused later, rather than freeing the
* memory and reallocating a new object later.
*/
std::stack<TConnection*> connectionStack_;
/**
* Called when server socket had something happen. We accept all waiting
* client connections on listen socket fd and assign TConnection objects
* to handle those requests.
*
* @param fd the listen socket.
* @param which the event flag that triggered the handler.
*/
void handleEvent(int fd, short which);
public:
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
int port) :
TServer(processor),
serverSocket_(-1),
port_(port),
threadPoolProcessing_(false),
eventBase_(NULL),
ownEventBase_(false),
numTConnections_(0),
numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
maxConnections_(MAX_CONNECTIONS),
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
int port,
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(-1),
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
ownEventBase_(false),
numTConnections_(0),
numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
maxConnections_(MAX_CONNECTIONS),
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
setOutputProtocolFactory(protocolFactory);
setThreadManager(threadManager);
}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TTransportFactory> inputTransportFactory,
boost::shared_ptr<TTransportFactory> outputTransportFactory,
boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
int port,
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
serverSocket_(-1),
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
ownEventBase_(false),
numTConnections_(0),
numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
maxConnections_(MAX_CONNECTIONS),
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
setOutputProtocolFactory(outputProtocolFactory);
setThreadManager(threadManager);
}
~TNonblockingServer();
void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
boost::shared_ptr<ThreadManager> getThreadManager() {
return threadManager_;
}
/**
* Get the maximum number of unused TConnection we will hold in reserve.
*
* @return the current limit on TConnection pool size.
*/
size_t getConnectionStackLimit() const {
return connectionStackLimit_;
}
/**
* Set the maximum number of unused TConnection we will hold in reserve.
*
* @param sz the new limit for TConnection pool size.
*/
void setConnectionStackLimit(size_t sz) {
connectionStackLimit_ = sz;
}
bool isThreadPoolProcessing() const {
return threadPoolProcessing_;
}
void addTask(boost::shared_ptr<Runnable> task) {
threadManager_->add(task, 0LL, taskExpireTime_);
}
event_base* getEventBase() const {
return eventBase_;
}
/// Increment our count of the number of connected sockets.
void incrementNumConnections() {
++numTConnections_;
}
/// Decrement our count of the number of connected sockets.
void decrementNumConnections() {
--numTConnections_;
}
/**
* Return the count of sockets currently connected to.
*
* @return count of connected sockets.
*/
size_t getNumConnections() const {
return numTConnections_;
}
/**
* Return the count of connection objects allocated but not in use.
*
* @return count of idle connection objects.
*/
size_t getNumIdleConnections() const {
return connectionStack_.size();
}
/**
* Return count of number of connections which are currently processing.
* This is defined as a connection where all data has been received and
* either assigned a task (when threading) or passed to a handler (when
* not threading), and where the handler has not yet returned.
*
* @return # of connections currently processing.
*/
size_t getNumActiveProcessors() const {
return numActiveProcessors_;
}
/// Increment the count of connections currently processing.
void incrementActiveProcessors() {
++numActiveProcessors_;
}
/// Decrement the count of connections currently processing.
void decrementActiveProcessors() {
if (numActiveProcessors_ > 0) {
--numActiveProcessors_;
}
}
/**
* Get the maximum # of connections allowed before overload.
*
* @return current setting.
*/
size_t getMaxConnections() const {
return maxConnections_;
}
/**
* Set the maximum # of connections allowed before overload.
*
* @param maxConnections new setting for maximum # of connections.
*/
void setMaxConnections(size_t maxConnections) {
maxConnections_ = maxConnections;
}
/**
* Get the maximum # of connections waiting in handler/task before overload.
*
* @return current setting.
*/
size_t getMaxActiveProcessors() const {
return maxActiveProcessors_;
}
/**
* Set the maximum # of connections waiting in handler/task before overload.
*
* @param maxActiveProcessors new setting for maximum # of active processes.
*/
void setMaxActiveProcessors(size_t maxActiveProcessors) {
maxActiveProcessors_ = maxActiveProcessors;
}
/**
* Get fraction of maximum limits before an overload condition is cleared.
*
* @return hysteresis fraction
*/
double getOverloadHysteresis() const {
return overloadHysteresis_;
}
/**
* Set fraction of maximum limits before an overload condition is cleared.
* A good value would probably be between 0.5 and 0.9.
*
* @param hysteresisFraction fraction <= 1.0.
*/
void setOverloadHysteresis(double hysteresisFraction) {
if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
overloadHysteresis_ = hysteresisFraction;
}
}
/**
* Get the action the server will take on overload.
*
* @return a TOverloadAction enum value for the currently set action.
*/
TOverloadAction getOverloadAction() const {
return overloadAction_;
}
/**
* Set the action the server is to take on overload.
*
* @param overloadAction a TOverloadAction enum value for the action.
*/
void setOverloadAction(TOverloadAction overloadAction) {
overloadAction_ = overloadAction;
}
/**
* Get the time in milliseconds after which a task expires (0 == infinite).
*
* @return a 64-bit time in milliseconds.
*/
int64_t getTaskExpireTime() const {
return taskExpireTime_;
}
/**
* Set the time in milliseconds after which a task expires (0 == infinite).
*
* @param taskExpireTime a 64-bit time in milliseconds.
*/
void setTaskExpireTime(int64_t taskExpireTime) {
taskExpireTime_ = taskExpireTime;
}
/**
* Determine if the server is currently overloaded.
* This function checks the maximums for open connections and connections
* currently in processing, and sets an overload condition if they are
* exceeded. The overload will persist until both values are below the
* current hysteresis fraction of their maximums.
*
* @return true if an overload condition exists, false if not.
*/
bool serverOverloaded();
/** Pop and discard next task on threadpool wait queue.
*
* @return true if a task was discarded, false if the wait queue was empty.
*/
bool drainPendingTask();
/**
* Get the starting size of a TConnection object's write buffer.
*
* @return # bytes we initialize a TConnection object's write buffer to.
*/
size_t getWriteBufferDefaultSize() const {
return writeBufferDefaultSize_;
}
/**
* Set the starting size of a TConnection object's write buffer.
*
* @param size # bytes we initialize a TConnection object's write buffer to.
*/
void setWriteBufferDefaultSize(size_t size) {
writeBufferDefaultSize_ = size;
}
/**
* Get the maximum size of read buffer allocated to idle TConnection objects.
*
* @return # bytes beyond which we will dealloc idle buffer.
*/
size_t getIdleReadBufferLimit() const {
return idleReadBufferLimit_;
}
/**
* [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
* Get the maximum size of read buffer allocated to idle TConnection objects.
*
* @return # bytes beyond which we will dealloc idle buffer.
*/
size_t getIdleBufferMemLimit() const {
return idleReadBufferLimit_;
}
/**
* Set the maximum size read buffer allocated to idle TConnection objects.
* If a TConnection object is found (either on connection close or between
* calls when resizeBufferEveryN_ is set) with more than this much memory
* allocated to its read buffer, we free it and allow it to be reinitialized
* on the next received frame.
*
* @param limit of bytes beyond which we will shrink buffers when checked.
*/
void setIdleReadBufferLimit(size_t limit) {
idleReadBufferLimit_ = limit;
}
/**
* [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
* Set the maximum size read buffer allocated to idle TConnection objects.
* If a TConnection object is found (either on connection close or between
* calls when resizeBufferEveryN_ is set) with more than this much memory
* allocated to its read buffer, we free it and allow it to be reinitialized
* on the next received frame.
*
* @param limit of bytes beyond which we will shrink buffers when checked.
*/
void setIdleBufferMemLimit(size_t limit) {
idleReadBufferLimit_ = limit;
}
/**
* Get the maximum size of write buffer allocated to idle TConnection objects.
*
* @return # bytes beyond which we will reallocate buffers when checked.
*/
size_t getIdleWriteBufferLimit() const {
return idleWriteBufferLimit_;
}
/**
* Set the maximum size write buffer allocated to idle TConnection objects.
* If a TConnection object is found (either on connection close or between
* calls when resizeBufferEveryN_ is set) with more than this much memory
* allocated to its write buffer, we destroy and construct that buffer with
* writeBufferDefaultSize_ bytes.
*
* @param limit of bytes beyond which we will shrink buffers when idle.
*/
void setIdleWriteBufferLimit(size_t limit) {
idleWriteBufferLimit_ = limit;
}
/**
* Get # of calls made between buffer size checks. 0 means disabled.
*
* @return # of calls between buffer size checks.
*/
int32_t getResizeBufferEveryN() const {
return resizeBufferEveryN_;
}
/**
* Check buffer sizes every "count" calls. This allows buffer limits
* to be enforced for persistant connections with a controllable degree
* of overhead. 0 disables checks except at connection close.
*
* @param count the number of calls between checks, or 0 to disable
*/
void setResizeBufferEveryN(int32_t count) {
resizeBufferEveryN_ = count;
}
/**
* Return an initialized connection object. Creates or recovers from
* pool a TConnection and initializes it with the provided socket FD
* and flags.
*
* @param socket FD of socket associated with this connection.
* @param flags initial lib_event flags for this connection.
* @param addr the sockaddr of the client
* @param addrLen the length of addr
* @return pointer to initialized TConnection object.
*/
TConnection* createConnection(int socket, short flags,
const sockaddr* addr, socklen_t addrLen);
/**
* Returns a connection to pool or deletion. If the connection pool
* (a stack) isn't full, place the connection object on it, otherwise
* just delete it.
*
* @param connection the TConection being returned.
*/
void returnConnection(TConnection* connection);
/**
* Callback function that the threadmanager calls when a task reaches
* its expiration time. It is needed to clean up the expired connection.
*
* @param task the runnable associated with the expired task.
*/
void expireClose(boost::shared_ptr<Runnable> task);
/**
* C-callable event handler for listener events. Provides a callback
* that libevent can understand which invokes server->handleEvent().
*
* @param fd the descriptor the event occured on.
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TNonblockingServer's "this".
*/
static void eventHandler(evutil_socket_t fd, short which, void* v) {
((TNonblockingServer*)v)->handleEvent(fd, which);
}
/// Creates a socket to listen on and binds it to the local port.
void listenSocket();
/**
* Takes a socket created by listenSocket() and sets various options on it
* to prepare for use in the server.
*
* @param fd descriptor of socket to be initialized/
*/
void listenSocket(int fd);
/// Create the pipe used to notify I/O process of task completion.
void createNotificationPipe();
/**
* Get notification pipe send descriptor.
*
* @return write fd for pipe.
*/
int getNotificationSendFD() const {
return notificationPipeFDs_[1];
}
/**
* Get notification pipe receive descriptor.
*
* @return read fd of pipe.
*/
int getNotificationRecvFD() const {
return notificationPipeFDs_[0];
}
/**
* Register the core libevent events onto the proper base.
*
* @param base pointer to the event base to be initialized.
* @param ownEventBase if true, this server is responsible for
* freeing the event base memory.
*/
void registerEvents(event_base* base, bool ownEventBase = true);
/**
* Main workhorse function, starts up the server listening on a port and
* loops over the libevent handler.
*/
void serve();
/**
* May be called from a separate thread to cause serve() to return.
*/
void stop();
};
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
SOCKET_RECV_FRAMING,
SOCKET_RECV,
SOCKET_SEND
};
/**
* Five states for the nonblocking servr:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
* 5) force immediate connection close
*/
enum TAppState {
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT,
APP_CLOSE_CONNECTION
};
/**
* Represents a connection that is handled via libevent. This connection
* essentially encapsulates a socket that has some associated libevent state.
*/
class TConnection {
private:
/// Server handle
TNonblockingServer* server_;
/// Object wrapping network socket
boost::shared_ptr<TSocket> tSocket_;
/// Libevent object
struct event event_;
/// Libevent flags
short eventFlags_;
/// Socket mode
TSocketState socketState_;
/// Application state
TAppState appState_;
/// How much data needed to read
uint32_t readWant_;
/// Where in the read buffer are we
uint32_t readBufferPos_;
/// Read buffer
uint8_t* readBuffer_;
/// Read buffer size
uint32_t readBufferSize_;
/// Write buffer
uint8_t* writeBuffer_;
/// Write buffer size
uint32_t writeBufferSize_;
/// How far through writing are we?
uint32_t writeBufferPos_;
/// Largest size of write buffer seen since buffer was constructed
size_t largestWriteBufferSize_;
/// Count of the number of calls for use with getResizeBufferEveryN().
int32_t callsForResize_;
/// Task handle
int taskHandle_;
/// Task event
struct event taskEvent_;
/// Transport to read from
boost::shared_ptr<TMemoryBuffer> inputTransport_;
/// Transport that processor writes to
boost::shared_ptr<TMemoryBuffer> outputTransport_;
/// extra transport generated by transport factory (e.g. BufferedRouterTransport)
boost::shared_ptr<TTransport> factoryInputTransport_;
boost::shared_ptr<TTransport> factoryOutputTransport_;
/// Protocol decoder
boost::shared_ptr<TProtocol> inputProtocol_;
/// Protocol encoder
boost::shared_ptr<TProtocol> outputProtocol_;
/// Server event handler, if any
boost::shared_ptr<TServerEventHandler> serverEventHandler_;
/// Thrift call context, if any
void *connectionContext_;
/// Go into read mode
void setRead() {
setFlags(EV_READ | EV_PERSIST);
}
/// Go into write mode
void setWrite() {
setFlags(EV_WRITE | EV_PERSIST);
}
/// Set socket idle
void setIdle() {
setFlags(0);
}
/**
* Set event flags for this connection.
*
* @param eventFlags flags we pass to libevent for the connection.
*/
void setFlags(short eventFlags);
/**
* Libevent handler called (via our static wrapper) when the connection
* socket had something happen. Rather than use the flags libevent passed,
* we use the connection state to determine whether we need to read or
* write the socket.
*/
void workSocket();
/// Close this connection and free or reset its resources.
void close();
public:
class Task;
/// Constructor
TConnection(int socket, short eventFlags, TNonblockingServer *s,
const sockaddr* addr, socklen_t addrLen) {
readBuffer_ = NULL;
readBufferSize_ = 0;
// Allocate input and output tranpsorts
// these only need to be allocated once per TConnection (they don't need to be
// reallocated on init() call)
inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
tSocket_.reset(new TSocket());
init(socket, eventFlags, s, addr, addrLen);
server_->incrementNumConnections();
}
~TConnection() {
std::free(readBuffer_);
server_->decrementNumConnections();
}
/**
* Check buffers against any size limits and shrink it if exceeded.
*
* @param readLimit we reduce read buffer size to this (if nonzero).
* @param writeLimit if nonzero and write buffer is larger, replace it.
*/
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
void init(int socket, short eventFlags, TNonblockingServer *s,
const sockaddr* addr, socklen_t addrLen);
/**
* This is called when the application transitions from one state into
* another. This means that it has finished writing the data that it needed
* to, or finished receiving the data that it needed to.
*/
void transition();
/**
* C-callable event handler for connection events. Provides a callback
* that libevent can understand which invokes connection_->workSocket().
*
* @param fd the descriptor the event occured on.
* @param which the flags associated with the event.
* @param v void* callback arg where we placed TConnection's "this".
*/
static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
((TConnection*)v)->workSocket();
}
/**
* C-callable event handler for signaling task completion. Provides a
* callback that libevent can understand that will read a connection
* object's address from a pipe and call connection->transition() for
* that object.
*
* @param fd the descriptor the event occured on.
*/
static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
TConnection* connection;
ssize_t nBytes;
while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
== sizeof(TConnection*)) {
connection->transition();
}
if (nBytes > 0) {
throw TException("TConnection::taskHandler unexpected partial read");
}
if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
}
}
/**
* Notification to server that processing has ended on this request.
* Can be called either when processing is completed or when a waiting
* task has been preemptively terminated (on overload).
*
* @return true if successful, false if unable to notify (check errno).
*/
bool notifyServer() {
TConnection* connection = this;
if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
sizeof(TConnection*), 0) != sizeof(TConnection*)) {
return false;
}
return true;
}
/// Force connection shutdown for this connection.
void forceClose() {
appState_ = APP_CLOSE_CONNECTION;
if (!notifyServer()) {
throw TException("TConnection::forceClose: failed write on notify pipe");
}
}
/// return the server this connection was initialized for.
TNonblockingServer* getServer() {
return server_;
}
/// get state of connection.
TAppState getState() {
return appState_;
}
/// return the TSocket transport wrapping this network connection
boost::shared_ptr<TSocket> getTSocket() const {
return tSocket_;
}
/// return the server event handler if any
boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
return serverEventHandler_;
}
/// return the Thrift connection context if any
void* getConnectionContext() {
return connectionContext_;
}
};
}}} // apache::thrift::server
#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_