C++ Thrift coding style changes
Summary: Make the underscore naming of class members consistent (trailing underscore, e.g. message_ instead of _message)
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664818 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 38226dc..c21b0a5 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -13,13 +13,19 @@
namespace facebook { namespace thrift {
class Exception : public std::exception {
-private:
- const std::string _message;
-
public:
- Exception(const std::string message) : _message(message) {}
+ Exception(const std::string message) :
+ message_(message) {}
+
~Exception() throw () {}
- const char* what() {return _message.c_str();}
+
+ const char* what() {
+ return message_.c_str();
+ }
+
+private:
+ const std::string message_;
+
};
}} // facebook::thrift
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
index 57532a3..518c77f 100644
--- a/lib/cpp/src/concurrency/Monitor.cc
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -22,14 +22,14 @@
public:
Impl() :
- mutexInitialized(false),
- condInitialized(false) {
+ mutexInitialized_(false),
+ condInitialized_(false) {
try {
- assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
- mutexInitialized = true;
- assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
- condInitialized = true;
+ assert(pthread_mutex_init(&pthread_mutex_, NULL) == 0);
+ mutexInitialized_ = true;
+ assert(pthread_cond_init(&pthread_cond_, NULL) == 0);
+ condInitialized_ = true;
} catch(...) {
cleanup();
}
@@ -37,21 +37,23 @@
~Impl() { cleanup(); }
- void lock() const { pthread_mutex_lock(&_pthread_mutex); }
+ void lock() const { pthread_mutex_lock(&pthread_mutex_); }
- void unlock() const { pthread_mutex_unlock(&_pthread_mutex); }
+ void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
void wait(long long timeout) const {
// XXX Need to assert that caller owns mutex
assert(timeout >= 0LL);
if (timeout == 0LL) {
- assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0);
+ assert(pthread_cond_wait(&pthread_cond_, &pthread_mutex_) == 0);
} else {
struct timespec abstime;
long long now = Util::currentTime();
Util::toTimespec(abstime, now + timeout);
- int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
+ int result = pthread_cond_timedwait(&pthread_cond_,
+ &pthread_mutex_,
+ &abstime);
if (result == ETIMEDOUT) {
assert(Util::currentTime() >= (now + timeout));
}
@@ -60,46 +62,46 @@
void notify() {
// XXX Need to assert that caller owns mutex
- assert(pthread_cond_signal(&_pthread_cond) == 0);
+ assert(pthread_cond_signal(&pthread_cond_) == 0);
}
void notifyAll() {
// XXX Need to assert that caller owns mutex
- assert(pthread_cond_broadcast(&_pthread_cond) == 0);
+ assert(pthread_cond_broadcast(&pthread_cond_) == 0);
}
private:
void cleanup() {
- if (mutexInitialized) {
- mutexInitialized = false;
- assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
+ if (mutexInitialized_) {
+ mutexInitialized_ = false;
+ assert(pthread_mutex_destroy(&pthread_mutex_) == 0);
}
- if (condInitialized) {
- condInitialized = false;
- assert(pthread_cond_destroy(&_pthread_cond) == 0);
+ if (condInitialized_) {
+ condInitialized_ = false;
+ assert(pthread_cond_destroy(&pthread_cond_) == 0);
}
}
- mutable pthread_mutex_t _pthread_mutex;
- mutable bool mutexInitialized;
- mutable pthread_cond_t _pthread_cond;
- mutable bool condInitialized;
+ mutable pthread_mutex_t pthread_mutex_;
+ mutable bool mutexInitialized_;
+ mutable pthread_cond_t pthread_cond_;
+ mutable bool condInitialized_;
};
-Monitor::Monitor() : _impl(new Monitor::Impl()) {}
+Monitor::Monitor() : impl_(new Monitor::Impl()) {}
-Monitor::~Monitor() { delete _impl; }
+Monitor::~Monitor() { delete impl_; }
-void Monitor::lock() const { _impl->lock(); }
+void Monitor::lock() const { impl_->lock(); }
-void Monitor::unlock() const { _impl->unlock(); }
+void Monitor::unlock() const { impl_->unlock(); }
-void Monitor::wait(long long timeout) const { _impl->wait(timeout); }
+void Monitor::wait(long long timeout) const { impl_->wait(timeout); }
-void Monitor::notify() const { _impl->notify(); }
+void Monitor::notify() const { impl_->notify(); }
-void Monitor::notifyAll() const { _impl->notifyAll(); }
+void Monitor::notifyAll() const { impl_->notifyAll(); }
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
index 62a8344..e67b71b 100644
--- a/lib/cpp/src/concurrency/Monitor.h
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -39,22 +39,23 @@
class Impl;
- Impl* _impl;
+ Impl* impl_;
};
class Synchronized {
public:
- Synchronized(const Monitor& value) : _monitor(value) {
- _monitor.lock();
+ Synchronized(const Monitor& value) :
+ monitor_(value) {
+ monitor_.lock();
}
~Synchronized() {
- _monitor.unlock();
+ monitor_.unlock();
}
private:
- const Monitor& _monitor;
+ const Monitor& monitor_;
};
diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc
index 416341e..1f116a3 100644
--- a/lib/cpp/src/concurrency/Mutex.cc
+++ b/lib/cpp/src/concurrency/Mutex.cc
@@ -13,32 +13,32 @@
*/
class Mutex::impl {
public:
- impl() : initialized(false) {
- assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
- initialized = true;
+ impl() : initialized_(false) {
+ assert(pthread_mutex_init(&pthread_mutex_, NULL) == 0);
+ initialized_ = true;
}
~impl() {
- if (initialized) {
- initialized = false;
- assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
+ if (initialized_) {
+ initialized_ = false;
+ assert(pthread_mutex_destroy(&pthread_mutex_) == 0);
}
}
- void lock() const { pthread_mutex_lock(&_pthread_mutex); }
+ void lock() const { pthread_mutex_lock(&pthread_mutex_); }
- void unlock() const { pthread_mutex_unlock(&_pthread_mutex); }
+ void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
private:
- mutable pthread_mutex_t _pthread_mutex;
- mutable bool initialized;
+ mutable pthread_mutex_t pthread_mutex_;
+ mutable bool initialized_;
};
-Mutex::Mutex() : _impl(new Mutex::impl()) {}
+Mutex::Mutex() : impl_(new Mutex::impl()) {}
-void Mutex::lock() const { _impl->lock(); }
+void Mutex::lock() const { impl_->lock(); }
-void Mutex::unlock() const { _impl->unlock(); }
+void Mutex::unlock() const { impl_->unlock(); }
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index 9eceb49..de52bbd 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -18,20 +18,20 @@
private:
class impl;
- impl* _impl;
+ impl* impl_;
};
class MutexMonitor {
public:
- MutexMonitor(const Mutex& value) : _mutex(value) {
- _mutex.lock();
+ MutexMonitor(const Mutex& value) : mutex_(value) {
+ mutex_.lock();
}
~MutexMonitor() {
- _mutex.unlock();
+ mutex_.unlock();
}
private:
- const Mutex& _mutex;
+ const Mutex& mutex_;
};
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index 130976c..74a3ec3 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -33,21 +33,21 @@
static void* threadMain(void* arg);
private:
- pthread_t _pthread;
- STATE _state;
- int _policy;
- int _priority;
- int _stackSize;
- weak_ptr<PthreadThread> _self;
+ pthread_t pthread_;
+ STATE state_;
+ int policy_;
+ int priority_;
+ int stackSize_;
+ weak_ptr<PthreadThread> self_;
public:
PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
- _pthread(0),
- _state(uninitialized),
- _policy(policy),
- _priority(priority),
- _stackSize(stackSize) {
+ pthread_(0),
+ state_(uninitialized),
+ policy_(policy),
+ priority_(priority),
+ stackSize_(stackSize) {
this->Thread::runnable(runnable);
}
@@ -55,38 +55,38 @@
~PthreadThread() {}
void start() {
- if (_state != uninitialized) {
+ if (state_ != uninitialized) {
return;
}
- _state = starting;
+ state_ = starting;
pthread_attr_t thread_attr;
assert(pthread_attr_init(&thread_attr) == 0);
assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
// Set thread stack size
- assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
+ assert(pthread_attr_setstacksize(&thread_attr, MB * stackSize_) == 0);
// Set thread policy
- assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
+ assert(pthread_attr_setschedpolicy(&thread_attr, policy_) == 0);
struct sched_param sched_param;
- sched_param.sched_priority = _priority;
+ sched_param.sched_priority = priority_;
// Set thread priority
assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
// Create reference
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
- *selfRef = _self.lock();
- assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0);
+ *selfRef = self_.lock();
+ assert(pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) == 0);
}
void join() {
- if (_state != stopped) {
+ if (state_ != stopped) {
void* ignore;
- pthread_join(_pthread, &ignore);
+ pthread_join(pthread_, &ignore);
}
}
@@ -96,7 +96,7 @@
void weakRef(shared_ptr<PthreadThread> self) {
assert(self.get() == this);
- _self = weak_ptr<PthreadThread>(self);
+ self_ = weak_ptr<PthreadThread>(self);
}
};
@@ -109,14 +109,14 @@
return (void*)0;
}
- if (thread->_state != starting) {
+ if (thread->state_ != starting) {
return (void*)0;
}
- thread->_state = starting;
+ thread->state_ = starting;
thread->runnable()->run();
- if (thread->_state != stopping && thread->_state != stopped) {
- thread->_state = stopping;
+ if (thread->state_ != stopping && thread->state_ != stopped) {
+ thread->state_ = stopping;
}
return (void*)0;
@@ -128,10 +128,10 @@
class PosixThreadFactory::Impl {
private:
- POLICY _policy;
- PRIORITY _priority;
- int _stackSize;
- bool _detached;
+ POLICY policy_;
+ PRIORITY priority_;
+ int stackSize_;
+ bool detached_;
/**
* Converts generic posix thread schedule policy enums into pthread
@@ -173,10 +173,10 @@
public:
Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
- _policy(policy),
- _priority(priority),
- _stackSize(stackSize),
- _detached(detached) {}
+ policy_(policy),
+ priority_(priority),
+ stackSize_(stackSize),
+ detached_(detached) {}
/**
* Creates a new POSIX thread to run the runnable object
@@ -184,17 +184,17 @@
* @param runnable A runnable object
*/
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
- shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable));
+ shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
result->weakRef(result);
runnable->thread(result);
return result;
}
- int stackSize() const { return _stackSize; }
+ int stackSize() const { return stackSize_; }
- void stackSize(int value) { _stackSize = value; }
+ void stackSize(int value) { stackSize_ = value; }
- PRIORITY priority() const { return _priority; }
+ PRIORITY priority() const { return priority_; }
/**
* Sets priority.
@@ -202,20 +202,20 @@
* XXX
* Need to handle incremental priorities properly.
*/
- void priority(PRIORITY value) { _priority = value; }
+ void priority(PRIORITY value) { priority_ = value; }
};
PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
- _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
+ impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
-shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return _impl->newThread(runnable); }
+shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
-int PosixThreadFactory::stackSize() const { return _impl->stackSize(); }
+int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
-void PosixThreadFactory::stackSize(int value) { _impl->stackSize(value); }
+void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
-PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return _impl->priority(); }
+PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
-void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { _impl->priority(value); }
+void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index a56999c..4ad9933 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -79,7 +79,7 @@
private:
class Impl;
- shared_ptr<Impl> _impl;
+ shared_ptr<Impl> impl_;
};
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index 24d5908..600b508 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -26,16 +26,16 @@
* Gets the thread object that is hosting this runnable object - can return
* an empty shared pointer if no references remain on thet thread object
*/
- virtual shared_ptr<Thread> thread() { return _thread.lock(); }
+ virtual shared_ptr<Thread> thread() { return thread_.lock(); }
/**
* Sets the thread that is executing this object. This is only meant for
* use by concrete implementations of Thread.
*/
- virtual void thread(shared_ptr<Thread> value) { _thread = value; }
+ virtual void thread(shared_ptr<Thread> value) { thread_ = value; }
private:
- weak_ptr<Thread> _thread;
+ weak_ptr<Thread> thread_;
};
/**
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index 7d6fef7..895d1cd 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -32,10 +32,10 @@
public:
Impl() :
- _workerCount(0),
- _workerMaxCount(0),
- _idleCount(0),
- _state(ThreadManager::UNINITIALIZED) {}
+ workerCount_(0),
+ workerMaxCount_(0),
+ idleCount_(0),
+ state_(ThreadManager::UNINITIALIZED) {}
~Impl() { stop(); }
@@ -43,37 +43,41 @@
void stop();
- const ThreadManager::STATE state() const { return _state; }
+ const ThreadManager::STATE state() const {
+ return state_;
+ }
shared_ptr<ThreadFactory> threadFactory() const {
- Synchronized s(_monitor);
- return _threadFactory;
+ Synchronized s(monitor_);
+ return threadFactory_;
}
void threadFactory(shared_ptr<ThreadFactory> value) {
- Synchronized s(_monitor);
- _threadFactory = value;
+ Synchronized s(monitor_);
+ threadFactory_ = value;
}
void addWorker(size_t value);
void removeWorker(size_t value);
- size_t idleWorkerCount() const { return _idleCount; }
+ size_t idleWorkerCount() const {
+ return idleCount_;
+ }
size_t workerCount() const {
- Synchronized s(_monitor);
- return _workerCount;
+ Synchronized s(monitor_);
+ return workerCount_;
}
size_t pendingTaskCount() const {
- Synchronized s(_monitor);
- return _tasks.size();
+ Synchronized s(monitor_);
+ return tasks_.size();
}
size_t totalTaskCount() const {
- Synchronized s(_monitor);
- return _tasks.size() + _workerCount - _idleCount;
+ Synchronized s(monitor_);
+ return tasks_.size() + workerCount_ - idleCount_;
}
void add(shared_ptr<Runnable> value);
@@ -81,21 +85,21 @@
void remove(shared_ptr<Runnable> task);
private:
- size_t _workerCount;
- size_t _workerMaxCount;
- size_t _idleCount;
- ThreadManager::STATE _state;
- shared_ptr<ThreadFactory> _threadFactory;
+ size_t workerCount_;
+ size_t workerMaxCount_;
+ size_t idleCount_;
+ ThreadManager::STATE state_;
+ shared_ptr<ThreadFactory> threadFactory_;
friend class ThreadManager::Task;
- std::queue<shared_ptr<Task> > _tasks;
- Monitor _monitor;
- Monitor _workerMonitor;
+ std::queue<shared_ptr<Task> > tasks_;
+ Monitor monitor_;
+ Monitor workerMonitor_;
friend class ThreadManager::Worker;
- std::set<shared_ptr<Thread> > _workers;
- std::set<shared_ptr<Thread> > _deadWorkers;
+ std::set<shared_ptr<Thread> > workers_;
+ std::set<shared_ptr<Thread> > deadWorkers_;
};
class ThreadManager::Task : public Runnable {
@@ -109,22 +113,22 @@
};
Task(shared_ptr<Runnable> runnable) :
- _runnable(runnable),
- _state(WAITING) {}
+ runnable_(runnable),
+ state_(WAITING) {}
~Task() {}
void run() {
- if (_state == EXECUTING) {
- _runnable->run();
- _state = COMPLETE;
+ if (state_ == EXECUTING) {
+ runnable_->run();
+ state_ = COMPLETE;
}
}
private:
- shared_ptr<Runnable> _runnable;
+ shared_ptr<Runnable> runnable_;
friend class ThreadManager::Worker;
- STATE _state;
+ STATE state_;
};
class ThreadManager::Worker: public Runnable {
@@ -138,13 +142,15 @@
public:
Worker(ThreadManager::Impl* manager) :
- _manager(manager),
- _state(UNINITIALIZED),
- _idle(false) {}
+ manager_(manager),
+ state_(UNINITIALIZED),
+ idle_(false) {}
~Worker() {}
- bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount; }
+ bool isActive() const {
+ return manager_->workerCount_ <= manager_->workerMaxCount_;
+ }
/**
* Worker entry point
@@ -164,17 +170,17 @@
* since that is what the manager blocks on for worker add/remove
*/
{
- Synchronized s(_manager->_monitor);
- active = _manager->_workerCount < _manager->_workerMaxCount;
+ Synchronized s(manager_->monitor_);
+ active = manager_->workerCount_ < manager_->workerMaxCount_;
if (active) {
- _manager->_workerCount++;
- notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
+ manager_->workerCount_++;
+ notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
}
}
if (notifyManager) {
- Synchronized s(_manager->_workerMonitor);
- _manager->_workerMonitor.notify();
+ Synchronized s(manager_->workerMonitor_);
+ manager_->workerMonitor_.notify();
notifyManager = false;
}
@@ -191,34 +197,34 @@
* the manager will see it.
*/
{
- Synchronized s(_manager->_monitor);
+ Synchronized s(manager_->monitor_);
active = isActive();
- while (active && _manager->_tasks.empty()) {
- _manager->_idleCount++;
- _idle = true;
- _manager->_monitor.wait();
+ while (active && manager_->tasks_.empty()) {
+ manager_->idleCount_++;
+ idle_ = true;
+ manager_->monitor_.wait();
active = isActive();
- _idle = false;
- _manager->_idleCount--;
+ idle_ = false;
+ manager_->idleCount_--;
}
if (active) {
- if (!_manager->_tasks.empty()) {
- task = _manager->_tasks.front();
- _manager->_tasks.pop();
- if (task->_state == ThreadManager::Task::WAITING) {
- task->_state = ThreadManager::Task::EXECUTING;
+ if (!manager_->tasks_.empty()) {
+ task = manager_->tasks_.front();
+ manager_->tasks_.pop();
+ if (task->state_ == ThreadManager::Task::WAITING) {
+ task->state_ = ThreadManager::Task::EXECUTING;
}
}
} else {
- _idle = true;
- _manager->_workerCount--;
- notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
+ idle_ = true;
+ manager_->workerCount_--;
+ notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
}
}
if (task != NULL) {
- if (task->_state == ThreadManager::Task::EXECUTING) {
+ if (task->state_ == ThreadManager::Task::EXECUTING) {
try {
task->run();
} catch(...) {
@@ -229,10 +235,10 @@
}
{
- Synchronized s(_manager->_workerMonitor);
- _manager->_deadWorkers.insert(this->thread());
+ Synchronized s(manager_->workerMonitor_);
+ manager_->deadWorkers_.insert(this->thread());
if (notifyManager) {
- _manager->_workerMonitor.notify();
+ manager_->workerMonitor_.notify();
}
}
@@ -240,10 +246,10 @@
}
private:
- ThreadManager::Impl* _manager;
+ ThreadManager::Impl* manager_;
friend class ThreadManager::Impl;
- STATE _state;
- bool _idle;
+ STATE state_;
+ bool idle_;
};
@@ -252,68 +258,68 @@
for (size_t ix = 0; ix < value; ix++) {
class ThreadManager::Worker;
shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
- newThreads.insert(_threadFactory->newThread(worker));
+ newThreads.insert(threadFactory_->newThread(worker));
}
{
- Synchronized s(_monitor);
- _workerMaxCount+= value;
- _workers.insert(newThreads.begin(), newThreads.end());
+ Synchronized s(monitor_);
+ workerMaxCount_ += value;
+ workers_.insert(newThreads.begin(), newThreads.end());
}
for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
- worker->_state = ThreadManager::Worker::STARTING;
+ worker->state_ = ThreadManager::Worker::STARTING;
(*ix)->start();
}
{
- Synchronized s(_workerMonitor);
- while (_workerCount != _workerMaxCount) {
- _workerMonitor.wait();
+ Synchronized s(workerMonitor_);
+ while (workerCount_ != workerMaxCount_) {
+ workerMonitor_.wait();
}
}
}
void ThreadManager::Impl::start() {
- if (_state == ThreadManager::STOPPED) {
+ if (state_ == ThreadManager::STOPPED) {
return;
}
{
- Synchronized s(_monitor);
- if (_state == ThreadManager::UNINITIALIZED) {
- if (_threadFactory == NULL) {
+ Synchronized s(monitor_);
+ if (state_ == ThreadManager::UNINITIALIZED) {
+ if (threadFactory_ == NULL) {
throw InvalidArgumentException();
}
- _state = ThreadManager::STARTED;
- _monitor.notifyAll();
+ state_ = ThreadManager::STARTED;
+ monitor_.notifyAll();
}
- while (_state == STARTING) {
- _monitor.wait();
+ while (state_ == STARTING) {
+ monitor_.wait();
}
}
}
void ThreadManager::Impl::stop() {
bool doStop = false;
- if (_state == ThreadManager::STOPPED) {
+ if (state_ == ThreadManager::STOPPED) {
return;
}
{
- Synchronized s(_monitor);
- if (!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
+ Synchronized s(monitor_);
+ if (!state_ != ThreadManager::STOPPING && state_ != ThreadManager::STOPPED) {
doStop = true;
- _state = ThreadManager::STOPPING;
+ state_ = ThreadManager::STOPPING;
}
}
if (doStop) {
- removeWorker(_workerCount);
- _state = ThreadManager::STOPPING;
+ removeWorker(workerCount_);
+ state_ = ThreadManager::STOPPING;
}
// XXX
@@ -324,56 +330,56 @@
void ThreadManager::Impl::removeWorker(size_t value) {
std::set<shared_ptr<Thread> > removedThreads;
{
- Synchronized s(_monitor);
- if (value > _workerMaxCount) {
+ Synchronized s(monitor_);
+ if (value > workerMaxCount_) {
throw InvalidArgumentException();
}
- _workerMaxCount-= value;
+ workerMaxCount_ -= value;
- if (_idleCount < value) {
- for (size_t ix = 0; ix < _idleCount; ix++) {
- _monitor.notify();
+ if (idleCount_ < value) {
+ for (size_t ix = 0; ix < idleCount_; ix++) {
+ monitor_.notify();
}
} else {
- _monitor.notifyAll();
+ monitor_.notifyAll();
}
}
{
- Synchronized s(_workerMonitor);
+ Synchronized s(workerMonitor_);
- while (_workerCount != _workerMaxCount) {
- _workerMonitor.wait();
+ while (workerCount_ != workerMaxCount_) {
+ workerMonitor_.wait();
}
- for (std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
- _workers.erase(*ix);
+ for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
+ workers_.erase(*ix);
}
- _deadWorkers.clear();
+ deadWorkers_.clear();
}
}
void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
- Synchronized s(_monitor);
+ Synchronized s(monitor_);
- if (_state != ThreadManager::STARTED) {
+ if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();
}
- _tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
+ tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
// If idle thread is available notify it, otherwise all worker threads are
// running and will get around to this task in time.
- if (_idleCount > 0) {
- _monitor.notify();
+ if (idleCount_ > 0) {
+ monitor_.notify();
}
}
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
- Synchronized s(_monitor);
- if (_state != ThreadManager::STARTED) {
+ Synchronized s(monitor_);
+ if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();
}
}
@@ -382,19 +388,19 @@
public:
SimpleThreadManager(size_t workerCount=4) :
- _workerCount(workerCount),
- _firstTime(true) {
+ workerCount_(workerCount),
+ firstTime_(true) {
}
void start() {
ThreadManager::Impl::start();
- addWorker(_workerCount);
+ addWorker(workerCount_);
}
private:
- const size_t _workerCount;
- bool _firstTime;
- Monitor _monitor;
+ const size_t workerCount_;
+ bool firstTime_;
+ Monitor monitor_;
};
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index f48df4e..4286440 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -28,8 +28,8 @@
};
Task(shared_ptr<Runnable> runnable) :
- _runnable(runnable),
- _state(WAITING) {}
+ runnable_(runnable),
+ state_(WAITING) {}
~Task() {
//debug
@@ -37,24 +37,24 @@
}
void run() {
- if (_state == EXECUTING) {
- _runnable->run();
- _state = COMPLETE;
+ if (state_ == EXECUTING) {
+ runnable_->run();
+ state_ = COMPLETE;
}
}
private:
- shared_ptr<Runnable> _runnable;
+ shared_ptr<Runnable> runnable_;
class TimerManager::Dispatcher;
friend class TimerManager::Dispatcher;
- STATE _state;
+ STATE state_;
};
class TimerManager::Dispatcher: public Runnable {
public:
Dispatcher(TimerManager* manager) :
- _manager(manager) {}
+ manager_(manager) {}
~Dispatcher() {
// debug
@@ -64,45 +64,45 @@
/**
* Dispatcher entry point
*
- * As long as dispatcher thread is running, pull tasks off the task _taskMap
+ * As long as the dispatcher thread is running, pull tasks off taskMap_
* and execute.
*/
void run() {
{
- Synchronized s(_manager->_monitor);
- if (_manager->_state == TimerManager::STARTING) {
- _manager->_state = TimerManager::STARTED;
- _manager->_monitor.notifyAll();
+ Synchronized s(manager_->monitor_);
+ if (manager_->state_ == TimerManager::STARTING) {
+ manager_->state_ = TimerManager::STARTED;
+ manager_->monitor_.notifyAll();
}
}
do {
std::set<shared_ptr<TimerManager::Task> > expiredTasks;
{
- Synchronized s(_manager->_monitor);
+ Synchronized s(manager_->monitor_);
task_iterator expiredTaskEnd;
long long now = Util::currentTime();
- while (_manager->_state == TimerManager::STARTED &&
- (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
+ while (manager_->state_ == TimerManager::STARTED &&
+ (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
long long timeout = 0LL;
- if (!_manager->_taskMap.empty()) {
- timeout = _manager->_taskMap.begin()->first - now;
+ if (!manager_->taskMap_.empty()) {
+ timeout = manager_->taskMap_.begin()->first - now;
}
- assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
- _manager->_monitor.wait(timeout);
+ assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
+ manager_->monitor_.wait(timeout);
now = Util::currentTime();
}
- if (_manager->_state == TimerManager::STARTED) {
- for (task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
+ if (manager_->state_ == TimerManager::STARTED) {
+ for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
shared_ptr<TimerManager::Task> task = ix->second;
expiredTasks.insert(task);
- if (task->_state == TimerManager::Task::WAITING) {
- task->_state = TimerManager::Task::EXECUTING;
+ if (task->state_ == TimerManager::Task::WAITING) {
+ task->state_ = TimerManager::Task::EXECUTING;
}
- _manager->_taskCount--;
+ manager_->taskCount_--;
}
- _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
+ manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
}
}
@@ -110,27 +110,27 @@
(*ix)->run();
}
- } while (_manager->_state == TimerManager::STARTED);
+ } while (manager_->state_ == TimerManager::STARTED);
{
- Synchronized s(_manager->_monitor);
- if (_manager->_state == TimerManager::STOPPING) {
- _manager->_state = TimerManager::STOPPED;
- _manager->_monitor.notify();
+ Synchronized s(manager_->monitor_);
+ if (manager_->state_ == TimerManager::STOPPING) {
+ manager_->state_ = TimerManager::STOPPED;
+ manager_->monitor_.notify();
}
}
return;
}
private:
- TimerManager* _manager;
+ TimerManager* manager_;
friend class TimerManager;
};
TimerManager::TimerManager() :
- _taskCount(0),
- _state(TimerManager::UNINITIALIZED),
- _dispatcher(shared_ptr<Dispatcher>(new Dispatcher(this))) {
+ taskCount_(0),
+ state_(TimerManager::UNINITIALIZED),
+ dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
}
@@ -140,7 +140,7 @@
// the monitor here, since stop already takes care of reentrancy.
std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
- if (_state != STOPPED) {
+ if (state_ != STOPPED) {
try {
stop();
} catch(...) {
@@ -154,69 +154,69 @@
void TimerManager::start() {
bool doStart = false;
{
- Synchronized s(_monitor);
- if (_threadFactory == NULL) {
+ Synchronized s(monitor_);
+ if (threadFactory_ == NULL) {
throw InvalidArgumentException();
}
- if (_state == TimerManager::UNINITIALIZED) {
- _state = TimerManager::STARTING;
+ if (state_ == TimerManager::UNINITIALIZED) {
+ state_ = TimerManager::STARTING;
doStart = true;
}
}
if (doStart) {
- _dispatcherThread = _threadFactory->newThread(_dispatcher);
- _dispatcherThread->start();
+ dispatcherThread_ = threadFactory_->newThread(dispatcher_);
+ dispatcherThread_->start();
}
{
- Synchronized s(_monitor);
- while (_state == TimerManager::STARTING) {
- _monitor.wait();
+ Synchronized s(monitor_);
+ while (state_ == TimerManager::STARTING) {
+ monitor_.wait();
}
- assert(_state != TimerManager::STARTING);
+ assert(state_ != TimerManager::STARTING);
}
}
void TimerManager::stop() {
bool doStop = false;
{
- Synchronized s(_monitor);
- if (_state == TimerManager::UNINITIALIZED) {
- _state = TimerManager::STOPPED;
- } else if (_state != STOPPING && _state != STOPPED) {
+ Synchronized s(monitor_);
+ if (state_ == TimerManager::UNINITIALIZED) {
+ state_ = TimerManager::STOPPED;
+ } else if (state_ != STOPPING && state_ != STOPPED) {
doStop = true;
- _state = STOPPING;
- _monitor.notifyAll();
+ state_ = STOPPING;
+ monitor_.notifyAll();
}
- while (_state != STOPPED) {
- _monitor.wait();
+ while (state_ != STOPPED) {
+ monitor_.wait();
}
}
if (doStop) {
// Clean up any outstanding tasks
- for (task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
- _taskMap.erase(ix);
+ for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) {
+ taskMap_.erase(ix);
}
// Remove dispatcher's reference to us.
- _dispatcher->_manager = NULL;
+ dispatcher_->manager_ = NULL;
}
}
shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
- Synchronized s(_monitor);
- return _threadFactory;
+ Synchronized s(monitor_);
+ return threadFactory_;
}
void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
- Synchronized s(_monitor);
- _threadFactory = value;
+ Synchronized s(monitor_);
+ threadFactory_ = value;
}
size_t TimerManager::taskCount() const {
- return _taskCount;
+ return taskCount_;
}
void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
@@ -224,19 +224,19 @@
timeout += now;
{
- Synchronized s(_monitor);
- if (_state != TimerManager::STARTED) {
+ Synchronized s(monitor_);
+ if (state_ != TimerManager::STARTED) {
throw IllegalStateException();
}
- _taskCount++;
- _taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
+ taskCount_++;
+ taskMap_.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
// If the task map was empty, or if we have an expiration that is earlier
// than any previously seen, kick the dispatcher so it can update its
// timeout
- if (_taskCount == 1 || timeout < _taskMap.begin()->first) {
- _monitor.notify();
+ if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
+ monitor_.notify();
}
}
}
@@ -257,13 +257,13 @@
void TimerManager::remove(shared_ptr<Runnable> task) {
- Synchronized s(_monitor);
- if (_state != TimerManager::STARTED) {
+ Synchronized s(monitor_);
+ if (state_ != TimerManager::STARTED) {
throw IllegalStateException();
}
}
-const TimerManager::STATE TimerManager::state() const { return _state; }
+const TimerManager::STATE TimerManager::state() const { return state_; }
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
index 50a0c13..78782d9 100644
--- a/lib/cpp/src/concurrency/TimerManager.h
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -86,17 +86,17 @@
virtual const STATE state() const;
private:
- shared_ptr<const ThreadFactory> _threadFactory;
+ shared_ptr<const ThreadFactory> threadFactory_;
class Task;
friend class Task;
- std::multimap<long long, shared_ptr<Task> > _taskMap;
- size_t _taskCount;
- Monitor _monitor;
- STATE _state;
+ std::multimap<long long, shared_ptr<Task> > taskMap_;
+ size_t taskCount_;
+ Monitor monitor_;
+ STATE state_;
class Dispatcher;
friend class Dispatcher;
- shared_ptr<Dispatcher> _dispatcher;
- shared_ptr<Thread> _dispatcherThread;
+ shared_ptr<Dispatcher> dispatcher_;
+ shared_ptr<Thread> dispatcherThread_;
};
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/server/TNonblockingServer.cc b/lib/cpp/src/server/TNonblockingServer.cc
new file mode 100644
index 0000000..14fb5bc
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.cc
@@ -0,0 +1,476 @@
+#include "TNonblockingServer.h"
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+
+namespace facebook { namespace thrift { namespace server {
+
+void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
+ socket_ = socket;
+ server_ = s;
+ appState_ = APP_INIT;
+ eventFlags_ = 0;
+
+ readBufferPos_ = 0;
+ readWant_ = 0;
+
+ writeBuffer_ = NULL;
+ writeBufferSize_ = 0;
+ writeBufferPos_ = 0;
+
+ socketState_ = SOCKET_RECV;
+ appState_ = APP_INIT;
+
+ // Set flags, which also registers the event
+ setFlags(eventFlags);
+}
+
+void TConnection::workSocket() {
+  // Performs one nonblocking recv() or send() step, chosen by socketState_.
+  int flags;
+  switch (socketState_) {
+  case SOCKET_RECV: {
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
+
+    // How much space is available, and how much will we fetch
+    uint32_t avail = readBufferSize_ - readBufferPos_;
+    uint32_t fetch = readWant_ - readBufferPos_;
+    // Double the buffer size until it holds readWant_ bytes; the loop tests
+    // readBufferSize_ itself so that each doubling is actually observed
+    if (fetch > avail) {
+      while (readWant_ > readBufferSize_) {
+        readBufferSize_ *= 2;
+      }
+      readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
+      if (readBuffer_ == NULL) {
+        perror("TConnection::workSocket() realloc");
+        close();
+        return;
+      }
+    }
+
+    // Read from the socket
+    int got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
+
+    if (got > 0) {
+      // Move along in the buffer
+      readBufferPos_ += got;
+
+      // Check that we did not overdo it
+      assert(readBufferPos_ <= readWant_);
+
+      // We are done reading, move onto the next state
+      if (readBufferPos_ == readWant_) {
+        transition();
+      }
+      return;
+    } else if (got == -1) {
+      // Blocking errors are okay, just move on
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+
+      if (errno != ECONNRESET) {
+        perror("TConnection::workSocket() recv -1");
+      }
+    }
+
+    // Whenever we get down here it means a remote disconnect
+    close();
+
+    return;
+  }
+  case SOCKET_SEND: {
+    // Should never have position past size
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // If there is no data to send, then let us move on
+    if (writeBufferPos_ == writeBufferSize_) {
+      fprintf(stderr, "WARNING: Send state with no data to send\n");
+      transition();
+      return;
+    }
+
+    flags = 0;
+    #ifdef MSG_NOSIGNAL
+    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+    // check for the EPIPE return condition and close the socket in that case
+    flags |= MSG_NOSIGNAL;
+    #endif // ifdef MSG_NOSIGNAL
+
+    int left = writeBufferSize_ - writeBufferPos_;
+    int sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
+
+    if (sent <= 0) {
+      // Blocking errors are okay, just move on
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+      if (errno != EPIPE) {
+        perror("TConnection::workSocket() send -1");
+      }
+      close();
+      return;
+    }
+
+    writeBufferPos_ += sent;
+
+    // Did we overdo it?
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // We are done!
+    if (writeBufferPos_ == writeBufferSize_) {
+      transition();
+    }
+
+    return;
+  }
+  default:
+    fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
+    assert(0);
+  }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+void TConnection::transition() {
+  // Switch upon the state that we are currently in and move to a new state
+  switch (appState_) {
+
+  case APP_READ_REQUEST:
+    // We are done reading the request, package the read buffer into transport
+    // and get back some data from the dispatch function
+    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+    outputTransport_->resetBuffer();
+
+    try {
+      // Invoke the processor
+      server_->getProcessor()->process(inputTransport_, outputTransport_);
+    } catch (TTransportException &x) {
+      fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
+      close();
+      return;
+    } catch (...) {
+      fprintf(stderr, "Server::process() unknown exception\n");
+      close();
+      return;
+    }
+
+    // Get the result of the operation
+    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+    // If the function call generated return data, then move into the send
+    // state and get going
+    if (writeBufferSize_ > 0) {
+
+      // Move into write state
+      writeBufferPos_ = 0;
+      socketState_ = SOCKET_SEND;
+      appState_ = APP_SEND_RESULT;
+
+      // Socket into write mode
+      setWrite();
+
+      // Try to work the socket immediately
+      workSocket();
+
+      return;
+    }
+
+    // In this case, the request was asynchronous and we should fall through
+    // right back into the read frame header state
+
+  case APP_SEND_RESULT:
+
+    // N.B.: We also intentionally fall through here into the INIT state!
+
+  case APP_INIT:
+
+    // Clear write buffer variables
+    writeBuffer_ = NULL;
+    writeBufferPos_ = 0;
+    writeBufferSize_ = 0;
+
+    // Set up read buffer for getting 4 bytes
+    readBufferPos_ = 0;
+    readWant_ = 4;
+
+    // Into read4 state we go
+    socketState_ = SOCKET_RECV;
+    appState_ = APP_READ_FRAME_SIZE;
+
+    // Register read event
+    setRead();
+
+    // Try to work the socket right away
+    workSocket();
+
+    return;
+
+  case APP_READ_FRAME_SIZE: {
+    // Braces scope sz so the jump to default: does not cross its initialization
+    // We just read the request length, deserialize it
+    int sz = *(int32_t*)readBuffer_;
+    sz = (int32_t)ntohl(sz);
+
+    if (sz <= 0) {
+      fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
+      close();
+      return;
+    }
+
+    // Reset the read buffer
+    readWant_ = (uint32_t)sz;
+    readBufferPos_= 0;
+
+    // Move into read request state
+    appState_ = APP_READ_REQUEST;
+
+    // Work the socket right away
+    workSocket();
+
+    return;
+  }
+  default:
+    fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
+    assert(0);
+  }
+}
+
+void TConnection::setFlags(short eventFlags) {
+ // Catch the do nothing case
+ if (eventFlags_ == eventFlags) {
+ return;
+ }
+
+ // Delete a previously existing event
+ if (eventFlags_ != 0) {
+ if (event_del(&event_) == -1) {
+ perror("TConnection::setFlags event_del");
+ return;
+ }
+ }
+
+ // Update in memory structure
+ eventFlags_ = eventFlags;
+
+ /**
+ * event_set:
+ *
+ * Prepares the event structure &event to be used in future calls to
+ * event_add() and event_del(). The event will be prepared to call the
+ * event_handler using the 'sock' file descriptor to monitor events.
+ *
+ * The events can be either EV_READ, EV_WRITE, or both, indicating
+ * that an application can read or write from the file respectively without
+ * blocking.
+ *
+ * The event_handler will be called with the file descriptor that triggered
+ * the event and the type of event which will be one of: EV_TIMEOUT,
+ * EV_SIGNAL, EV_READ, EV_WRITE.
+ *
+ * The additional flag EV_PERSIST makes an event_add() persistent until
+ * event_del() has been called.
+ *
+ * Once initialized, the &event struct can be used repeatedly with
+ * event_add() and event_del() and does not need to be reinitialized unless
+ * the event_handler and/or the argument to it are to be changed. However,
+ * when an ev structure has been added to libevent using event_add() the
+ * structure must persist until the event occurs (assuming EV_PERSIST
+ * is not set) or is removed using event_del(). You may not reuse the same
+ * ev structure for multiple monitored descriptors; each descriptor needs
+ * its own ev.
+ */
+ event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
+
+ // Add the event
+ if (event_add(&event_, 0) == -1) {
+ perror("TConnection::setFlags(): coult not event_add");
+ }
+}
+
+/**
+ * Closes a connection
+ */
+void TConnection::close() {
+ // Delete the registered libevent
+ if (event_del(&event_) == -1) {
+ perror("TConnection::close() event_del");
+ }
+
+ // Close the socket
+ if (socket_ > 0) {
+ ::close(socket_);
+ }
+ socket_ = 0;
+
+ // Give this object back to the server that owns it
+ server_->returnConnection(this);
+}
+
+/**
+ * Creates a new connection either by reusing an object off the stack or
+ * by allocating a new one entirely
+ */
+TConnection* TNonblockingServer::createConnection(int socket, short flags) {
+ // Check the stack
+ if (connectionStack_.empty()) {
+ return new TConnection(socket, flags, this);
+ } else {
+ TConnection* result = connectionStack_.top();
+ connectionStack_.pop();
+ result->init(socket, flags, this);
+ return result;
+ }
+}
+
+/**
+ * Returns a connection to the stack
+ */
+void TNonblockingServer::returnConnection(TConnection* connection) {
+ connectionStack_.push(connection);
+}
+
+/**
+ * Accepts every pending client on the listening socket (libevent callback)
+ */
+void TNonblockingServer::handleEvent(int fd, short which) {
+  // Make sure that libevent handed us the server socket
+  assert(fd == serverSocket_);
+
+  // Server socket accepted a new connection
+  socklen_t addrLen;
+  struct sockaddr addr;
+  addrLen = sizeof(addr);
+
+  // Going to accept a new client socket
+  int clientSocket;
+
+  // Accept as many new clients as possible, even though libevent signaled only
+  // one, this helps us to avoid having to go back into the libevent engine so
+  // many times
+  while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
+    addrLen = sizeof(addr);  // accept() overwrote it; reset for the next call
+    // Explicitly set this socket to NONBLOCK mode
+    int flags;
+    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
+        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
+      perror("thriftServerEventHandler: set O_NONBLOCK");
+      ::close(clientSocket);
+      return;
+    }
+
+    // Create a new TConnection for this client socket.
+    TConnection* clientConnection =
+      createConnection(clientSocket, EV_READ | EV_PERSIST);
+
+    // Fail fast if we could not create a TConnection object
+    if (clientConnection == NULL) {
+      fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
+      ::close(clientSocket);
+      return;
+    }
+
+    // Put this client connection into the proper state
+    clientConnection->transition();
+  }
+
+  // Done looping accept, now we have to make sure the error is due to
+  // blocking. Any other error is a problem
+  if (errno != EAGAIN && errno != EWOULDBLOCK) {
+    perror("thriftServerEventHandler: accept()");
+  }
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+  // Initialize libevent
+  event_init();
+
+  // Print some libevent stats
+  fprintf(stderr, "libevent %s method %s\n",
+          event_get_version(), event_get_method());
+
+  // Create the server socket
+  serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
+  if (serverSocket_ == -1) {
+    perror("TNonblockingServer::serve() socket() -1");
+    return;
+  }
+
+  // Set socket to nonblocking mode
+  int flags;
+  if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
+      fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
+    perror("TNonblockingServer::serve() O_NONBLOCK");
+    ::close(serverSocket_);
+    return;
+  }
+
+  int one = 1;
+  struct linger ling = {0, 0};
+
+  // Set reuseaddr to avoid 2MSL delay on server restart
+  setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+
+  // Keepalive to ensure full result flushing
+  setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
+
+  // Turn linger off to avoid hung sockets
+  setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+
+  // Set TCP nodelay if available, MAC OS X Hack
+  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+  #ifndef TCP_NOPUSH
+  setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
+  #endif
+
+  // Zero the whole address first so no field is left as stack garbage
+  struct sockaddr_in addr = {};
+  addr.sin_family = AF_INET;
+  addr.sin_port = htons(port_);
+  addr.sin_addr.s_addr = INADDR_ANY;
+
+  if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
+    perror("TNonblockingServer::serve() bind");
+    ::close(serverSocket_);
+    return;
+  }
+
+  if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
+    perror("TNonblockingServer::serve() listen");
+    ::close(serverSocket_);
+    return;
+  }
+
+  // Register the server event
+  struct event serverEvent;
+  event_set(&serverEvent,
+            serverSocket_,
+            EV_READ | EV_PERSIST,
+            TNonblockingServer::eventHandler,
+            this);
+
+  // Add the event and start up the server
+  if (event_add(&serverEvent, 0) == -1) {
+    perror("TNonblockingServer::serve(): coult not event_add");
+    ::close(serverSocket_);
+    return;
+  }
+
+  // Run libevent engine, never returns, invokes calls to event_handler
+  event_loop(0);
+}
+
+}}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
new file mode 100644
index 0000000..565486c
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -0,0 +1,195 @@
+#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
+#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
+
+#include "Thrift.h"
+#include "server/TServer.h"
+#include "transport/TMemoryBuffer.h"
+#include <stack>
+#include <event.h>
+
+#
+
+namespace facebook { namespace thrift { namespace server {
+
+using boost::shared_ptr;
+
+// Forward declaration of class
+class TConnection;
+
+/**
+ * This is a non-blocking server in C++ for high performance that operates a
+ * single IO thread. It assumes that all incoming requests are framed with a
+ * 4 byte length indicator and writes out responses using the same framing.
+ *
+ * It does not use the TServerTransport framework, but rather has socket
+ * operations hardcoded for use with libevent.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TNonblockingServer : public TServer {
+ private:
+
+ // Listen backlog
+ static const int LISTEN_BACKLOG = 1024;
+
+ // Server socket file descriptor
+ int serverSocket_;
+
+ // Port server runs on
+ int port_;
+
+ /**
+ * This is a stack of all the objects that have been created but that
+ * are NOT currently in use. When we close a connection, we place it on this
+ * stack so that the object can be reused later, rather than freeing the
+ * memory and reallocating a new object later.
+ */
+ std::stack<TConnection*> connectionStack_;
+
+ void handleEvent(int fd, short which);
+
+ public:
+ TNonblockingServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerOptions> options,
+ int port) :
+ TServer(processor, options), serverSocket_(0), port_(port) {}
+
+ ~TNonblockingServer() {}
+
+ TConnection* createConnection(int socket, short flags);
+
+ void returnConnection(TConnection* connection);
+
+ static void eventHandler(int fd, short which, void* v) {
+ ((TNonblockingServer*)v)->handleEvent(fd, which);
+ }
+
+ void serve();
+};
+
+/**
+ * Two states for sockets, recv and send mode
+ */
+enum TSocketState {
+ SOCKET_RECV,
+ SOCKET_SEND
+};
+
+/**
+ * Four states for the nonblocking server:
+ * 1) initialize
+ * 2) read 4 byte frame size
+ * 3) read frame of data
+ * 4) send back data (if any)
+ */
+enum TAppState {
+ APP_INIT,
+ APP_READ_FRAME_SIZE,
+ APP_READ_REQUEST,
+ APP_SEND_RESULT
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TConnection {
+ private:
+
+  // Server handle
+  TNonblockingServer* server_;
+
+  // Socket handle
+  int socket_;
+
+  // Libevent object
+  struct event event_;
+
+  // Libevent flags
+  short eventFlags_;
+
+  // Socket mode
+  TSocketState socketState_;
+
+  // Application state
+  TAppState appState_;
+
+  // How much data needed to read
+  uint32_t readWant_;
+
+  // Where in the read buffer are we
+  uint32_t readBufferPos_;
+
+  // Read buffer
+  uint8_t* readBuffer_;
+
+  // Read buffer size
+  uint32_t readBufferSize_;
+
+  // Write buffer
+  uint8_t* writeBuffer_;
+
+  // Write buffer size
+  uint32_t writeBufferSize_;
+
+  // How far through writing are we?
+  uint32_t writeBufferPos_;
+
+  // Transport to read from
+  shared_ptr<TMemoryBuffer> inputTransport_;
+
+  // Transport that processor writes to
+  shared_ptr<TMemoryBuffer> outputTransport_;
+
+  // Go into read mode
+  void setRead() {
+    setFlags(EV_READ | EV_PERSIST);
+  }
+
+  // Go into write mode
+  void setWrite() {
+    setFlags(EV_WRITE | EV_PERSIST);
+  }
+
+  // Set event flags
+  void setFlags(short eventFlags);
+
+  // Libevent handlers
+  void workSocket();
+
+  // Close this client and reset
+  void close();
+
+ public:
+
+  // Constructor: allocates the read buffer; throws Exception by value on OOM
+  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
+    readBuffer_ = (uint8_t*)malloc(1024);
+    if (readBuffer_ == NULL) {
+      throw facebook::thrift::Exception("Out of memory.");
+    }
+    readBufferSize_ = 1024;
+
+    // Allocate input and output transports
+    inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
+    outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+
+    init(socket, eventFlags, s);
+  }
+
+  // Initialize
+  void init(int socket, short eventFlags, TNonblockingServer *s);
+
+  // Transition into a new state
+  void transition();
+
+  // Handler wrapper: checks (not assigns) that fd is this connection's socket
+  static void eventHandler(int fd, short which, void* v) {
+    assert(fd == ((TConnection*)v)->socket_);
+    ((TConnection*)v)->workSocket();
+  }
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index ddb320d..eb23b45 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -26,6 +26,10 @@
virtual ~TServer() {}
virtual void serve() = 0;
+ shared_ptr<TProcessor> getProcessor() {
+ return processor_;
+ }
+
protected:
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc
index 4285b05..43f7463 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cc
+++ b/lib/cpp/src/server/TThreadPoolServer.cc
@@ -12,19 +12,15 @@
using namespace facebook::thrift::transport;
class TThreadPoolServer::Task: public Runnable {
-
- shared_ptr<TProcessor> _processor;
- shared_ptr<TTransport> _input;
- shared_ptr<TTransport> _output;
-
+
public:
Task(shared_ptr<TProcessor> processor,
shared_ptr<TTransport> input,
shared_ptr<TTransport> output) :
- _processor(processor),
- _input(input),
- _output(output) {
+ processor_(processor),
+ input_(input),
+ output_(output) {
}
~Task() {}
@@ -32,16 +28,22 @@
void run() {
while(true) {
try {
- _processor->process(_input, _output);
+ processor_->process(input_, output_);
} catch (TTransportException& ttx) {
break;
} catch(...) {
break;
}
}
- _input->close();
- _output->close();
+ input_->close();
+ output_->close();
}
+
+ private:
+ shared_ptr<TProcessor> processor_;
+ shared_ptr<TTransport> input_;
+ shared_ptr<TTransport> output_;
+
};
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
diff --git a/lib/cpp/src/transport/TMemoryBuffer.cc b/lib/cpp/src/transport/TMemoryBuffer.cc
new file mode 100644
index 0000000..084f297
--- /dev/null
+++ b/lib/cpp/src/transport/TMemoryBuffer.cc
@@ -0,0 +1,45 @@
+#include "TMemoryBuffer.h"
+
+namespace facebook { namespace thrift { namespace transport {
+
+uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
+ // Check available data for reading (bytes written but not yet read)
+ uint32_t avail = wPos_ - rPos_;
+
+ // Decide how much to give: never more than what is available
+ uint32_t give = len;
+ if (avail < len) {
+ give = avail;
+ }
+
+ // Copy into buffer and increment rPos_
+ memcpy(buf, buffer_ + rPos_, give);
+ rPos_ += give;
+
+ return give;
+}
+
+void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) {
+  // Appends len bytes at wPos_; only a buffer this object owns may grow
+  uint32_t avail = bufferSize_ - wPos_;
+  if (len > avail) {
+    if (!owner_) {
+      throw TTransportException("Insufficient space in external MemoryBuffer");
+    }
+    // Recompute the free space after every doubling so the loop terminates
+    while (len > avail) {
+      bufferSize_ *= 2;
+      avail = bufferSize_ - wPos_;
+    }
+    buffer_ = (uint8_t*)realloc(buffer_, bufferSize_);
+    if (buffer_ == NULL) {
+      throw TTransportException("Out of memory.");
+    }
+  }
+
+  // Copy into the buffer and increment wPos_
+  memcpy(buffer_ + wPos_, buf, len);
+  wPos_ += len;
+}
+
+}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TMemoryBuffer.h b/lib/cpp/src/transport/TMemoryBuffer.h
new file mode 100644
index 0000000..397b186
--- /dev/null
+++ b/lib/cpp/src/transport/TMemoryBuffer.h
@@ -0,0 +1,116 @@
+#ifndef _THRIFT_TRANSPORT_TMEMORYBUFFER_H_
+#define _THRIFT_TRANSPORT_TMEMORYBUFFER_H_ 1
+
+#include "TTransport.h"
+#include <string>
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * A memory buffer is a transport that simply reads from and writes to an
+ * in memory buffer. Anytime you call write on it, the data is simply placed
+ * into a buffer, and anytime you call read, data is read from that buffer.
+ *
+ * The buffers are allocated using C constructs malloc,realloc, and the size
+ * doubles as necessary.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TMemoryBuffer : public TTransport {
+ public:
+  TMemoryBuffer() {
+    owner_ = true;
+    bufferSize_ = 1024;
+    buffer_ = (uint8_t*)malloc(bufferSize_);
+    if (buffer_ == NULL) {
+      throw TTransportException("Out of memory");
+    }
+    wPos_ = 0;
+    rPos_ = 0;
+  }
+
+  TMemoryBuffer(uint32_t sz) {
+    owner_ = true;
+    bufferSize_ = sz;
+    buffer_ = (uint8_t*)malloc(bufferSize_);
+    if (buffer_ == NULL) {
+      throw TTransportException("Out of memory");
+    }
+    wPos_ = 0;
+    rPos_ = 0;
+  }
+
+  TMemoryBuffer(uint8_t* buf, int sz) {
+    owner_ = false;
+    buffer_ = buf;
+    bufferSize_ = sz;
+    wPos_ = sz;
+    rPos_ = 0;
+  }
+
+  ~TMemoryBuffer() {
+    if (owner_) {
+      if (buffer_ != NULL) {
+        free(buffer_);
+        buffer_ = NULL;
+      }
+    }
+  }
+
+  bool isOpen() {
+    return true;
+  }
+
+  void open() {}
+
+  void close() {}
+
+  // Exposes the internal buffer and the number of bytes written to it so far
+  void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
+    *bufPtr = buffer_;
+    *sz = wPos_;
+  }
+
+  void resetBuffer() {
+    wPos_ = 0;
+    rPos_ = 0;
+  }
+
+  void resetBuffer(uint8_t* buf, uint32_t sz) {
+    if (owner_) {
+      if (buffer_ != NULL) {
+        free(buffer_);
+      }
+    }
+    owner_ = false;
+    buffer_ = buf;
+    bufferSize_ = sz;
+    wPos_ = sz;
+    rPos_ = 0;
+  }
+
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+ private:
+  // Data buffer
+  uint8_t* buffer_;
+
+  // Allocated buffer size
+  uint32_t bufferSize_;
+
+  // Where the write is at
+  uint32_t wPos_;
+
+  // Where the reader is at
+  uint32_t rPos_;
+
+  // Is this object the owner of the buffer?
+  bool owner_;
+
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TMEMORYBUFFER_H_
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index d65d25b..1a20bd5 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -63,9 +63,14 @@
*/
virtual uint32_t readAll(uint8_t* buf, uint32_t len) {
uint32_t have = 0;
+ uint32_t get = 0;
while (have < len) {
- have += read(buf+have, len-have);
+ get = read(buf+have, len-have);
+ if (get <= 0) {
+ throw TTransportException("No more data to read.");
+ }
+ have += get;
}
return have;