Thrift: standardize coding style
Summary: Standardize indentation, spacing, #defines etc.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664784 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index a5b8f05..7d6fef7 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -17,48 +17,41 @@
using namespace boost;
-/** ThreadManager class
-
- This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
- it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times.
-
- @author marc
- @version $Id:$ */
-
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * it maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times.
+ *
+ * @author marc
+ * @version $Id:$
+ */
class ThreadManager::Impl : public ThreadManager {
public:
-
Impl() :
_workerCount(0),
_workerMaxCount(0),
_idleCount(0),
- _state(ThreadManager::UNINITIALIZED)
- {}
+ _state(ThreadManager::UNINITIALIZED) {}
- ~Impl() {
- stop();
- }
+ ~Impl() { stop(); }
void start();
void stop();
- const ThreadManager::STATE state() const {
- return _state;
- };
+ const ThreadManager::STATE state() const { return _state; }
shared_ptr<ThreadFactory> threadFactory() const {
-
Synchronized s(_monitor);
-
return _threadFactory;
}
- void threadFactory(shared_ptr<ThreadFactory> value) {
-
- Synchronized s(_monitor);
-
+ void threadFactory(shared_ptr<ThreadFactory> value) {
+ Synchronized s(_monitor);
_threadFactory = value;
}
@@ -66,26 +59,20 @@
void removeWorker(size_t value);
- size_t idleWorkerCount() const {return _idleCount;}
+ size_t idleWorkerCount() const { return _idleCount; }
size_t workerCount() const {
-
Synchronized s(_monitor);
-
return _workerCount;
}
size_t pendingTaskCount() const {
-
Synchronized s(_monitor);
-
return _tasks.size();
}
size_t totalTaskCount() const {
-
- Synchronized s(_monitor);
-
+ Synchronized s(_monitor);
return _tasks.size() + _workerCount - _idleCount;
}
@@ -94,35 +81,26 @@
void remove(shared_ptr<Runnable> task);
private:
-
size_t _workerCount;
-
size_t _workerMaxCount;
-
size_t _idleCount;
-
ThreadManager::STATE _state;
-
shared_ptr<ThreadFactory> _threadFactory;
+
friend class ThreadManager::Task;
-
std::queue<shared_ptr<Task> > _tasks;
-
Monitor _monitor;
-
Monitor _workerMonitor;
friend class ThreadManager::Worker;
-
std::set<shared_ptr<Thread> > _workers;
-
std::set<shared_ptr<Thread> > _deadWorkers;
};
class ThreadManager::Task : public Runnable {
-public:
+ public:
enum STATE {
WAITING,
EXECUTING,
@@ -132,29 +110,24 @@
Task(shared_ptr<Runnable> runnable) :
_runnable(runnable),
- _state(WAITING)
- {}
+ _state(WAITING) {}
- ~Task() {};
+ ~Task() {}
void run() {
- if(_state == EXECUTING) {
+ if (_state == EXECUTING) {
_runnable->run();
_state = COMPLETE;
}
}
private:
-
shared_ptr<Runnable> _runnable;
-
friend class ThreadManager::Worker;
-
STATE _state;
};
class ThreadManager::Worker: public Runnable {
-
enum STATE {
UNINITIALIZED,
STARTING,
@@ -164,177 +137,139 @@
};
public:
-
Worker(ThreadManager::Impl* manager) :
_manager(manager),
_state(UNINITIALIZED),
- _idle(false)
- {}
+ _idle(false) {}
~Worker() {}
- bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;}
+ bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount; }
- /** Worker entry point
-
- As long as worker thread is running, pull tasks off the task queue and execute. */
-
+ /**
+ * Worker entry point
+ *
+ * As long as worker thread is running, pull tasks off the task queue and
+ * execute.
+ */
void run() {
-
- bool active = false;
-
+ bool active = false;
bool notifyManager = false;
- /** Increment worker semaphore and notify manager if worker count reached desired max
-
- Note
- We have to release the monitor and acquire the workerMonitor since that is what the manager
- blocks on for worker add/remove */
-
- {Synchronized s(_manager->_monitor);
-
+ /**
+ * Increment worker semaphore and notify manager if worker count reached
+ * desired max
+ *
+ * Note: We have to release the monitor and acquire the workerMonitor
+ * since that is what the manager blocks on for worker add/remove
+ */
+ {
+ Synchronized s(_manager->_monitor);
active = _manager->_workerCount < _manager->_workerMaxCount;
-
- if(active) {
-
+ if (active) {
_manager->_workerCount++;
-
notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
}
}
- if(notifyManager) {
-
+ if (notifyManager) {
Synchronized s(_manager->_workerMonitor);
-
_manager->_workerMonitor.notify();
-
notifyManager = false;
}
- while(active) {
-
+ while (active) {
shared_ptr<ThreadManager::Task> task;
- /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
-
- Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented
- such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying
- the next blocked thread but eventually the manager will see it. */
-
- {Synchronized s(_manager->_monitor);
-
+ /**
+ * While holding manager monitor block for non-empty task queue (Also
+ * check that the thread hasn't been requested to stop). Once the queue
+ * is non-empty, dequeue a task, release monitor, and execute. If the
+ * worker max count has been decremented such that we exceed it, mark
+ * ourself inactive, decrement the worker count and notify the manager
+ * (technically we're notifying the next blocked thread but eventually
+ * the manager will see it.
+ */
+ {
+ Synchronized s(_manager->_monitor);
active = isActive();
-
- while(active && _manager->_tasks.empty()) {
-
- _manager->_idleCount++;
-
+ while (active && _manager->_tasks.empty()) {
+ _manager->_idleCount++;
_idle = true;
-
- _manager->_monitor.wait();
-
- active = isActive();
-
+ _manager->_monitor.wait();
+ active = isActive();
_idle = false;
-
- _manager->_idleCount--;
+ _manager->_idleCount--;
}
- if(active) {
-
- if(!_manager->_tasks.empty()) {
-
- task = _manager->_tasks.front();
-
- _manager->_tasks.pop();
-
- if(task->_state == ThreadManager::Task::WAITING) {
-
- task->_state = ThreadManager::Task::EXECUTING;
+ if (active) {
+ if (!_manager->_tasks.empty()) {
+ task = _manager->_tasks.front();
+ _manager->_tasks.pop();
+ if (task->_state == ThreadManager::Task::WAITING) {
+ task->_state = ThreadManager::Task::EXECUTING;
}
}
} else {
-
- _idle = true;
-
+ _idle = true;
_manager->_workerCount--;
-
- notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
+ notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
}
}
-
- if(task != NULL) {
-
- if(task->_state == ThreadManager::Task::EXECUTING) {
+
+ if (task != NULL) {
+ if (task->_state == ThreadManager::Task::EXECUTING) {
try {
-
- task->run();
-
- } catch(...) {
-
- // XXX need to log this
+ task->run();
+ } catch(...) {
+ // XXX need to log this
}
}
}
}
-
- {Synchronized s(_manager->_workerMonitor);
-
+
+ {
+ Synchronized s(_manager->_workerMonitor);
_manager->_deadWorkers.insert(this->thread());
-
- if(notifyManager) {
-
- _manager->_workerMonitor.notify();
+ if (notifyManager) {
+ _manager->_workerMonitor.notify();
}
}
-
+
return;
}
-
- private:
-
- ThreadManager::Impl* _manager;
-
- friend class ThreadManager::Impl;
-
- STATE _state;
-
- bool _idle;
+
+ private:
+ ThreadManager::Impl* _manager;
+ friend class ThreadManager::Impl;
+ STATE _state;
+ bool _idle;
};
-void ThreadManager::Impl::addWorker(size_t value) {
-
+
+ void ThreadManager::Impl::addWorker(size_t value) {
std::set<shared_ptr<Thread> > newThreads;
-
- for(size_t ix = 0; ix < value; ix++) {
-
- class ThreadManager::Worker;
-
+ for (size_t ix = 0; ix < value; ix++) {
+ class ThreadManager::Worker;
shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
-
newThreads.insert(_threadFactory->newThread(worker));
}
- {Synchronized s(_monitor);
-
+ {
+ Synchronized s(_monitor);
_workerMaxCount+= value;
-
_workers.insert(newThreads.begin(), newThreads.end());
}
- for(std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
+ for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
-
worker->_state = ThreadManager::Worker::STARTING;
-
(*ix)->start();
}
- {Synchronized s(_workerMonitor);
-
- while(_workerCount != _workerMaxCount) {
+ {
+ Synchronized s(_workerMonitor);
+ while (_workerCount != _workerMaxCount) {
_workerMonitor.wait();
}
}
@@ -342,133 +277,110 @@
void ThreadManager::Impl::start() {
- if(_state == ThreadManager::STOPPED) {
+ if (_state == ThreadManager::STOPPED) {
return;
}
- {Synchronized s(_monitor);
-
- if(_state == ThreadManager::UNINITIALIZED) {
-
- if(_threadFactory == NULL) {throw InvalidArgumentException();}
-
+ {
+ Synchronized s(_monitor);
+ if (_state == ThreadManager::UNINITIALIZED) {
+ if (_threadFactory == NULL) {
+ throw InvalidArgumentException();
+ }
_state = ThreadManager::STARTED;
-
_monitor.notifyAll();
}
- while(_state == STARTING) {
-
+ while (_state == STARTING) {
_monitor.wait();
}
}
}
void ThreadManager::Impl::stop() {
-
bool doStop = false;
-
- if(_state == ThreadManager::STOPPED) {
+ if (_state == ThreadManager::STOPPED) {
return;
}
- {Synchronized s(_monitor);
-
- if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
-
+ {
+ Synchronized s(_monitor);
+ if (!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
doStop = true;
-
_state = ThreadManager::STOPPING;
}
}
- if(doStop) {
-
+ if (doStop) {
removeWorker(_workerCount);
-
_state = ThreadManager::STOPPING;
}
// XXX
- // should be able to block here for transition to STOPPED since we're now using shared_ptrs
-
+ // should be able to block here for transition to STOPPED since we're no
+ // using shared_ptrs
}
void ThreadManager::Impl::removeWorker(size_t value) {
-
std::set<shared_ptr<Thread> > removedThreads;
-
- {Synchronized s(_monitor);
-
- if(value > _workerMaxCount) {
-
+ {
+ Synchronized s(_monitor);
+ if (value > _workerMaxCount) {
throw InvalidArgumentException();
}
_workerMaxCount-= value;
- if(_idleCount < value) {
-
- for(size_t ix = 0; ix < _idleCount; ix++) {
-
+ if (_idleCount < value) {
+ for (size_t ix = 0; ix < _idleCount; ix++) {
_monitor.notify();
}
} else {
-
_monitor.notifyAll();
}
}
- {Synchronized s(_workerMonitor);
+ {
+ Synchronized s(_workerMonitor);
- while(_workerCount != _workerMaxCount) {
+ while (_workerCount != _workerMaxCount) {
_workerMonitor.wait();
}
- for(std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
-
- _workers.erase(*ix);
-
+ for (std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+ _workers.erase(*ix);
}
-
+
_deadWorkers.clear();
}
}
void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
-
Synchronized s(_monitor);
- if(_state != ThreadManager::STARTED) {
-
+ if (_state != ThreadManager::STARTED) {
throw IllegalStateException();
}
_tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
- /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
- task in time. */
-
- if(_idleCount > 0) {
-
+ // If idle thread is available notify it, otherwise all worker threads are
+ // running and will get around to this task in time.
+ if (_idleCount > 0) {
_monitor.notify();
}
}
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
-
- Synchronized s(_monitor);
-
- if(_state != ThreadManager::STARTED) {
-
- throw IllegalStateException();
- }
+ Synchronized s(_monitor);
+ if (_state != ThreadManager::STARTED) {
+ throw IllegalStateException();
+ }
}
class SimpleThreadManager : public ThreadManager::Impl {
public:
-
SimpleThreadManager(size_t workerCount=4) :
_workerCount(workerCount),
_firstTime(true) {
@@ -476,12 +388,10 @@
void start() {
ThreadManager::Impl::start();
-
addWorker(_workerCount);
}
private:
-
const size_t _workerCount;
bool _firstTime;
Monitor _monitor;
@@ -497,4 +407,3 @@
}
}}} // facebook::thrift::concurrency
-