Thrift: standardize coding style
Summary: Standardize indentation, spacing, #defines etc.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664784 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index ce27539..4cbcd65 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -1,5 +1,5 @@
-#ifndef T_PROCESSOR_H
-#define T_PROCESSOR_H
+#ifndef _THRIFT_TPROCESSOR_H_
+#define _THRIFT_TPROCESSOR_H_ 1
#include <string>
#include <transport/TTransport.h>
@@ -30,4 +30,4 @@
}} // facebook::thrift
-#endif
+#endif // #ifndef _THRIFT_TPROCESSOR_H_
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 410d20e..38226dc 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -1,5 +1,5 @@
-#ifndef THRIFT_H
-#define THRIFT_H
+#ifndef _THRIFT_THRIFT_H_
+#define _THRIFT_THRIFT_H_ 1
#include <netinet/in.h>
#include <inttypes.h>
@@ -24,4 +24,4 @@
}} // facebook::thrift
-#endif
+#endif // #ifndef _THRIFT_THRIFT_H_
diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h
index 5907f02..67eb153 100644
--- a/lib/cpp/src/concurrency/Exception.h
+++ b/lib/cpp/src/concurrency/Exception.h
@@ -1,5 +1,5 @@
-#if !defined(_concurrency_Exception_h_)
-#define _concurrency_Exception_h_ 1
+#ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
+#define _THRIFT_CONCURRENCY_EXCEPTION_H_ 1
#include <exception>
@@ -17,4 +17,4 @@
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_Exception_h_)
+#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
index 7493ec3..57532a3 100644
--- a/lib/cpp/src/concurrency/Monitor.cc
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -9,125 +9,97 @@
#include <pthread.h>
-
namespace facebook { namespace thrift { namespace concurrency {
-/** Monitor implementation using the POSIX pthread library
-
- @author marc
- @version $Id:$ */
-
+/**
+ * Monitor implementation using the POSIX pthread library
+ *
+ * @author marc
+ * @version $Id:$
+ */
class Monitor::Impl {
public:
- Impl() :
+ Impl() :
mutexInitialized(false),
condInitialized(false) {
try {
-
assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
-
mutexInitialized = true;
-
assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
-
condInitialized = true;
-
} catch(...) {
cleanup();
}
}
- ~Impl() {cleanup();}
+ ~Impl() { cleanup(); }
- void lock() const {pthread_mutex_lock(&_pthread_mutex);}
+ void lock() const { pthread_mutex_lock(&_pthread_mutex); }
- void unlock() const {pthread_mutex_unlock(&_pthread_mutex);}
+ void unlock() const { pthread_mutex_unlock(&_pthread_mutex); }
void wait(long long timeout) const {
// XXX Need to assert that caller owns mutex
-
assert(timeout >= 0LL);
-
- if(timeout == 0LL) {
-
+ if (timeout == 0LL) {
assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0);
-
} else {
-
struct timespec abstime;
-
long long now = Util::currentTime();
-
Util::toTimespec(abstime, now + timeout);
-
- int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
-
- if(result == ETIMEDOUT) {
-
+ int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
+ if (result == ETIMEDOUT) {
assert(Util::currentTime() >= (now + timeout));
}
}
}
void notify() {
-
// XXX Need to assert that caller owns mutex
-
assert(pthread_cond_signal(&_pthread_cond) == 0);
}
void notifyAll() {
-
// XXX Need to assert that caller owns mutex
-
assert(pthread_cond_broadcast(&_pthread_cond) == 0);
}
-private:
+ private:
void cleanup() {
-
- if(mutexInitialized) {
-
+ if (mutexInitialized) {
mutexInitialized = false;
-
assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
}
- if(condInitialized) {
-
+ if (condInitialized) {
condInitialized = false;
-
assert(pthread_cond_destroy(&_pthread_cond) == 0);
}
}
mutable pthread_mutex_t _pthread_mutex;
-
mutable bool mutexInitialized;
-
mutable pthread_cond_t _pthread_cond;
-
mutable bool condInitialized;
};
Monitor::Monitor() : _impl(new Monitor::Impl()) {}
-Monitor::~Monitor() { delete _impl;}
+Monitor::~Monitor() { delete _impl; }
-void Monitor::lock() const {_impl->lock();}
+void Monitor::lock() const { _impl->lock(); }
-void Monitor::unlock() const {_impl->unlock();}
+void Monitor::unlock() const { _impl->unlock(); }
-void Monitor::wait(long long timeout) const {_impl->wait(timeout);}
+void Monitor::wait(long long timeout) const { _impl->wait(timeout); }
-void Monitor::notify() const {_impl->notify();}
+void Monitor::notify() const { _impl->notify(); }
-void Monitor::notifyAll() const {_impl->notifyAll();}
+void Monitor::notifyAll() const { _impl->notifyAll(); }
}}} // facebook::thrift::concurrency
-
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
index 13dec18..62a8344 100644
--- a/lib/cpp/src/concurrency/Monitor.h
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -1,18 +1,22 @@
-#if !defined(_concurrency_Monitor_h_)
-#define _concurrency_Monitor_h_ 1
+#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
+#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
namespace facebook { namespace thrift { namespace concurrency {
-/** A monitor is a combination mutex and condition-event. Waiting and notifying condition events requires that the caller own the mutex. Mutex
- lock and unlock operations can be performed independently of condition events. This is more or less analogous to java.lang.Object multi-thread
- operations
-
- Note that all methods are const. Monitors implement logical constness, not bit constness. This allows const methods to call monitor
- methods without needing to cast away constness or change to non-const signatures.
-
- @author marc
- @version $Id:$ */
-
+/**
+ * A monitor is a combination mutex and condition-event. Waiting and
+ * notifying condition events requires that the caller own the mutex. Mutex
+ * lock and unlock operations can be performed independently of condition
+ * events. This is more or less analogous to java.lang.Object multi-thread
+ * operations
+ *
+ * Note that all methods are const. Monitors implement logical constness, not
+ * bit constness. This allows const methods to call monitor methods without
+ * needing to cast away constness or change to non-const signatures.
+ *
+ * @author marc
+ * @version $Id:$
+ */
class Monitor {
public:
@@ -56,4 +60,4 @@
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_Monitor_h_)
+#endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_
diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc
index 8282e73..416341e 100644
--- a/lib/cpp/src/concurrency/Mutex.cc
+++ b/lib/cpp/src/concurrency/Mutex.cc
@@ -3,41 +3,42 @@
#include <assert.h>
#include <pthread.h>
-/** Implementation of Mutex class using POSIX mutex
-
- @author marc
- @version $Id:$ */
-
namespace facebook { namespace thrift { namespace concurrency {
+/**
+ * Implementation of Mutex class using POSIX mutex
+ *
+ * @author marc
+ * @version $Id:$
+ */
class Mutex::impl {
-public:
+ public:
impl() : initialized(false) {
assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
initialized = true;
}
~impl() {
- if(initialized) {
+ if (initialized) {
initialized = false;
assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
}
}
- void lock() const {pthread_mutex_lock(&_pthread_mutex);}
+ void lock() const { pthread_mutex_lock(&_pthread_mutex); }
- void unlock() const {pthread_mutex_unlock(&_pthread_mutex);}
+ void unlock() const { pthread_mutex_unlock(&_pthread_mutex); }
-private:
+ private:
mutable pthread_mutex_t _pthread_mutex;
mutable bool initialized;
};
Mutex::Mutex() : _impl(new Mutex::impl()) {}
-void Mutex::lock() const {_impl->lock();}
+void Mutex::lock() const { _impl->lock(); }
-void Mutex::unlock() const {_impl->unlock();}
+void Mutex::unlock() const { _impl->unlock(); }
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index 20d4c0b..9eceb49 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -1,39 +1,31 @@
-#if !defined(_concurrency_mutex_h_)
-#define _concurrency_mutex_h_ 1
+#ifndef _THRIFT_CONCURRENCY_MUTEX_H_
+#define _THRIFT_CONCURRENCY_MUTEX_H_ 1
namespace facebook { namespace thrift { namespace concurrency {
-/** A simple mutex class
-
- @author marc
- @version $Id:$ */
-
+/**
+ * A simple mutex class
+ *
+ * @author marc
+ * @version $Id:$
+ */
class Mutex {
-
public:
-
Mutex();
-
virtual ~Mutex() {}
-
virtual void lock() const;
-
virtual void unlock() const;
private:
-
class impl;
-
impl* _impl;
};
class MutexMonitor {
- public:
-
+ public:
MutexMonitor(const Mutex& value) : _mutex(value) {
_mutex.lock();
}
-
~MutexMonitor() {
_mutex.unlock();
}
@@ -45,4 +37,4 @@
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_mutex_h_)
+#endif // #ifndef _THRIFT_CONCURRENCY_MUTEX_H_
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index f07db86..5df87ec 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -11,40 +11,36 @@
using namespace boost;
-/** The POSIX thread class.
-
- @author marc
- @version $Id:$ */
-
+/**
+ * The POSIX thread class.
+ *
+ * @author marc
+ * @version $Id:$
+ */
class PthreadThread: public Thread {
+ public:
-public:
- enum STATE {uninitialized,
- starting,
- started,
- stopping,
- stopped
+ enum STATE {
+ uninitialized,
+ starting,
+ started,
+ stopping,
+ stopped
};
static const int MB = 1024 * 1024;
static void* threadMain(void* arg);
-private:
-
+ private:
pthread_t _pthread;
-
STATE _state;
-
int _policy;
-
int _priority;
-
int _stackSize;
-
weak_ptr<PthreadThread> _self;
-public:
+ public:
PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
_pthread(0),
@@ -56,58 +52,47 @@
this->Thread::runnable(runnable);
}
- ~PthreadThread() {
- }
+ ~PthreadThread() {}
void start() {
-
- if(_state != uninitialized) {
+ if (_state != uninitialized) {
return;
}
_state = starting;
pthread_attr_t thread_attr;
-
assert(pthread_attr_init(&thread_attr) == 0);
-
assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
// Set thread stack size
-
assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
// Set thread policy
-
assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
struct sched_param sched_param;
sched_param.sched_priority = _priority;
// Set thread priority
-
assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+ // Create reference
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
-
*selfRef = _self.lock();
-
assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0);
}
void join() {
-
- if(_state != stopped) {
-
+ if (_state != stopped) {
void* ignore;
-
pthread_join(_pthread, &ignore);
}
}
- shared_ptr<Runnable> runnable() const {return Thread::runnable();}
+ shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
- void runnable(shared_ptr<Runnable> value) {Thread::runnable(value);}
+ void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
void weakRef(shared_ptr<PthreadThread> self) {
assert(self.get() == this);
@@ -117,46 +102,41 @@
void* PthreadThread::threadMain(void* arg) {
// XXX need a lock here when testing thread state
-
shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
-
delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
- if(thread == NULL) {
+ if (thread == NULL) {
return (void*)0;
}
- if(thread->_state != starting) {
+ if (thread->_state != starting) {
return (void*)0;
}
thread->_state = starting;
-
thread->runnable()->run();
-
- if(thread->_state != stopping && thread->_state != stopped) {
+ if (thread->_state != stopping && thread->_state != stopped) {
thread->_state = stopping;
}
return (void*)0;
}
-/** POSIX Thread factory implementation */
-
+/**
+ * POSIX Thread factory implementation
+ */
class PosixThreadFactory::Impl {
-private:
-
+ private:
POLICY _policy;
-
PRIORITY _priority;
-
int _stackSize;
-
bool _detached;
- /** Converts generic posix thread schedule policy enums into pthread API values. */
-
+ /**
+ * Converts generic posix thread schedule policy enums into pthread
+ * API values.
+ */
static int toPthreadPolicy(POLICY policy) {
switch(policy) {
case OTHER: return SCHED_OTHER; break;
@@ -166,83 +146,76 @@
}
}
- /** Converts relative thread priorities to absolute value based on posix thread scheduler policy
-
- The idea is simply to divide up the priority range for the given policy into the correpsonding relative
- priority level (lowest..highest) and then pro-rate accordingly. */
-
+ /**
+ * Converts relative thread priorities to absolute value based on posix
+ * thread scheduler policy
+ *
+ * The idea is simply to divide up the priority range for the given policy
+   * into the corresponding relative priority level (lowest..highest) and
+ * then pro-rate accordingly.
+ */
static int toPthreadPriority(POLICY policy, PRIORITY priority) {
-
int pthread_policy = toPthreadPolicy(policy);
-
int min_priority = sched_get_priority_min(pthread_policy);
-
int max_priority = sched_get_priority_max(pthread_policy);
-
int quanta = (HIGHEST - LOWEST) + 1;
-
float stepsperquanta = (max_priority - min_priority) / quanta;
if(priority <= HIGHEST) {
-
return (int)(min_priority + stepsperquanta * priority);
} else {
-
// should never get here for priority increments.
-
assert(false);
-
return (int)(min_priority + stepsperquanta * NORMAL);
}
}
-public:
+ public:
Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
_policy(policy),
_priority(priority),
_stackSize(stackSize),
- _detached(detached) {
- }
+ _detached(detached) {}
- /** Creates a new POSIX thread to run the runnable object
-
- @param runnable A runnable object */
-
+ /**
+ * Creates a new POSIX thread to run the runnable object
+ *
+ * @param runnable A runnable object
+ */
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
-
shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable));
result->weakRef(result);
runnable->thread(result);
return result;
}
- int stackSize() const { return _stackSize;}
+ int stackSize() const { return _stackSize; }
- void stackSize(int value) { _stackSize = value;}
+ void stackSize(int value) { _stackSize = value; }
- PRIORITY priority() const { return _priority;}
+ PRIORITY priority() const { return _priority; }
- /** Sets priority.
-
- XXX
- Need to handle incremental priorities properly. */
-
- void priority(PRIORITY value) { _priority = value;}
-
+ /**
+ * Sets priority.
+ *
+ * XXX
+ * Need to handle incremental priorities properly.
+ */
+ void priority(PRIORITY value) { _priority = value; }
};
PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
_impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
-shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {return _impl->newThread(runnable);}
+shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return _impl->newThread(runnable); }
-int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
+int PosixThreadFactory::stackSize() const { return _impl->stackSize(); }
-void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
+void PosixThreadFactory::stackSize(int value) { _impl->stackSize(value); }
-PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
+PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return _impl->priority(); }
-void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
+void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { _impl->priority(value); }
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index 0095cf8..a56999c 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -1,5 +1,5 @@
-#if !defined(_concurrency_PosixThreadFactory_h_)
-#define _concurrency_PosixThreadFactory_h_ 1
+#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1
#include "Thread.h"
@@ -9,28 +9,33 @@
using namespace boost;
-/** A thread factory to create posix threads
-
- @author marc
- @version $Id:$ */
-
+/**
+ * A thread factory to create posix threads
+ *
+ * @author marc
+ * @version $Id:$
+ */
class PosixThreadFactory : public ThreadFactory {
public:
- /** POSIX Thread scheduler policies */
-
+ /**
+ * POSIX Thread scheduler policies
+ */
enum POLICY {
OTHER,
FIFO,
ROUND_ROBIN
};
- /** POSIX Thread scheduler relative priorities,
-
- Absolute priority is determined by scheduler policy and OS. This enumeration specifies relative priorities such that one can
- specify a priority withing a giving scheduler policy without knowing the absolute value of the priority. */
-
+ /**
+ * POSIX Thread scheduler relative priorities,
+ *
+ * Absolute priority is determined by scheduler policy and OS. This
+ * enumeration specifies relative priorities such that one can specify a
+   * priority within a given scheduler policy without knowing the absolute
+ * value of the priority.
+ */
enum PRIORITY {
LOWEST = 0,
LOWER = 1,
@@ -46,36 +51,37 @@
PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false);
// From ThreadFactory;
-
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const;
- /** Sets stack size for created threads
-
- @param value size in megabytes */
-
+ /**
+ * Sets stack size for created threads
+ *
+ * @param value size in megabytes
+ */
virtual void stackSize(int value);
- /** Gets stack size for created threads
-
- @return int size in megabytes */
-
+ /**
+ * Gets stack size for created threads
+ *
+ * @return int size in megabytes
+ */
virtual int stackSize() const;
- /** Sets priority relative to current policy */
-
+ /**
+ * Sets priority relative to current policy
+ */
virtual void priority(PRIORITY priority);
- /** Gets priority relative to current policy */
-
+ /**
+ * Gets priority relative to current policy
+ */
virtual PRIORITY priority() const;
private:
-
class Impl;
-
shared_ptr<Impl> _impl;
};
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_PosixThreadFactory_h_)
+#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index ea6b999..24d5908 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -1,5 +1,5 @@
-#if !defined(_concurrency_Thread_h_)
-#define _concurrency_Thread_h_ 1
+#ifndef _THRIFT_CONCURRENCY_THREAD_H_
+#define _THRIFT_CONCURRENCY_THREAD_H_ 1
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
@@ -10,77 +10,84 @@
class Thread;
-/** Minimal runnable class. More or less analogous to java.lang.Runnable.
-
- @author marc
- @version $Id:$ */
-
+/**
+ * Minimal runnable class. More or less analogous to java.lang.Runnable.
+ *
+ * @author marc
+ * @version $Id:$
+ */
class Runnable {
public:
-
virtual ~Runnable() {};
-
virtual void run() = 0;
- /** Gets the thread object that is hosting this runnable object - can return an empty shared pointer if no references remain on thet thread object */
+ /**
+ * Gets the thread object that is hosting this runnable object - can return
+   * an empty shared pointer if no references remain on the thread object
+ */
+ virtual shared_ptr<Thread> thread() { return _thread.lock(); }
- virtual shared_ptr<Thread> thread() {return _thread.lock();}
-
- /** Sets the thread that is executing this object. This is only meant for use by concrete implementations of Thread. */
-
- virtual void thread(shared_ptr<Thread> value) {_thread = value;}
+ /**
+ * Sets the thread that is executing this object. This is only meant for
+ * use by concrete implementations of Thread.
+ */
+ virtual void thread(shared_ptr<Thread> value) { _thread = value; }
private:
-
weak_ptr<Thread> _thread;
};
-/** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread
- (minus all the thread group, priority, mode and other baggage, since that is difficult to abstract across platforms and is left for platform-specific
- ThreadFactory implemtations to deal with - @see facebook::thrift::concurrency::ThreadFactory) */
-
-
+/**
+ * Minimal thread class. Returned by thread factory bound to a Runnable object
+ * and ready to start execution. More or less analogous to java.lang.Thread
+ * (minus all the thread group, priority, mode and other baggage, since that
+ * is difficult to abstract across platforms and is left for platform-specific
+ * ThreadFactory implementations to deal with)
+ *
+ * @see facebook::thrift::concurrency::ThreadFactory
+ */
class Thread {
public:
-
virtual ~Thread() {};
- /** Starts the thread. Does platform specific thread creation and configuration then invokes the run method of the Runnable object bound to this
- thread. */
-
+ /**
+ * Starts the thread. Does platform specific thread creation and
+ * configuration then invokes the run method of the Runnable object bound
+ * to this thread.
+ */
virtual void start() = 0;
- /** Join this thread
-
- Current thread blocks until this target thread completes. */
-
+ /**
+ * Join this thread. Current thread blocks until this target thread
+ * completes.
+ */
virtual void join() = 0;
- /** Gets the runnable object this thread is hosting */
-
- virtual shared_ptr<Runnable> runnable() const {return _runnable;}
+ /**
+ * Gets the runnable object this thread is hosting
+ */
+ virtual shared_ptr<Runnable> runnable() const { return _runnable; }
protected:
-
- virtual void runnable(shared_ptr<Runnable> value) {_runnable = value;}
+ virtual void runnable(shared_ptr<Runnable> value) { _runnable = value; }
private:
shared_ptr<Runnable> _runnable;
};
-/** Factory to create platform-specific thread object and bind them to Runnable object for execution */
-
+/**
+ * Factory to create platform-specific thread object and bind them to Runnable
+ * object for execution
+ */
class ThreadFactory {
public:
-
virtual ~ThreadFactory() {}
-
virtual shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const = 0;
};
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_Thread_h_)
+#endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index a5b8f05..7d6fef7 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -17,48 +17,41 @@
using namespace boost;
-/** ThreadManager class
-
- This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
- it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times.
-
- @author marc
- @version $Id:$ */
-
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * it maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times.
+ *
+ * @author marc
+ * @version $Id:$
+ */
class ThreadManager::Impl : public ThreadManager {
public:
-
Impl() :
_workerCount(0),
_workerMaxCount(0),
_idleCount(0),
- _state(ThreadManager::UNINITIALIZED)
- {}
+ _state(ThreadManager::UNINITIALIZED) {}
- ~Impl() {
- stop();
- }
+ ~Impl() { stop(); }
void start();
void stop();
- const ThreadManager::STATE state() const {
- return _state;
- };
+ const ThreadManager::STATE state() const { return _state; }
shared_ptr<ThreadFactory> threadFactory() const {
-
Synchronized s(_monitor);
-
return _threadFactory;
}
- void threadFactory(shared_ptr<ThreadFactory> value) {
-
- Synchronized s(_monitor);
-
+ void threadFactory(shared_ptr<ThreadFactory> value) {
+ Synchronized s(_monitor);
_threadFactory = value;
}
@@ -66,26 +59,20 @@
void removeWorker(size_t value);
- size_t idleWorkerCount() const {return _idleCount;}
+ size_t idleWorkerCount() const { return _idleCount; }
size_t workerCount() const {
-
Synchronized s(_monitor);
-
return _workerCount;
}
size_t pendingTaskCount() const {
-
Synchronized s(_monitor);
-
return _tasks.size();
}
size_t totalTaskCount() const {
-
- Synchronized s(_monitor);
-
+ Synchronized s(_monitor);
return _tasks.size() + _workerCount - _idleCount;
}
@@ -94,35 +81,26 @@
void remove(shared_ptr<Runnable> task);
private:
-
size_t _workerCount;
-
size_t _workerMaxCount;
-
size_t _idleCount;
-
ThreadManager::STATE _state;
-
shared_ptr<ThreadFactory> _threadFactory;
+
friend class ThreadManager::Task;
-
std::queue<shared_ptr<Task> > _tasks;
-
Monitor _monitor;
-
Monitor _workerMonitor;
friend class ThreadManager::Worker;
-
std::set<shared_ptr<Thread> > _workers;
-
std::set<shared_ptr<Thread> > _deadWorkers;
};
class ThreadManager::Task : public Runnable {
-public:
+ public:
enum STATE {
WAITING,
EXECUTING,
@@ -132,29 +110,24 @@
Task(shared_ptr<Runnable> runnable) :
_runnable(runnable),
- _state(WAITING)
- {}
+ _state(WAITING) {}
- ~Task() {};
+ ~Task() {}
void run() {
- if(_state == EXECUTING) {
+ if (_state == EXECUTING) {
_runnable->run();
_state = COMPLETE;
}
}
private:
-
shared_ptr<Runnable> _runnable;
-
friend class ThreadManager::Worker;
-
STATE _state;
};
class ThreadManager::Worker: public Runnable {
-
enum STATE {
UNINITIALIZED,
STARTING,
@@ -164,177 +137,139 @@
};
public:
-
Worker(ThreadManager::Impl* manager) :
_manager(manager),
_state(UNINITIALIZED),
- _idle(false)
- {}
+ _idle(false) {}
~Worker() {}
- bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;}
+ bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount; }
- /** Worker entry point
-
- As long as worker thread is running, pull tasks off the task queue and execute. */
-
+ /**
+ * Worker entry point
+ *
+ * As long as worker thread is running, pull tasks off the task queue and
+ * execute.
+ */
void run() {
-
- bool active = false;
-
+ bool active = false;
bool notifyManager = false;
- /** Increment worker semaphore and notify manager if worker count reached desired max
-
- Note
- We have to release the monitor and acquire the workerMonitor since that is what the manager
- blocks on for worker add/remove */
-
- {Synchronized s(_manager->_monitor);
-
+ /**
+ * Increment worker semaphore and notify manager if worker count reached
+ * desired max
+ *
+ * Note: We have to release the monitor and acquire the workerMonitor
+ * since that is what the manager blocks on for worker add/remove
+ */
+ {
+ Synchronized s(_manager->_monitor);
active = _manager->_workerCount < _manager->_workerMaxCount;
-
- if(active) {
-
+ if (active) {
_manager->_workerCount++;
-
notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
}
}
- if(notifyManager) {
-
+ if (notifyManager) {
Synchronized s(_manager->_workerMonitor);
-
_manager->_workerMonitor.notify();
-
notifyManager = false;
}
- while(active) {
-
+ while (active) {
shared_ptr<ThreadManager::Task> task;
- /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
-
- Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented
- such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying
- the next blocked thread but eventually the manager will see it. */
-
- {Synchronized s(_manager->_monitor);
-
+ /**
+ * While holding manager monitor block for non-empty task queue (Also
+ * check that the thread hasn't been requested to stop). Once the queue
+ * is non-empty, dequeue a task, release monitor, and execute. If the
+ * worker max count has been decremented such that we exceed it, mark
+ * ourself inactive, decrement the worker count and notify the manager
+ * (technically we're notifying the next blocked thread but eventually
+       * the manager will see it).
+ */
+ {
+ Synchronized s(_manager->_monitor);
active = isActive();
-
- while(active && _manager->_tasks.empty()) {
-
- _manager->_idleCount++;
-
+ while (active && _manager->_tasks.empty()) {
+ _manager->_idleCount++;
_idle = true;
-
- _manager->_monitor.wait();
-
- active = isActive();
-
+ _manager->_monitor.wait();
+ active = isActive();
_idle = false;
-
- _manager->_idleCount--;
+ _manager->_idleCount--;
}
- if(active) {
-
- if(!_manager->_tasks.empty()) {
-
- task = _manager->_tasks.front();
-
- _manager->_tasks.pop();
-
- if(task->_state == ThreadManager::Task::WAITING) {
-
- task->_state = ThreadManager::Task::EXECUTING;
+ if (active) {
+ if (!_manager->_tasks.empty()) {
+ task = _manager->_tasks.front();
+ _manager->_tasks.pop();
+ if (task->_state == ThreadManager::Task::WAITING) {
+ task->_state = ThreadManager::Task::EXECUTING;
}
}
} else {
-
- _idle = true;
-
+ _idle = true;
_manager->_workerCount--;
-
- notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
+ notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
}
}
-
- if(task != NULL) {
-
- if(task->_state == ThreadManager::Task::EXECUTING) {
+
+ if (task != NULL) {
+ if (task->_state == ThreadManager::Task::EXECUTING) {
try {
-
- task->run();
-
- } catch(...) {
-
- // XXX need to log this
+ task->run();
+ } catch(...) {
+ // XXX need to log this
}
}
}
}
-
- {Synchronized s(_manager->_workerMonitor);
-
+
+ {
+ Synchronized s(_manager->_workerMonitor);
_manager->_deadWorkers.insert(this->thread());
-
- if(notifyManager) {
-
- _manager->_workerMonitor.notify();
+ if (notifyManager) {
+ _manager->_workerMonitor.notify();
}
}
-
+
return;
}
-
- private:
-
- ThreadManager::Impl* _manager;
-
- friend class ThreadManager::Impl;
-
- STATE _state;
-
- bool _idle;
+
+ private:
+ ThreadManager::Impl* _manager;
+ friend class ThreadManager::Impl;
+ STATE _state;
+ bool _idle;
};
-void ThreadManager::Impl::addWorker(size_t value) {
-
+
+ void ThreadManager::Impl::addWorker(size_t value) {
std::set<shared_ptr<Thread> > newThreads;
-
- for(size_t ix = 0; ix < value; ix++) {
-
- class ThreadManager::Worker;
-
+ for (size_t ix = 0; ix < value; ix++) {
+ class ThreadManager::Worker;
shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
-
newThreads.insert(_threadFactory->newThread(worker));
}
- {Synchronized s(_monitor);
-
+ {
+ Synchronized s(_monitor);
_workerMaxCount+= value;
-
_workers.insert(newThreads.begin(), newThreads.end());
}
- for(std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
+ for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
-
worker->_state = ThreadManager::Worker::STARTING;
-
(*ix)->start();
}
- {Synchronized s(_workerMonitor);
-
- while(_workerCount != _workerMaxCount) {
+ {
+ Synchronized s(_workerMonitor);
+ while (_workerCount != _workerMaxCount) {
_workerMonitor.wait();
}
}
@@ -342,133 +277,110 @@
void ThreadManager::Impl::start() {
- if(_state == ThreadManager::STOPPED) {
+ if (_state == ThreadManager::STOPPED) {
return;
}
- {Synchronized s(_monitor);
-
- if(_state == ThreadManager::UNINITIALIZED) {
-
- if(_threadFactory == NULL) {throw InvalidArgumentException();}
-
+ {
+ Synchronized s(_monitor);
+ if (_state == ThreadManager::UNINITIALIZED) {
+ if (_threadFactory == NULL) {
+ throw InvalidArgumentException();
+ }
_state = ThreadManager::STARTED;
-
_monitor.notifyAll();
}
- while(_state == STARTING) {
-
+ while (_state == STARTING) {
_monitor.wait();
}
}
}
void ThreadManager::Impl::stop() {
-
bool doStop = false;
-
- if(_state == ThreadManager::STOPPED) {
+ if (_state == ThreadManager::STOPPED) {
return;
}
- {Synchronized s(_monitor);
-
- if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
-
+ {
+ Synchronized s(_monitor);
+ if (!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
doStop = true;
-
_state = ThreadManager::STOPPING;
}
}
- if(doStop) {
-
+ if (doStop) {
removeWorker(_workerCount);
-
_state = ThreadManager::STOPPING;
}
// XXX
- // should be able to block here for transition to STOPPED since we're now using shared_ptrs
-
+  // should be able to block here for transition to STOPPED since we're now
+ // using shared_ptrs
}
void ThreadManager::Impl::removeWorker(size_t value) {
-
std::set<shared_ptr<Thread> > removedThreads;
-
- {Synchronized s(_monitor);
-
- if(value > _workerMaxCount) {
-
+ {
+ Synchronized s(_monitor);
+ if (value > _workerMaxCount) {
throw InvalidArgumentException();
}
_workerMaxCount-= value;
- if(_idleCount < value) {
-
- for(size_t ix = 0; ix < _idleCount; ix++) {
-
+ if (_idleCount < value) {
+ for (size_t ix = 0; ix < _idleCount; ix++) {
_monitor.notify();
}
} else {
-
_monitor.notifyAll();
}
}
- {Synchronized s(_workerMonitor);
+ {
+ Synchronized s(_workerMonitor);
- while(_workerCount != _workerMaxCount) {
+ while (_workerCount != _workerMaxCount) {
_workerMonitor.wait();
}
- for(std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
-
- _workers.erase(*ix);
-
+ for (std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+ _workers.erase(*ix);
}
-
+
_deadWorkers.clear();
}
}
void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
-
Synchronized s(_monitor);
- if(_state != ThreadManager::STARTED) {
-
+ if (_state != ThreadManager::STARTED) {
throw IllegalStateException();
}
_tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
- /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
- task in time. */
-
- if(_idleCount > 0) {
-
+ // If idle thread is available notify it, otherwise all worker threads are
+ // running and will get around to this task in time.
+ if (_idleCount > 0) {
_monitor.notify();
}
}
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
-
- Synchronized s(_monitor);
-
- if(_state != ThreadManager::STARTED) {
-
- throw IllegalStateException();
- }
+ Synchronized s(_monitor);
+ if (_state != ThreadManager::STARTED) {
+ throw IllegalStateException();
+ }
}
class SimpleThreadManager : public ThreadManager::Impl {
public:
-
SimpleThreadManager(size_t workerCount=4) :
_workerCount(workerCount),
_firstTime(true) {
@@ -476,12 +388,10 @@
void start() {
ThreadManager::Impl::start();
-
addWorker(_workerCount);
}
private:
-
const size_t _workerCount;
bool _firstTime;
Monitor _monitor;
@@ -497,4 +407,3 @@
}
}}} // facebook::thrift::concurrency
-
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index f365643..a90c5d2 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -1,49 +1,59 @@
-#if !defined(_concurrency_ThreadManager_h_)
-#define _concurrency_ThreadManager_h_ 1
+#ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
+#define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
#include <boost/shared_ptr.hpp>
-
#include <sys/types.h>
-
#include "Thread.h"
namespace facebook { namespace thrift { namespace concurrency {
using namespace boost;
-/** Thread Pool Manager and related classes
-
- @author marc
- @version $Id:$ */
-
+/**
+ * Thread Pool Manager and related classes
+ *
+ * @author marc
+ * @version $Id:$
+ */
class ThreadManager;
-/** ThreadManager class
-
- This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
- it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
- PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool
- size needs to be adjusted and call this object addWorker and removeWorker methods to make changes.
-
- This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
- policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. */
-
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * it maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times and informs the PoolPolicy
+ * object bound to instances of this manager of interesting transitions. It is
+ * then up to the PoolPolicy object to decide if the thread pool size needs to
+ * be adjusted and call this object's addWorker and removeWorker methods to make
+ * changes.
+ *
+ * This design allows different policy implementations to use this code to
+ * handle basic worker thread management and worker task execution and focus on
+ * policy issues. The simplest policy, StaticPolicy, does nothing other than
+ * create a fixed number of threads.
+ */
class ThreadManager {
public:
-
ThreadManager() {}
virtual ~ThreadManager() {}
- /** Starts the thread manager. Verifies all attributes have been properly initialized, then allocates necessary resources to begin operation */
-
+ /**
+ * Starts the thread manager. Verifies all attributes have been properly
+ * initialized, then allocates necessary resources to begin operation
+ */
virtual void start() = 0;
- /** Stops the thread manager. Aborts all remaining unprocessed task, shuts down all created worker threads, and realeases all allocated resources.
- This method blocks for all worker threads to complete, thus it can potentially block forever if a worker thread is running a task that
- won't terminate. */
-
+ /**
+   * Stops the thread manager. Aborts all remaining unprocessed tasks, shuts
+   * down all created worker threads, and releases all allocated resources.
+ * This method blocks for all worker threads to complete, thus it can
+ * potentially block forever if a worker thread is running a task that
+ * won't terminate.
+ */
virtual void stop() = 0;
enum STATE {
@@ -64,37 +74,43 @@
virtual void removeWorker(size_t value=1) = 0;
- /** Gets the current number of idle worker threads */
-
+ /**
+ * Gets the current number of idle worker threads
+ */
virtual size_t idleWorkerCount() const = 0;
- /** Gets the current number of total worker threads */
-
+ /**
+ * Gets the current number of total worker threads
+ */
virtual size_t workerCount() const = 0;
- /** Gets the current number of pending tasks */
-
+ /**
+ * Gets the current number of pending tasks
+ */
virtual size_t pendingTaskCount() const = 0;
- /** Gets the current number of pending and executing tasks */
-
+ /**
+ * Gets the current number of pending and executing tasks
+ */
virtual size_t totalTaskCount() const = 0;
- /** Adds a task to be execued at some time in the future by a worker thread.
-
- @param value The task to run */
-
-
+ /**
+   * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param value The task to run
+ */
virtual void add(shared_ptr<Runnable>value) = 0;
- /** Removes a pending task */
-
+ /**
+ * Removes a pending task
+ */
virtual void remove(shared_ptr<Runnable> task) = 0;
static shared_ptr<ThreadManager> newThreadManager();
- /** Creates a simple thread manager the uses count number of worker threads */
-
+ /**
+   * Creates a simple thread manager that uses count number of worker threads
+ */
static shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4);
class Task;
@@ -106,4 +122,4 @@
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_ThreadManager_h_)
+#endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index a223a77..f48df4e 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -8,17 +8,18 @@
namespace facebook { namespace thrift { namespace concurrency {
-/** TimerManager class
-
- @author marc
- @version $Id:$ */
-
typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
typedef std::pair<task_iterator, task_iterator> task_range;
+/**
+ * TimerManager class
+ *
+ * @author marc
+ * @version $Id:$
+ */
class TimerManager::Task : public Runnable {
-public:
+ public:
enum STATE {
WAITING,
EXECUTING,
@@ -28,130 +29,101 @@
Task(shared_ptr<Runnable> runnable) :
_runnable(runnable),
- _state(WAITING)
- {}
+ _state(WAITING) {}
~Task() {
- std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; //debug
-};
+ //debug
+ std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl;
+ }
void run() {
- if(_state == EXECUTING) {
+ if (_state == EXECUTING) {
_runnable->run();
_state = COMPLETE;
}
}
private:
-
shared_ptr<Runnable> _runnable;
-
class TimerManager::Dispatcher;
-
friend class TimerManager::Dispatcher;
-
STATE _state;
};
class TimerManager::Dispatcher: public Runnable {
-public:
+ public:
Dispatcher(TimerManager* manager) :
- _manager(manager) {
-}
+ _manager(manager) {}
~Dispatcher() {
- std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; //debug
+ // debug
+ std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
}
- /** Dispatcher entry point
-
- As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */
-
+ /**
+ * Dispatcher entry point
+ *
+ * As long as dispatcher thread is running, pull tasks off the task _taskMap
+ * and execute.
+ */
void run() {
-
- {Synchronized s(_manager->_monitor);
-
- if(_manager->_state == TimerManager::STARTING) {
-
+ {
+ Synchronized s(_manager->_monitor);
+ if (_manager->_state == TimerManager::STARTING) {
_manager->_state = TimerManager::STARTED;
-
_manager->_monitor.notifyAll();
}
}
do {
-
std::set<shared_ptr<TimerManager::Task> > expiredTasks;
-
- {Synchronized s(_manager->_monitor);
-
+ {
+ Synchronized s(_manager->_monitor);
task_iterator expiredTaskEnd;
-
long long now = Util::currentTime();
-
- while(_manager->_state == TimerManager::STARTED &&
- (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
-
+ while (_manager->_state == TimerManager::STARTED &&
+ (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
long long timeout = 0LL;
-
- if(!_manager->_taskMap.empty()) {
-
- timeout = _manager->_taskMap.begin()->first - now;
+ if (!_manager->_taskMap.empty()) {
+ timeout = _manager->_taskMap.begin()->first - now;
}
-
- assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
-
- _manager->_monitor.wait(timeout);
-
+ assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
+ _manager->_monitor.wait(timeout);
now = Util::currentTime();
}
- if(_manager->_state == TimerManager::STARTED) {
-
- for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
-
+ if (_manager->_state == TimerManager::STARTED) {
+ for (task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
shared_ptr<TimerManager::Task> task = ix->second;
-
- expiredTasks.insert(task);
-
- if(task->_state == TimerManager::Task::WAITING) {
-
+ expiredTasks.insert(task);
+ if (task->_state == TimerManager::Task::WAITING) {
task->_state = TimerManager::Task::EXECUTING;
}
-
- _manager->_taskCount--;
+ _manager->_taskCount--;
}
-
- _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
+ _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
}
}
- for(std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
-
- (*ix)->run();
+ for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+ (*ix)->run();
}
- } while(_manager->_state == TimerManager::STARTED);
+ } while (_manager->_state == TimerManager::STARTED);
- {Synchronized s(_manager->_monitor);
-
- if(_manager->_state == TimerManager::STOPPING) {
-
+ {
+ Synchronized s(_manager->_monitor);
+ if (_manager->_state == TimerManager::STOPPING) {
_manager->_state = TimerManager::STOPPED;
-
_manager->_monitor.notify();
-
}
}
-
return;
}
private:
-
TimerManager* _manager;
-
friend class TimerManager;
};
@@ -164,141 +136,106 @@
TimerManager::~TimerManager() {
- /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since
- stop already takes care of reentrancy. */
-
+ // If we haven't been explicitly stopped, do so now. We don't need to grab
+ // the monitor here, since stop already takes care of reentrancy.
std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
- if(_state != STOPPED) {
-
+ if (_state != STOPPED) {
try {
-
stop();
-
} catch(...) {
std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
throw;
-
// uhoh
-
}
}
}
void TimerManager::start() {
-
bool doStart = false;
-
- {Synchronized s(_monitor);
-
- if(_threadFactory == NULL) {throw InvalidArgumentException();}
-
- if(_state == TimerManager::UNINITIALIZED) {
-
+ {
+ Synchronized s(_monitor);
+ if (_threadFactory == NULL) {
+ throw InvalidArgumentException();
+ }
+ if (_state == TimerManager::UNINITIALIZED) {
_state = TimerManager::STARTING;
-
doStart = true;
}
}
- if(doStart) {
-
+ if (doStart) {
_dispatcherThread = _threadFactory->newThread(_dispatcher);
-
_dispatcherThread->start();
}
- {Synchronized s(_monitor);
-
- while(_state == TimerManager::STARTING) {
-
+ {
+ Synchronized s(_monitor);
+ while (_state == TimerManager::STARTING) {
_monitor.wait();
}
-
assert(_state != TimerManager::STARTING);
}
}
void TimerManager::stop() {
-
bool doStop = false;
-
- {Synchronized s(_monitor);
-
- if(_state == TimerManager::UNINITIALIZED) {
-
+ {
+ Synchronized s(_monitor);
+ if (_state == TimerManager::UNINITIALIZED) {
_state = TimerManager::STOPPED;
-
- } else if(_state != STOPPING && _state != STOPPED) {
-
+ } else if (_state != STOPPING && _state != STOPPED) {
doStop = true;
-
_state = STOPPING;
-
_monitor.notifyAll();
}
-
- while(_state != STOPPED) {
-
+ while (_state != STOPPED) {
_monitor.wait();
}
}
- if(doStop) {
-
+ if (doStop) {
// Clean up any outstanding tasks
-
- for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
-
+ for (task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
_taskMap.erase(ix);
}
// Remove dispatcher's reference to us.
-
_dispatcher->_manager = NULL;
}
}
shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
-
Synchronized s(_monitor);
-
return _threadFactory;
}
void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
-
Synchronized s(_monitor);
-
_threadFactory = value;
}
size_t TimerManager::taskCount() const {
-
return _taskCount;
}
void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
-
long long now = Util::currentTime();
-
timeout += now;
- {Synchronized s(_monitor);
-
- if(_state != TimerManager::STARTED) {
+ {
+ Synchronized s(_monitor);
+ if (_state != TimerManager::STARTED) {
throw IllegalStateException();
}
_taskCount++;
-
_taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
- /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
- kick the dispatcher so it can update its timeout */
-
- if(_taskCount == 1 || timeout < _taskMap.begin()->first) {
-
+ // If the task map was empty, or if we have an expiration that is earlier
+ // than any previously seen, kick the dispatcher so it can update its
+ // timeout
+ if (_taskCount == 1 || timeout < _taskMap.begin()->first) {
_monitor.notify();
}
}
@@ -306,13 +243,12 @@
void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
- long long expiration;
-
+ long long expiration;
Util::toMilliseconds(expiration, value);
long long now = Util::currentTime();
- if(expiration < now) {
+ if (expiration < now) {
throw InvalidArgumentException();
}
@@ -321,15 +257,13 @@
void TimerManager::remove(shared_ptr<Runnable> task) {
- {Synchronized s(_monitor);
-
- if(_state != TimerManager::STARTED) {
- throw IllegalStateException();
- }
+ Synchronized s(_monitor);
+ if (_state != TimerManager::STARTED) {
+ throw IllegalStateException();
}
}
-const TimerManager::STATE TimerManager::state() const { return _state;}
+const TimerManager::STATE TimerManager::state() const { return _state; }
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
index c0e6340..50a0c13 100644
--- a/lib/cpp/src/concurrency/TimerManager.h
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -1,27 +1,26 @@
-#if !defined(_concurrency_TimerManager_h_)
-#define _concurrency_TimerManager_h_ 1
+#ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
+#define _THRIFT_CONCURRENCY_TIMERMANAGER_H_ 1
#include "Exception.h"
#include "Monitor.h"
#include "Thread.h"
#include <boost/shared_ptr.hpp>
-
#include <map>
-
#include <time.h>
namespace facebook { namespace thrift { namespace concurrency {
using namespace boost;
-/** Timer Manager
-
- This class dispatches timer tasks when they fall due.
-
- @author marc
- @version $Id:$ */
-
+/**
+ * Timer Manager
+ *
+ * This class dispatches timer tasks when they fall due.
+ *
+ * @author marc
+ * @version $Id:$
+ */
class TimerManager {
public:
@@ -34,39 +33,46 @@
virtual void threadFactory(shared_ptr<const ThreadFactory> value);
- /** Starts the timer manager service
-
- @throws IllegalArgumentException Missing thread factory attribute */
-
+ /**
+ * Starts the timer manager service
+ *
+   * @throws InvalidArgumentException Missing thread factory attribute
+ */
virtual void start();
- /** Stops the timer manager service */
-
+ /**
+ * Stops the timer manager service
+ */
virtual void stop();
virtual size_t taskCount() const ;
- /** Adds a task to be executed at some time in the future by a worker thread.
-
- @param task The task to execute
- @param timeout Time in milliseconds to delay before executing task */
-
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param task The task to execute
+ * @param timeout Time in milliseconds to delay before executing task
+ */
virtual void add(shared_ptr<Runnable> task, long long timeout);
- /** Adds a task to be executed at some time in the future by a worker thread.
-
- @param task The task to execute
- @param timeout Absolute time in the future to execute task. */
-
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
+ *
+ * @param task The task to execute
+ * @param timeout Absolute time in the future to execute task.
+ */
virtual void add(shared_ptr<Runnable> task, const struct timespec& timeout);
- /** Removes a pending task
-
- @throws NoSuchTaskException Specified task doesn't exist. It was either processed already or this call was made for a task that
- was never added to this timer
-
- @throws UncancellableTaskException Specified task is already being executed or has completed execution. */
-
+ /**
+ * Removes a pending task
+ *
+ * @throws NoSuchTaskException Specified task doesn't exist. It was either
+ * processed already or this call was made for a
+ * task that was never added to this timer
+ *
+ * @throws UncancellableTaskException Specified task is already being
+ * executed or has completed execution.
+ */
virtual void remove(shared_ptr<Runnable> task);
enum STATE {
@@ -80,31 +86,19 @@
virtual const STATE state() const;
private:
-
shared_ptr<const ThreadFactory> _threadFactory;
-
class Task;
-
friend class Task;
-
std::multimap<long long, shared_ptr<Task> > _taskMap;
-
size_t _taskCount;
-
Monitor _monitor;
-
STATE _state;
-
class Dispatcher;
-
friend class Dispatcher;
-
shared_ptr<Dispatcher> _dispatcher;
-
shared_ptr<Thread> _dispatcherThread;
-
};
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_TimerManager_h_)
+#endif // #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h
index df37471..724b69a 100644
--- a/lib/cpp/src/concurrency/Util.h
+++ b/lib/cpp/src/concurrency/Util.h
@@ -1,5 +1,5 @@
-#if !defined(_concurrency_Util_h_)
-#define _concurrency_Util_h_ 1
+#ifndef _THRIFT_CONCURRENCY_UTIL_H_
+#define _THRIFT_CONCURRENCY_UTIL_H_ 1
#include <config.h>
@@ -13,68 +13,71 @@
namespace facebook { namespace thrift { namespace concurrency {
-/** Utility methods
-
- This class contains basic utility methods for converting time formats, and other common platform-dependent concurrency operations.
- It should not be included in API headers for other concurrency library headers, since it will, by definition, pull in all sorts of
- horrid platform dependent crap. Rather it should be inluded directly in concurrency library implementation source.
-
- @author marc
- @version $Id:$ */
-
+/**
+ * Utility methods
+ *
+ * This class contains basic utility methods for converting time formats,
+ * and other common platform-dependent concurrency operations.
+ * It should not be included in API headers for other concurrency library
+ * headers, since it will, by definition, pull in all sorts of horrid
+ * platform dependent crap. Rather it should be included directly in
+ * concurrency library implementation source.
+ *
+ * @author marc
+ * @version $Id:$
+ */
class Util {
static const long long NS_PER_S = 1000000000LL;
-
static const long long MS_PER_S = 1000LL;
-
static const long long NS_PER_MS = 1000000LL;
public:
- /** Converts timespec to milliseconds
-
- @param struct timespec& result
- @param time or duration in milliseconds */
-
+ /**
+   * Converts milliseconds to timespec
+ *
+   * @param result  struct timespec& that receives the converted value
+   * @param value   time or duration in milliseconds
+ */
static void toTimespec(struct timespec& result, long long value) {
-
- result.tv_sec = value / MS_PER_S; // ms to s
-
+ result.tv_sec = value / MS_PER_S; // ms to s
result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
}
- /** Converts timespec to milliseconds */
-
+ /**
+ * Converts timespec to milliseconds
+ */
static const void toMilliseconds(long long& result, const struct timespec& value) {
-
- result = (value.tv_sec * MS_PER_S) + (value.tv_nsec / NS_PER_MS) + (value.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ;
+ result =
+ (value.tv_sec * MS_PER_S) +
+ (value.tv_nsec / NS_PER_MS) +
+ (value.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0);
}
- /** Get current time as milliseconds from epoch */
-
+ /**
+ * Get current time as milliseconds from epoch
+ */
static const long long currentTime() {
-
#if defined(HAVE_CLOCK_GETTIME)
-
struct timespec now;
-
assert(clock_gettime(CLOCK_REALTIME, &now) == 0);
-
- return (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ;
-
+ return
+ (now.tv_sec * MS_PER_S) +
+ (now.tv_nsec / NS_PER_MS) +
+ (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ;
#elif defined(HAVE_GETTIMEOFDAY)
-
struct timeval now;
-
assert(gettimeofday(&now, NULL) == 0);
-
- return (((long long)now.tv_sec) * MS_PER_S) + (now.tv_usec / MS_PER_S) + (now.tv_usec % MS_PER_S >= 500 ? 1 : 0);
-
+ return
+ (((long long)now.tv_sec) * MS_PER_S) +
+ (now.tv_usec / MS_PER_S) +
+ (now.tv_usec % MS_PER_S >= 500 ? 1 : 0);
#endif // defined(HAVE_GETTIMEDAY)
}
+
};
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_Util_h_)
+#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_
diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc
index 5c4dd24..77e5551 100644
--- a/lib/cpp/src/concurrency/test/Tests.cc
+++ b/lib/cpp/src/concurrency/test/Tests.cc
@@ -14,13 +14,13 @@
args[0] = "all";
- for(int ix = 1; ix < argc; ix++) {
+ for (int ix = 1; ix < argc; ix++) {
args[ix - 1] = std::string(argv[ix]);
}
bool runAll = args[0].compare("all") == 0;
- if(runAll || args[0].compare("thread-factory") == 0) {
+ if (runAll || args[0].compare("thread-factory") == 0) {
ThreadFactoryTests threadFactoryTests;
@@ -41,7 +41,7 @@
assert(threadFactoryTests.monitorTimeoutTest());
}
- if(runAll || args[0].compare("util") == 0) {
+ if (runAll || args[0].compare("util") == 0) {
std::cout << "Util tests..." << std::endl;
@@ -56,7 +56,7 @@
time01 = time00;
size_t count = 0;
- while(time01 < time00 + 10) {
+ while (time01 < time00 + 10) {
count++;
time01 = Util::currentTime();
}
@@ -65,7 +65,7 @@
}
- if(runAll || args[0].compare("timer-manager") == 0) {
+ if (runAll || args[0].compare("timer-manager") == 0) {
std::cout << "TimerManager tests..." << std::endl;
@@ -76,7 +76,7 @@
assert(timerManagerTests.test00());
}
- if(runAll || args[0].compare("thread-manager") == 0) {
+ if (runAll || args[0].compare("thread-manager") == 0) {
std::cout << "ThreadManager tests..." << std::endl;
@@ -96,7 +96,7 @@
}
}
- if(runAll || args[0].compare("thread-manager-benchmark") == 0) {
+ if (runAll || args[0].compare("thread-manager-benchmark") == 0) {
std::cout << "ThreadManager benchmark tests..." << std::endl;
@@ -110,7 +110,7 @@
long long delay = 10LL;
- for(size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) {
+ for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) {
size_t taskCount = workerCount * tasksPerWorker;
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index 26e4c28..34b03d9 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -11,11 +11,12 @@
using namespace facebook::thrift::concurrency;
-/** ThreadManagerTests class
-
- @author marc
- @version $Id:$ */
-
+/**
+ * ThreadFactoryTests class
+ *
+ * @author marc
+ * @version $Id:$
+ */
class ThreadFactoryTests {
public:
@@ -33,8 +34,9 @@
}
};
- /** Hello world test */
-
+ /**
+ * Hello world test
+ */
bool helloWorldTest() {
PosixThreadFactory threadFactory = PosixThreadFactory();
@@ -52,28 +54,26 @@
return true;
}
- /** Reap N threads */
-
- class ReapNTask: public Runnable {
-
- public:
-
- ReapNTask(Monitor& monitor, int& activeCount) :
- _monitor(monitor),
- _count(activeCount) {
- }
+ /**
+ * Reap N threads
+ */
+ class ReapNTask: public Runnable {
+ public:
+
+ ReapNTask(Monitor& monitor, int& activeCount) :
+ _monitor(monitor),
+ _count(activeCount) {}
+
void run() {
-
- {Synchronized s(_monitor);
-
- _count--;
-
- //std::cout << "\t\t\tthread count: " << _count << std::endl;
-
- if(_count == 0) {
- _monitor.notify();
- }
+ Synchronized s(_monitor);
+
+ _count--;
+
+ //std::cout << "\t\t\tthread count: " << _count << std::endl;
+
+ if (_count == 0) {
+ _monitor.notify();
}
}
@@ -92,25 +92,24 @@
std::set<shared_ptr<Thread> > threads;
- for(int ix = 0; ix < count; ix++) {
+ for (int ix = 0; ix < count; ix++) {
threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
}
- for(std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
(*thread)->start();
}
- {Synchronized s(*monitor);
-
- while(*activeCount > 0) {
+ {
+ Synchronized s(*monitor);
+ while (*activeCount > 0) {
monitor->wait(1000);
}
}
- for(std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
-
+ for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
threads.erase(*thread);
}
@@ -119,10 +118,10 @@
return true;
}
- class SynchStartTask: public Runnable {
+ class SynchStartTask: public Runnable {
- public:
-
+ public:
+
enum STATE {
UNINITIALIZED,
STARTING,
@@ -131,38 +130,33 @@
STOPPED
};
- SynchStartTask(Monitor& monitor,
- volatile STATE& state) :
- _monitor(monitor),
- _state(state) {
- }
+ SynchStartTask(Monitor& monitor, volatile STATE& state) :
+ _monitor(monitor),
+ _state(state) {}
void run() {
-
- {Synchronized s(_monitor);
-
- if(_state == SynchStartTask::STARTING) {
+ {
+ Synchronized s(_monitor);
+ if (_state == SynchStartTask::STARTING) {
_state = SynchStartTask::STARTED;
_monitor.notify();
}
}
- {Synchronized s(_monitor);
-
- while(_state == SynchStartTask::STARTED) {
+ {
+ Synchronized s(_monitor);
+ while (_state == SynchStartTask::STARTED) {
_monitor.wait();
}
- if(_state == SynchStartTask::STOPPING) {
-
- _state = SynchStartTask::STOPPED;
-
- _monitor.notifyAll();
+ if (_state == SynchStartTask::STOPPING) {
+ _state = SynchStartTask::STOPPED;
+ _monitor.notifyAll();
}
}
}
- private:
+ private:
Monitor& _monitor;
volatile STATE& _state;
};
@@ -179,34 +173,35 @@
shared_ptr<Thread> thread = threadFactory.newThread(task);
- if(state == SynchStartTask::UNINITIALIZED) {
+ if (state == SynchStartTask::UNINITIALIZED) {
state = SynchStartTask::STARTING;
thread->start();
}
- {Synchronized s(monitor);
-
- while(state == SynchStartTask::STARTING) {
+ {
+ Synchronized s(monitor);
+ while (state == SynchStartTask::STARTING) {
monitor.wait();
}
}
assert(state != SynchStartTask::STARTING);
- {Synchronized s(monitor);
+ {
+ Synchronized s(monitor);
monitor.wait(100);
- if(state == SynchStartTask::STARTED) {
+ if (state == SynchStartTask::STARTED) {
state = SynchStartTask::STOPPING;
monitor.notify();
}
- while(state == SynchStartTask::STOPPING) {
+ while (state == SynchStartTask::STOPPING) {
monitor.wait();
}
}
@@ -228,8 +223,9 @@
long long startTime = Util::currentTime();
- for(size_t ix = 0; ix < count; ix++) {
- {Synchronized s(monitor);
+ for (size_t ix = 0; ix < count; ix++) {
+ {
+ Synchronized s(monitor);
monitor.wait(timeout);
}
}
@@ -238,7 +234,7 @@
double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
- if(error < 0.0) {
+ if (error < 0.0) {
error *= 1.0;
}
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index 7e74aac..e174343 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -14,11 +14,12 @@
using namespace facebook::thrift::concurrency;
-/** ThreadManagerTests class
-
- @author marc
- @version $Id:$ */
-
+/**
+ * ThreadManagerTests class
+ *
+ * @author marc
+ * @version $Id:$
+ */
class ThreadManagerTests {
public:
@@ -39,8 +40,8 @@
_startTime = Util::currentTime();
- {Synchronized s(_sleep);
-
+ {
+ Synchronized s(_sleep);
_sleep.wait(_timeout);
}
@@ -49,13 +50,14 @@
_done = true;
- {Synchronized s(_monitor);
+ {
+ Synchronized s(_monitor);
// std::cout << "Thread " << _count << " completed " << std::endl;
_count--;
- if(_count == 0) {
+ if (_count == 0) {
_monitor.notify();
}
@@ -71,9 +73,11 @@
Monitor _sleep;
};
- /** Dispatch count tasks, each of which blocks for timeout milliseconds then completes.
- Verify that all tasks completed and that thread manager cleans up properly on delete. */
-
+ /**
+ * Dispatch count tasks, each of which blocks for timeout milliseconds then
+ * completes. Verify that all tasks completed and that thread manager cleans
+ * up properly on delete.
+ */
bool loadTest(size_t count=100, long long timeout=100LL, size_t workerCount=4) {
Monitor monitor;
@@ -92,20 +96,21 @@
std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
- for(size_t ix = 0; ix < count; ix++) {
+ for (size_t ix = 0; ix < count; ix++) {
tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
}
long long time00 = Util::currentTime();
- for(std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
threadManager->add(*ix);
}
- {Synchronized s(monitor);
-
+ {
+ Synchronized s(monitor);
+
while(activeCount > 0) {
monitor.wait();
@@ -121,7 +126,7 @@
long long minTime = 9223372036854775807LL;
long long maxTime = 0;
- for(std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
shared_ptr<ThreadManagerTests::Task> task = *ix;
@@ -129,19 +134,19 @@
assert(delta > 0);
- if(task->_startTime < firstTime) {
+ if (task->_startTime < firstTime) {
firstTime = task->_startTime;
}
- if(task->_endTime > lastTime) {
+ if (task->_endTime > lastTime) {
lastTime = task->_endTime;
}
- if(delta < minTime) {
+ if (delta < minTime) {
minTime = delta;
}
- if(delta > maxTime) {
+ if (delta > maxTime) {
maxTime = delta;
}
@@ -156,7 +161,7 @@
double error = ((time01 - time00) - expectedTime) / expectedTime;
- if(error < 0) {
+ if (error < 0) {
error*= -1.0;
}
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
index fe56d31..faab733 100644
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.h
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -10,29 +10,29 @@
using namespace facebook::thrift::concurrency;
-/** ThreadManagerTests class
-
- @author marc
- @version $Id:$ */
-
+/**
+ * TimerManagerTests class
+ *
+ * @author marc
+ * @version $Id:$
+ */
class TimerManagerTests {
public:
static const double ERROR;
- class Task: public Runnable {
-
- public:
+ class Task: public Runnable {
+ public:
Task(Monitor& monitor, long long timeout) :
_timeout(timeout),
_startTime(Util::currentTime()),
_monitor(monitor),
_success(false),
- _done(false) {}
+ _done(false) {}
- ~Task() {std::cerr << this << std::endl;}
+ ~Task() { std::cerr << this << std::endl; }
void run() {
@@ -58,9 +58,7 @@
{Synchronized s(_monitor);
_monitor.notifyAll();
}
- }
-
-
+ }
long long _timeout;
long long _startTime;
@@ -70,10 +68,12 @@
bool _done;
};
- /** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time. It then verifies that
- the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its
- destructor is called. */
-
+ /**
+ * This test creates two tasks and waits for the first to expire within 10%
+ * of the expected expiration time. It then verifies that the timer manager
+ * properly cleans up itself and the remaining orphaned timeout task when the
+ * manager goes out of scope and its destructor is called.
+ */
bool test00(long long timeout=1000LL) {
shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
@@ -90,7 +90,8 @@
shared_ptr<TimerManagerTests::Task> task = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
- {Synchronized s(_monitor);
+ {
+ Synchronized s(_monitor);
timerManager.add(orphanTask, 10 * timeout);
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.h b/lib/cpp/src/protocol/TBinaryProtocol.h
index d66fa2f..5bca6dd 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.h
+++ b/lib/cpp/src/protocol/TBinaryProtocol.h
@@ -1,5 +1,5 @@
-#ifndef T_BINARY_PROTOCOL_H
-#define T_BINARY_PROTOCOL_H
+#ifndef _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_
+#define _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_ 1
#include "TProtocol.h"
@@ -147,5 +147,6 @@
}}} // facebook::thrift::protocol
-#endif
+#endif // #ifndef _THRIFT_PROTOCOL_TBINARYPROTOCOL_H_
+
diff --git a/lib/cpp/src/protocol/TProtocol.h b/lib/cpp/src/protocol/TProtocol.h
index e9d560c..65d6f3c 100644
--- a/lib/cpp/src/protocol/TProtocol.h
+++ b/lib/cpp/src/protocol/TProtocol.h
@@ -1,5 +1,5 @@
-#ifndef T_PROTOCOL_H
-#define T_PROTOCOL_H
+#ifndef _THRIFT_PROTOCOL_TPROTOCOL_H_
+#define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1
#include <transport/TTransport.h>
@@ -20,7 +20,7 @@
#define htonll(x) ntohll(x)
-/** Forward declaration for TProtocol */
+// Forward declaration for TProtocol
struct TBuf;
/**
@@ -49,7 +49,8 @@
};
/**
- * Enumerated definition of the message types that the Thrift protocol supports.
+ * Enumerated definition of the message types that the Thrift protocol
+ * supports.
*/
enum TMessageType {
T_CALL = 1,
@@ -297,5 +298,4 @@
}}} // facebook::thrift::protocol
-#endif
-
+#endif // #ifndef _THRIFT_PROTOCOL_TPROTOCOL_H_
diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h
index c94d926..0783ca4 100644
--- a/lib/cpp/src/transport/TBufferedTransport.h
+++ b/lib/cpp/src/transport/TBufferedTransport.h
@@ -1,5 +1,5 @@
-#ifndef T_BUFFERED_TRANSPORT_H
-#define T_BUFFERED_TRANSPORT_H
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ 1
#include "TTransport.h"
#include <string>
@@ -84,4 +84,4 @@
}}} // facebook::thrift::transport
-#endif
+#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TChunkedTransport.h b/lib/cpp/src/transport/TChunkedTransport.h
index c6312d0..ebc2780 100644
--- a/lib/cpp/src/transport/TChunkedTransport.h
+++ b/lib/cpp/src/transport/TChunkedTransport.h
@@ -1,5 +1,5 @@
-#ifndef T_CHUNKED_TRANSPORT_H
-#define T_CHUNKED_TRANSPORT_H
+#ifndef _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_ 1
#include "TTransport.h"
#include <string>
@@ -80,4 +80,4 @@
}}} // facebook::thrift::transport
-#endif
+#endif // #ifndef _THRIFT_TRANSPORT_TCHUNKEDTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TNullTransport.h b/lib/cpp/src/transport/TNullTransport.h
index 8bb5bd2..dd2999f 100644
--- a/lib/cpp/src/transport/TNullTransport.h
+++ b/lib/cpp/src/transport/TNullTransport.h
@@ -1,5 +1,5 @@
-#ifndef T_NULL_TRANSPORT
-#define T_NULL_TRANSPORT
+#ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ 1
#include "TTransport.h"
@@ -25,4 +25,4 @@
}}} // facebook::thrift::transport
-#endif
+#endif // #ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h
index a7af44b..29cfdfe 100644
--- a/lib/cpp/src/transport/TServerSocket.h
+++ b/lib/cpp/src/transport/TServerSocket.h
@@ -1,5 +1,5 @@
-#ifndef T_SERVER_SOCKET_H
-#define T_SERVER_SOCKET_H
+#ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
+#define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1
#include "TServerTransport.h"
#include <boost/shared_ptr.hpp>
@@ -34,4 +34,4 @@
}}} // facebook::thrift::transport
-#endif
+#endif // #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
diff --git a/lib/cpp/src/transport/TServerTransport.h b/lib/cpp/src/transport/TServerTransport.h
index 390fa70..0abab47 100644
--- a/lib/cpp/src/transport/TServerTransport.h
+++ b/lib/cpp/src/transport/TServerTransport.h
@@ -1,5 +1,5 @@
-#ifndef T_SERVER_TRANSPORT_H
-#define T_SERVER_TRANSPORT_H
+#ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_ 1
#include "TTransport.h"
#include "TTransportException.h"
@@ -65,4 +65,4 @@
}}} // facebook::thrift::transport
-#endif
+#endif // #ifndef _THRIFT_TRANSPORT_TSERVERTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index bf90b63..f1a065f 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -1,5 +1,5 @@
-#ifndef T_SOCKET_H
-#define T_SOCKET_H
+#ifndef _THRIFT_TRANSPORT_TSOCKET_H_
+#define _THRIFT_TRANSPORT_TSOCKET_H_ 1
#include <string>
@@ -100,4 +100,6 @@
};
}}} // facebook::thrift::transport
-#endif
+
+#endif // #ifndef _THRIFT_TRANSPORT_TSOCKET_H_
+
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index e5e40e4..b2e2a5b 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -1,5 +1,5 @@
-#ifndef T_TRANSPORT_H
-#define T_TRANSPORT_H
+#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
#include "TTransportException.h"
#include <string>
@@ -98,4 +98,4 @@
}}} // facebook::thrift::transport
-#endif
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
index 4b2be87..c54084d 100644
--- a/lib/cpp/src/transport/TTransportException.h
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -1,5 +1,5 @@
-#ifndef T_TRANSPORT_EXCEPTION_H
-#define T_TRANSPORT_EXCEPTION_H
+#ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_
+#define _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_ 1
#include <string>
@@ -64,4 +64,4 @@
}}} // facebook::thrift::transport
-#endif
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTEXCEPTION_H_