THRIFT-4730: remove pthread code and refactor, ending up with just ThreadFactory
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index 9e36665..e12c08c 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -105,32 +105,19 @@
list(APPEND SYSLIBS "${OPENSSL_LIBRARIES}")
endif()
-# WITH_*THREADS selects which threading library to use
-if(UNIX AND NOT WITH_STDTHREADS)
+if(UNIX)
if(ANDROID)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
else()
list(APPEND SYSLIBS pthread)
endif()
- set( thriftcpp_threads_SOURCES
- src/thrift/concurrency/PosixThreadFactory.cpp
- src/thrift/concurrency/Mutex.cpp
- src/thrift/concurrency/Monitor.cpp
- )
-else()
- if(UNIX)
- if(ANDROID)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
- else()
- list(APPEND SYSLIBS pthread)
- endif()
- endif()
- set( thriftcpp_threads_SOURCES
- src/thrift/concurrency/StdThreadFactory.cpp
- src/thrift/concurrency/StdMutex.cpp
- src/thrift/concurrency/StdMonitor.cpp
- )
endif()
+set( thriftcpp_threads_SOURCES
+ src/thrift/concurrency/ThreadFactory.cpp
+ src/thrift/concurrency/Thread.cpp
+ src/thrift/concurrency/Monitor.cpp
+ src/thrift/concurrency/Mutex.cpp
+)
# Thrift non blocking server
set( thriftcppnb_SOURCES
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 85bb9ab..19bedd7 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -107,8 +107,9 @@
src/thrift/server/TThreadedServer.cpp
libthrift_la_SOURCES += src/thrift/concurrency/Mutex.cpp \
- src/thrift/concurrency/Monitor.cpp \
- src/thrift/concurrency/PosixThreadFactory.cpp
+ src/thrift/concurrency/ThreadFactory.cpp \
+ src/thrift/concurrency/Thread.cpp \
+ src/thrift/concurrency/Monitor.cpp
libthriftnb_la_SOURCES = src/thrift/server/TNonblockingServer.cpp \
src/thrift/async/TEvhttpServer.cpp \
@@ -166,12 +167,7 @@
src/thrift/concurrency/Exception.h \
src/thrift/concurrency/Mutex.h \
src/thrift/concurrency/Monitor.h \
- src/thrift/concurrency/PlatformThreadFactory.h \
- src/thrift/concurrency/PosixThreadFactory.h \
- src/thrift/concurrency/StdMonitor.cpp \
- src/thrift/concurrency/StdMutex.cpp \
- src/thrift/concurrency/StdThreadFactory.cpp \
- src/thrift/concurrency/StdThreadFactory.h \
+ src/thrift/concurrency/ThreadFactory.h \
src/thrift/concurrency/Thread.h \
src/thrift/concurrency/ThreadManager.h \
src/thrift/concurrency/TimerManager.h \
diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp
index 9570cc6..7b3b209 100644
--- a/lib/cpp/src/thrift/concurrency/Monitor.cpp
+++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp
@@ -23,43 +23,36 @@
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Util.h>
#include <thrift/transport/PlatformSocket.h>
-#include <memory>
-
#include <assert.h>
-#include <iostream>
-
-#include <pthread.h>
+#include <condition_variable>
+#include <chrono>
+#include <thread>
+#include <mutex>
namespace apache {
namespace thrift {
-
-using std::unique_ptr;
-using std::shared_ptr;
-
namespace concurrency {
/**
- * Monitor implementation using the POSIX pthread library
+ * Monitor implementation using the std thread library
*
* @version $Id:$
*/
class Monitor::Impl {
public:
- Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) {
- init(ownedMutex_.get());
+ Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); }
+
+ Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); }
+
+ Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) {
+ init(&(monitor->mutex()));
}
- Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); }
-
- Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); }
-
- ~Impl() { cleanup(); }
-
Mutex& mutex() { return *mutex_; }
- void lock() { mutex().lock(); }
- void unlock() { mutex().unlock(); }
+ void lock() { mutex_->lock(); }
+ void unlock() { mutex_->unlock(); }
/**
* Exception-throwing version of waitForTimeRelative(), called simply
@@ -68,15 +61,12 @@
* If the condition occurs, this function returns cleanly; on timeout or
* error an exception is thrown.
*/
- void wait(int64_t timeout_ms) const {
+ void wait(int64_t timeout_ms) {
int result = waitForTimeRelative(timeout_ms);
if (result == THRIFT_ETIMEDOUT) {
- // pthread_cond_timedwait has been observed to return early on
- // various platforms, so comment out this assert.
- // assert(Util::currentTime() >= (now + timeout));
throw TimedOutException();
} else if (result != 0) {
- throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed");
+ throw TException("Monitor::wait() failed");
}
}
@@ -86,88 +76,86 @@
*
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTimeRelative(int64_t timeout_ms) const {
+ int waitForTimeRelative(int64_t timeout_ms) {
if (timeout_ms == 0LL) {
return waitForever();
}
- struct THRIFT_TIMESPEC abstime;
- Util::toTimespec(abstime, Util::currentTime() + timeout_ms);
- return waitForTime(&abstime);
+ assert(mutex_);
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+ assert(mutexImpl);
+
+ std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+ bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms))
+ == std::cv_status::timeout);
+ lock.release();
+ return (timedout ? THRIFT_ETIMEDOUT : 0);
}
/**
* Waits until the absolute time specified using struct THRIFT_TIMESPEC.
* Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
*/
- int waitForTime(const THRIFT_TIMESPEC* abstime) const {
- assert(mutex_);
- pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
- assert(mutexImpl);
-
- // XXX Need to assert that caller owns mutex
- return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime);
- }
-
- int waitForTime(const struct timeval* abstime) const {
- struct THRIFT_TIMESPEC temp;
- temp.tv_sec = abstime->tv_sec;
- temp.tv_nsec = abstime->tv_usec * 1000;
+ int waitForTime(const THRIFT_TIMESPEC* abstime) {
+ struct timeval temp;
+ temp.tv_sec = static_cast<long>(abstime->tv_sec);
+ temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
return waitForTime(&temp);
}
+
+ /**
+ * Waits until the absolute time specified using struct timeval.
+ * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
+ */
+  int waitForTime(const struct timeval* abstime) {
+    assert(mutex_);
+    std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
+    assert(mutexImpl);
+
+    struct timeval currenttime;
+    Util::toTimeval(currenttime, Util::currentTime());
+
+    // Form the remaining time as a single signed duration so that a negative
+    // microsecond difference borrows from the seconds instead of being dropped
+    // (10.1s - 9.9s is 0.2s, not 1s); a deadline in the past waits zero time.
+    std::chrono::microseconds remaining
+        = std::chrono::seconds(static_cast<long>(abstime->tv_sec - currenttime.tv_sec))
+          + std::chrono::microseconds(static_cast<long>(abstime->tv_usec - currenttime.tv_usec));
+    if (remaining < std::chrono::microseconds::zero())
+      remaining = std::chrono::microseconds::zero();
+
+    // adopt_lock: the mutex is taken as already locked; release() keeps it locked on return.
+    std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+    bool timedout = (conditionVariable_.wait_for(lock, remaining) == std::cv_status::timeout);
+    lock.release();
+    return (timedout ? THRIFT_ETIMEDOUT : 0);
+  }
+
/**
* Waits forever until the condition occurs.
* Returns 0 if condition occurs, or an error code otherwise.
*/
- int waitForever() const {
+ int waitForever() {
assert(mutex_);
- pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+ std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
assert(mutexImpl);
- return pthread_cond_wait(&pthread_cond_, mutexImpl);
+
+ std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
+ conditionVariable_.wait(lock);
+ lock.release();
+ return 0;
}
- void notify() {
- // XXX Need to assert that caller owns mutex
- int iret = pthread_cond_signal(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
- }
+ void notify() { conditionVariable_.notify_one(); }
- void notifyAll() {
- // XXX Need to assert that caller owns mutex
- int iret = pthread_cond_broadcast(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
- }
+ void notifyAll() { conditionVariable_.notify_all(); }
private:
- void init(Mutex* mutex) {
- mutex_ = mutex;
+ void init(Mutex* mutex) { mutex_ = mutex; }
- if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
- condInitialized_ = true;
- }
-
- if (!condInitialized_) {
- cleanup();
- throw SystemResourceException();
- }
- }
-
- void cleanup() {
- if (condInitialized_) {
- condInitialized_ = false;
- int iret = pthread_cond_destroy(&pthread_cond_);
- THRIFT_UNUSED_VARIABLE(iret);
- assert(iret == 0);
- }
- }
-
- unique_ptr<Mutex> ownedMutex_;
+ const std::unique_ptr<Mutex> ownedMutex_;
+ std::condition_variable_any conditionVariable_;
Mutex* mutex_;
-
- mutable pthread_cond_t pthread_cond_;
- mutable bool condInitialized_;
};
Monitor::Monitor() : impl_(new Monitor::Impl()) {
@@ -182,43 +170,43 @@
}
Mutex& Monitor::mutex() const {
- return impl_->mutex();
+ return const_cast<Monitor::Impl*>(impl_)->mutex();
}
void Monitor::lock() const {
- impl_->lock();
+ const_cast<Monitor::Impl*>(impl_)->lock();
}
void Monitor::unlock() const {
- impl_->unlock();
+ const_cast<Monitor::Impl*>(impl_)->unlock();
}
void Monitor::wait(int64_t timeout) const {
- impl_->wait(timeout);
+ const_cast<Monitor::Impl*>(impl_)->wait(timeout);
}
int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
- return impl_->waitForTime(abstime);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
int Monitor::waitForTime(const timeval* abstime) const {
- return impl_->waitForTime(abstime);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
- return impl_->waitForTimeRelative(timeout_ms);
+ return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
}
int Monitor::waitForever() const {
- return impl_->waitForever();
+ return const_cast<Monitor::Impl*>(impl_)->waitForever();
}
void Monitor::notify() const {
- impl_->notify();
+ const_cast<Monitor::Impl*>(impl_)->notify();
}
void Monitor::notifyAll() const {
- impl_->notifyAll();
+ const_cast<Monitor::Impl*>(impl_)->notifyAll();
}
}
}
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.cpp b/lib/cpp/src/thrift/concurrency/Mutex.cpp
index a526461..7580283 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/Mutex.cpp
@@ -17,202 +17,29 @@
* under the License.
*/
-// needed to test for pthread implementation capabilities:
-#define __USE_GNU
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/Thrift.h>
-#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Mutex.h>
-#include <thrift/concurrency/Util.h>
-#include <assert.h>
-#include <stdlib.h>
-#include <pthread.h>
-#include <signal.h>
-#include <string.h>
-
-#include <boost/format.hpp>
+#include <chrono>
+#include <mutex>
namespace apache {
namespace thrift {
namespace concurrency {
-// Enable this to turn on mutex contention profiling support
-// #define THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
-
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
-
-static int32_t mutexProfilingCounter = 0;
-static int32_t mutexProfilingSampleRate = 0;
-static MutexWaitCallback mutexProfilingCallback = 0;
-
-void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback) {
- mutexProfilingSampleRate = profilingSampleRate;
- mutexProfilingCallback = callback;
-}
-
-#define PROFILE_MUTEX_START_LOCK() int64_t _lock_startTime = maybeGetProfilingStartTime();
-
-#define PROFILE_MUTEX_NOT_LOCKED() \
- do { \
- if (_lock_startTime > 0) { \
- int64_t endTime = Util::currentTimeUsec(); \
- (*mutexProfilingCallback)(this, endTime - _lock_startTime); \
- } \
- } while (0)
-
-#define PROFILE_MUTEX_LOCKED() \
- do { \
- profileTime_ = _lock_startTime; \
- if (profileTime_ > 0) { \
- profileTime_ = Util::currentTimeUsec() - profileTime_; \
- } \
- } while (0)
-
-#define PROFILE_MUTEX_START_UNLOCK() \
- int64_t _temp_profileTime = profileTime_; \
- profileTime_ = 0;
-
-#define PROFILE_MUTEX_UNLOCKED() \
- do { \
- if (_temp_profileTime > 0) { \
- (*mutexProfilingCallback)(this, _temp_profileTime); \
- } \
- } while (0)
-
-static inline int64_t maybeGetProfilingStartTime() {
- if (mutexProfilingSampleRate && mutexProfilingCallback) {
- // This block is unsynchronized, but should produce a reasonable sampling
- // rate on most architectures. The main race conditions are the gap
- // between the decrement and the test, the non-atomicity of decrement, and
- // potential caching of different values at different CPUs.
- //
- // - if two decrements race, the likeliest result is that the counter
- // decrements slowly (perhaps much more slowly) than intended.
- //
- // - many threads could potentially decrement before resetting the counter
- // to its large value, causing each additional incoming thread to
- // profile every call. This situation is unlikely to persist for long
- // as the critical gap is quite short, but profiling could be bursty.
- sig_atomic_t localValue = --mutexProfilingCounter;
- if (localValue <= 0) {
- mutexProfilingCounter = mutexProfilingSampleRate;
- return Util::currentTimeUsec();
- }
- }
-
- return 0;
-}
-
-#else
-#define PROFILE_MUTEX_START_LOCK()
-#define PROFILE_MUTEX_NOT_LOCKED()
-#define PROFILE_MUTEX_LOCKED()
-#define PROFILE_MUTEX_START_UNLOCK()
-#define PROFILE_MUTEX_UNLOCKED()
-#endif // THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
-
-#define EINTR_LOOP(_CALL) int ret; do { ret = _CALL; } while (ret == EINTR)
-#define ABORT_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { abort(); } }
-#define THROW_SRE(_CALLSTR, RET) { throw SystemResourceException(boost::str(boost::format("%1% returned %2% (%3%)") % _CALLSTR % RET % ::strerror(RET))); }
-#define THROW_SRE_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { THROW_SRE(#_CALL, ret); } }
-#define THROW_SRE_TRYFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret == 0) { return true; } else if (ret == EBUSY) { return false; } THROW_SRE(#_CALL, ret); }
-
/**
- * Implementation of Mutex class using POSIX mutex
+ * Implementation of Mutex class using C++11 std::timed_mutex
*
- * Throws apache::thrift::concurrency::SystemResourceException on error.
+ * Methods throw std::system_error on error.
*
* @version $Id:$
*/
-class Mutex::impl {
-public:
- impl(Initializer init) : initialized_(false) {
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
- profileTime_ = 0;
-#endif
- init(&pthread_mutex_);
- initialized_ = true;
- }
+class Mutex::impl : public std::timed_mutex {};
- ~impl() {
- if (initialized_) {
- initialized_ = false;
- ABORT_ONFAIL(pthread_mutex_destroy(&pthread_mutex_));
- }
- }
-
- void lock() const {
- PROFILE_MUTEX_START_LOCK();
- THROW_SRE_ONFAIL(pthread_mutex_lock(&pthread_mutex_));
- PROFILE_MUTEX_LOCKED();
- }
-
- bool trylock() const {
- THROW_SRE_TRYFAIL(pthread_mutex_trylock(&pthread_mutex_));
- }
-
- bool timedlock(int64_t milliseconds) const {
-#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L
- PROFILE_MUTEX_START_LOCK();
-
- struct THRIFT_TIMESPEC ts;
- Util::toTimespec(ts, milliseconds + Util::currentTime());
- EINTR_LOOP(pthread_mutex_timedlock(&pthread_mutex_, &ts));
- if (ret == 0) {
- PROFILE_MUTEX_LOCKED();
- return true;
- } else if (ret == ETIMEDOUT) {
- PROFILE_MUTEX_NOT_LOCKED();
- return false;
- }
-
- THROW_SRE("pthread_mutex_timedlock(&pthread_mutex_, &ts)", ret);
-#else
- /* Otherwise follow solution used by Mono for Android */
- struct THRIFT_TIMESPEC sleepytime, now, to;
-
- /* This is just to avoid a completely busy wait */
- sleepytime.tv_sec = 0;
- sleepytime.tv_nsec = 10000000L; /* 10ms */
-
- Util::toTimespec(to, milliseconds + Util::currentTime());
-
- while ((trylock()) == false) {
- Util::toTimespec(now, Util::currentTime());
- if (now.tv_sec >= to.tv_sec && now.tv_nsec >= to.tv_nsec) {
- return false;
- }
- nanosleep(&sleepytime, NULL);
- }
-
- return true;
-#endif
- }
-
- void unlock() const {
- PROFILE_MUTEX_START_UNLOCK();
- THROW_SRE_ONFAIL(pthread_mutex_unlock(&pthread_mutex_));
- PROFILE_MUTEX_UNLOCKED();
- }
-
- void* getUnderlyingImpl() const { return (void*)&pthread_mutex_; }
-
-private:
- mutable pthread_mutex_t pthread_mutex_;
- mutable bool initialized_;
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
- mutable int64_t profileTime_;
-#endif
-};
-
-Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {
+Mutex::Mutex() : impl_(new Mutex::impl()) {
}
void* Mutex::getUnderlyingImpl() const {
- return impl_->getUnderlyingImpl();
+ return impl_.get();
}
void Mutex::lock() const {
@@ -220,161 +47,17 @@
}
bool Mutex::trylock() const {
- return impl_->trylock();
+ return impl_->try_lock();
}
bool Mutex::timedlock(int64_t ms) const {
- return impl_->timedlock(ms);
+ return impl_->try_lock_for(std::chrono::milliseconds(ms));
}
void Mutex::unlock() const {
impl_->unlock();
}
-void Mutex::DEFAULT_INITIALIZER(void* arg) {
- pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg;
- THROW_SRE_ONFAIL(pthread_mutex_init(pthread_mutex, NULL));
-}
-
-#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) || defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP) || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
-static void init_with_kind(pthread_mutex_t* mutex, int kind) {
- pthread_mutexattr_t mutexattr;
- THROW_SRE_ONFAIL(pthread_mutexattr_init(&mutexattr));
- THROW_SRE_ONFAIL(pthread_mutexattr_settype(&mutexattr, kind));
- THROW_SRE_ONFAIL(pthread_mutex_init(mutex, &mutexattr));
- THROW_SRE_ONFAIL(pthread_mutexattr_destroy(&mutexattr));
-}
-#endif
-
-#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
-void Mutex::ADAPTIVE_INITIALIZER(void* arg) {
- // From mysql source: mysys/my_thr_init.c
- // Set mutex type to "fast" a.k.a "adaptive"
- //
- // In this case the thread may steal the mutex from some other thread
- // that is waiting for the same mutex. This will save us some
- // context switches but may cause a thread to 'starve forever' while
- // waiting for the mutex (not likely if the code within the mutex is
- // short).
- init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ADAPTIVE_NP);
-}
-#endif
-
-#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
-void Mutex::ERRORCHECK_INITIALIZER(void* arg) {
- init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ERRORCHECK);
-}
-#endif
-
-#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
-void Mutex::RECURSIVE_INITIALIZER(void* arg) {
- init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP);
-}
-#endif
-
-/**
- * Implementation of ReadWriteMutex class using POSIX rw lock
- *
- * @version $Id:$
- */
-class ReadWriteMutex::impl {
-public:
- impl() : initialized_(false) {
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
- profileTime_ = 0;
-#endif
- THROW_SRE_ONFAIL(pthread_rwlock_init(&rw_lock_, NULL));
- initialized_ = true;
- }
-
- ~impl() {
- if (initialized_) {
- initialized_ = false;
- ABORT_ONFAIL(pthread_rwlock_destroy(&rw_lock_));
- }
- }
-
- void acquireRead() const {
- PROFILE_MUTEX_START_LOCK();
- THROW_SRE_ONFAIL(pthread_rwlock_rdlock(&rw_lock_));
- PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
- }
-
- void acquireWrite() const {
- PROFILE_MUTEX_START_LOCK();
- THROW_SRE_ONFAIL(pthread_rwlock_wrlock(&rw_lock_));
- PROFILE_MUTEX_LOCKED();
- }
-
- bool attemptRead() const { THROW_SRE_TRYFAIL(pthread_rwlock_tryrdlock(&rw_lock_)); }
-
- bool attemptWrite() const { THROW_SRE_TRYFAIL(pthread_rwlock_trywrlock(&rw_lock_)); }
-
- void release() const {
- PROFILE_MUTEX_START_UNLOCK();
- THROW_SRE_ONFAIL(pthread_rwlock_unlock(&rw_lock_));
- PROFILE_MUTEX_UNLOCKED();
- }
-
-private:
- mutable pthread_rwlock_t rw_lock_;
- mutable bool initialized_;
-#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
- mutable int64_t profileTime_;
-#endif
-};
-
-ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {
-}
-
-void ReadWriteMutex::acquireRead() const {
- impl_->acquireRead();
-}
-
-void ReadWriteMutex::acquireWrite() const {
- impl_->acquireWrite();
-}
-
-bool ReadWriteMutex::attemptRead() const {
- return impl_->attemptRead();
-}
-
-bool ReadWriteMutex::attemptWrite() const {
- return impl_->attemptWrite();
-}
-
-void ReadWriteMutex::release() const {
- impl_->release();
-}
-
-NoStarveReadWriteMutex::NoStarveReadWriteMutex() : writerWaiting_(false) {
-}
-
-void NoStarveReadWriteMutex::acquireRead() const {
- if (writerWaiting_) {
- // writer is waiting, block on the writer's mutex until he's done with it
- mutex_.lock();
- mutex_.unlock();
- }
-
- ReadWriteMutex::acquireRead();
-}
-
-void NoStarveReadWriteMutex::acquireWrite() const {
- // if we can acquire the rwlock the easy way, we're done
- if (attemptWrite()) {
- return;
- }
-
- // failed to get the rwlock, do it the hard way:
- // locking the mutex and setting writerWaiting will cause all new readers to
- // block on the mutex rather than on the rwlock.
- mutex_.lock();
- writerWaiting_ = true;
- ReadWriteMutex::acquireWrite();
- writerWaiting_ = false;
- mutex_.unlock();
-}
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.h b/lib/cpp/src/thrift/concurrency/Mutex.h
index a1f5396..123ae78 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.h
+++ b/lib/cpp/src/thrift/concurrency/Mutex.h
@@ -28,31 +28,6 @@
namespace thrift {
namespace concurrency {
-#ifndef THRIFT_NO_CONTENTION_PROFILING
-
-/**
- * Determines if the Thrift Mutex and ReadWriteMutex classes will attempt to
- * profile their blocking acquire methods. If this value is set to non-zero,
- * Thrift will attempt to invoke the callback once every profilingSampleRate
- * times. However, as the sampling is not synchronized the rate is not
- * guranateed, and could be subject to big bursts and swings. Please ensure
- * your sampling callback is as performant as your application requires.
- *
- * The callback will get called with the wait time taken to lock the mutex in
- * usec and a (void*) that uniquely identifies the Mutex (or ReadWriteMutex)
- * being locked.
- *
- * The enableMutexProfiling() function is unsynchronized; calling this function
- * while profiling is already enabled may result in race conditions. On
- * architectures where a pointer assignment is atomic, this is safe but there
- * is no guarantee threads will agree on a single callback within any
- * particular time period.
- */
-typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros);
-void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback);
-
-#endif
-
/**
* NOTE: All mutex implementations throw an exception on failure. See each
* specific implementation to understand the exception type(s) used.
@@ -65,9 +40,7 @@
*/
class Mutex {
public:
- typedef void (*Initializer)(void*);
-
- Mutex(Initializer init = DEFAULT_INITIALIZER);
+ Mutex();
virtual ~Mutex() {}
virtual void lock() const;
@@ -77,57 +50,11 @@
void* getUnderlyingImpl() const;
- // If you attempt to use one of these and it fails to link, it means
- // your version of pthreads does not support it - try another one.
- static void ADAPTIVE_INITIALIZER(void*);
- static void DEFAULT_INITIALIZER(void*);
- static void ERRORCHECK_INITIALIZER(void*);
- static void RECURSIVE_INITIALIZER(void*);
-
private:
class impl;
std::shared_ptr<impl> impl_;
};
-class ReadWriteMutex {
-public:
- ReadWriteMutex();
- virtual ~ReadWriteMutex() {}
-
- // these get the lock and block until it is done successfully
- virtual void acquireRead() const;
- virtual void acquireWrite() const;
-
- // these attempt to get the lock, returning false immediately if they fail
- virtual bool attemptRead() const;
- virtual bool attemptWrite() const;
-
- // this releases both read and write locks
- virtual void release() const;
-
-private:
- class impl;
- std::shared_ptr<impl> impl_;
-};
-
-/**
- * A ReadWriteMutex that guarantees writers will not be starved by readers:
- * When a writer attempts to acquire the mutex, all new readers will be
- * blocked from acquiring the mutex until the writer has acquired and
- * released it. In some operating systems, this may already be guaranteed
- * by a regular ReadWriteMutex.
- */
-class NoStarveReadWriteMutex : public ReadWriteMutex {
-public:
- NoStarveReadWriteMutex();
-
- virtual void acquireRead() const;
- virtual void acquireWrite() const;
-
-private:
- Mutex mutex_;
- mutable volatile bool writerWaiting_;
-};
class Guard : boost::noncopyable {
public:
@@ -156,32 +83,6 @@
const Mutex* mutex_;
};
-// Can be used as second argument to RWGuard to make code more readable
-// as to whether we're doing acquireRead() or acquireWrite().
-enum RWGuardType { RW_READ = 0, RW_WRITE = 1 };
-
-class RWGuard : boost::noncopyable {
-public:
- RWGuard(const ReadWriteMutex& value, bool write = false) : rw_mutex_(value) {
- if (write) {
- rw_mutex_.acquireWrite();
- } else {
- rw_mutex_.acquireRead();
- }
- }
-
- RWGuard(const ReadWriteMutex& value, RWGuardType type) : rw_mutex_(value) {
- if (type == RW_WRITE) {
- rw_mutex_.acquireWrite();
- } else {
- rw_mutex_.acquireRead();
- }
- }
- ~RWGuard() { rw_mutex_.release(); }
-
-private:
- const ReadWriteMutex& rw_mutex_;
-};
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
deleted file mode 100644
index 5c59269..0000000
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/PosixThreadFactory.h>
-
-#if GOOGLE_PERFTOOLS_REGISTER_THREAD
-#include <google/profiler.h>
-#endif
-
-#include <assert.h>
-#include <pthread.h>
-
-#include <iostream>
-
-#include <memory>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * The POSIX thread class.
- *
- * @version $Id:$
- */
-class PthreadThread : public Thread {
-public:
- enum STATE { uninitialized, starting, started, stopping, stopped };
-
- static const int MB = 1024 * 1024;
-
- static void* threadMain(void* arg);
-
-private:
- pthread_t pthread_;
- Monitor monitor_; // guard to protect state_ and also notification
- STATE state_; // to protect proper thread start behavior
- int policy_;
- int priority_;
- int stackSize_;
- std::weak_ptr<PthreadThread> self_;
- bool detached_;
-
-public:
- PthreadThread(int policy,
- int priority,
- int stackSize,
- bool detached,
- std::shared_ptr<Runnable> runnable)
- :
-
-#ifndef _WIN32
- pthread_(0),
-#endif // _WIN32
- state_(uninitialized),
- policy_(policy),
- priority_(priority),
- stackSize_(stackSize),
- detached_(detached) {
-
- this->Thread::runnable(runnable);
- }
-
- ~PthreadThread() {
- /* Nothing references this thread, if is is not detached, do a join
- now, otherwise the thread-id and, possibly, other resources will
- be leaked. */
- if (!detached_) {
- try {
- join();
- } catch (...) {
- // We're really hosed.
- }
- }
- }
-
- STATE getState() const
- {
- Synchronized sync(monitor_);
- return state_;
- }
-
- void setState(STATE newState)
- {
- Synchronized sync(monitor_);
- state_ = newState;
-
- // unblock start() with the knowledge that the thread has actually
- // started running, which avoids a race in detached threads.
- if (newState == started) {
- monitor_.notify();
- }
- }
-
- void start() {
- if (getState() != uninitialized) {
- return;
- }
-
- pthread_attr_t thread_attr;
- if (pthread_attr_init(&thread_attr) != 0) {
- throw SystemResourceException("pthread_attr_init failed");
- }
-
- if (pthread_attr_setdetachstate(&thread_attr,
- detached_ ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE)
- != 0) {
- throw SystemResourceException("pthread_attr_setdetachstate failed");
- }
-
- // Set thread stack size
- if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
- throw SystemResourceException("pthread_attr_setstacksize failed");
- }
-
-// Set thread policy
-#ifdef _WIN32
- // WIN32 Pthread implementation doesn't seem to support sheduling policies other then
- // PosixThreadFactory::OTHER - runtime error
- policy_ = PosixThreadFactory::OTHER;
-#endif
-
-#if _POSIX_THREAD_PRIORITY_SCHEDULING > 0
- if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
- throw SystemResourceException("pthread_attr_setschedpolicy failed");
- }
-#endif
-
- struct sched_param sched_param;
- sched_param.sched_priority = priority_;
-
- // Set thread priority
- if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
- throw SystemResourceException("pthread_attr_setschedparam failed");
- }
-
- // Create reference
- std::shared_ptr<PthreadThread>* selfRef = new std::shared_ptr<PthreadThread>();
- *selfRef = self_.lock();
-
- setState(starting);
-
- Synchronized sync(monitor_);
-
- if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
- throw SystemResourceException("pthread_create failed");
- }
-
- // The caller may not choose to guarantee the scope of the Runnable
- // being used in the thread, so we must actually wait until the thread
- // starts before we return. If we do not wait, it would be possible
- // for the caller to start destructing the Runnable and the Thread,
- // and we would end up in a race. This was identified with valgrind.
- monitor_.wait();
- }
-
- void join() {
- if (!detached_ && getState() != uninitialized) {
- void* ignore;
- /* XXX
- If join fails it is most likely due to the fact
- that the last reference was the thread itself and cannot
- join. This results in leaked threads and will eventually
- cause the process to run out of thread resources.
- We're beyond the point of throwing an exception. Not clear how
- best to handle this. */
- int res = pthread_join(pthread_, &ignore);
- detached_ = (res == 0);
- if (res != 0) {
- GlobalOutput.printf("PthreadThread::join(): fail with code %d", res);
- }
- }
- }
-
- Thread::id_t getId() {
-
-#ifndef _WIN32
- return (Thread::id_t)pthread_;
-#else
- return (Thread::id_t)pthread_.p;
-#endif // _WIN32
- }
-
- std::shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
-
- void runnable(std::shared_ptr<Runnable> value) { Thread::runnable(value); }
-
- void weakRef(std::shared_ptr<PthreadThread> self) {
- assert(self.get() == this);
- self_ = std::weak_ptr<PthreadThread>(self);
- }
-};
-
-void* PthreadThread::threadMain(void* arg) {
- std::shared_ptr<PthreadThread> thread = *(std::shared_ptr<PthreadThread>*)arg;
- delete reinterpret_cast<std::shared_ptr<PthreadThread>*>(arg);
-
-#if GOOGLE_PERFTOOLS_REGISTER_THREAD
- ProfilerRegisterThread();
-#endif
-
- thread->setState(started);
-
- thread->runnable()->run();
-
- STATE _s = thread->getState();
- if (_s != stopping && _s != stopped) {
- thread->setState(stopping);
- }
-
- return (void*)0;
-}
-
-/**
- * Converts generic posix thread schedule policy enums into pthread
- * API values.
- */
-static int toPthreadPolicy(PosixThreadFactory::POLICY policy) {
- switch (policy) {
- case PosixThreadFactory::OTHER:
- return SCHED_OTHER;
- case PosixThreadFactory::FIFO:
- return SCHED_FIFO;
- case PosixThreadFactory::ROUND_ROBIN:
- return SCHED_RR;
- }
- return SCHED_OTHER;
-}
-
-/**
- * Converts relative thread priorities to absolute value based on posix
- * thread scheduler policy
- *
- * The idea is simply to divide up the priority range for the given policy
- * into the correpsonding relative priority level (lowest..highest) and
- * then pro-rate accordingly.
- */
-static int toPthreadPriority(PosixThreadFactory::POLICY policy, PosixThreadFactory::PRIORITY priority) {
- int pthread_policy = toPthreadPolicy(policy);
- int min_priority = 0;
- int max_priority = 0;
-#ifdef HAVE_SCHED_GET_PRIORITY_MIN
- min_priority = sched_get_priority_min(pthread_policy);
-#endif
-#ifdef HAVE_SCHED_GET_PRIORITY_MAX
- max_priority = sched_get_priority_max(pthread_policy);
-#endif
- int quanta = (PosixThreadFactory::HIGHEST - PosixThreadFactory::LOWEST) + 1;
- float stepsperquanta = (float)(max_priority - min_priority) / quanta;
-
- if (priority <= PosixThreadFactory::HIGHEST) {
- return (int)(min_priority + stepsperquanta * priority);
- } else {
- // should never get here for priority increments.
- assert(false);
- return (int)(min_priority + stepsperquanta * PosixThreadFactory::NORMAL);
- }
-}
-
-PosixThreadFactory::PosixThreadFactory(POLICY policy,
- PRIORITY priority,
- int stackSize,
- bool detached)
- : ThreadFactory(detached),
- policy_(policy),
- priority_(priority),
- stackSize_(stackSize) {
-}
-
-PosixThreadFactory::PosixThreadFactory(bool detached)
- : ThreadFactory(detached),
- policy_(ROUND_ROBIN),
- priority_(NORMAL),
- stackSize_(1) {
-}
-
-std::shared_ptr<Thread> PosixThreadFactory::newThread(std::shared_ptr<Runnable> runnable) const {
- std::shared_ptr<PthreadThread> result
- = std::shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_),
- toPthreadPriority(policy_, priority_),
- stackSize_,
- isDetached(),
- runnable));
- result->weakRef(result);
- runnable->thread(result);
- return result;
-}
-
-int PosixThreadFactory::getStackSize() const {
- return stackSize_;
-}
-
-void PosixThreadFactory::setStackSize(int value) {
- stackSize_ = value;
-}
-
-PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const {
- return priority_;
-}
-
-void PosixThreadFactory::setPriority(PRIORITY value) {
- priority_ = value;
-}
-
-Thread::id_t PosixThreadFactory::getCurrentThreadId() const {
-#ifndef _WIN32
- return (Thread::id_t)pthread_self();
-#else
- return (Thread::id_t)pthread_self().p;
-#endif // _WIN32
-}
-
-}
-}
-} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h b/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
deleted file mode 100644
index cb3b17c..0000000
--- a/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
-#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1
-
-#include <thrift/concurrency/Thread.h>
-
-#include <memory>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * A thread factory to create posix threads
- *
- * @version $Id:$
- */
-class PosixThreadFactory : public ThreadFactory {
-
-public:
- /**
- * POSIX Thread scheduler policies
- */
- enum POLICY { OTHER, FIFO, ROUND_ROBIN };
-
- /**
- * POSIX Thread scheduler relative priorities,
- *
- * Absolute priority is determined by scheduler policy and OS. This
- * enumeration specifies relative priorities such that one can specify a
- * priority within a giving scheduler policy without knowing the absolute
- * value of the priority.
- */
- enum PRIORITY {
- LOWEST = 0,
- LOWER = 1,
- LOW = 2,
- NORMAL = 3,
- HIGH = 4,
- HIGHER = 5,
- HIGHEST = 6,
- INCREMENT = 7,
- DECREMENT = 8
- };
-
- /**
- * Posix thread (pthread) factory. All threads created by a factory are reference-counted
- * via std::shared_ptr. The factory guarantees that threads and the Runnable tasks
- * they host will be properly cleaned up once the last strong reference to both is
- * given up.
- *
- * Threads are created with the specified policy, priority, stack-size and detachable-mode
- * detached means the thread is free-running and will release all system resources the
- * when it completes. A detachable thread is not joinable. The join method
- * of a detachable thread will return immediately with no error.
- *
- * By default threads are not joinable.
- */
- PosixThreadFactory(POLICY policy = ROUND_ROBIN,
- PRIORITY priority = NORMAL,
- int stackSize = 1,
- bool detached = true);
-
- /**
- * Provide a constructor compatible with the other factories
- * The default policy is POLICY::ROUND_ROBIN.
- * The default priority is PRIORITY::NORMAL.
- * The default stackSize is 1.
- */
- PosixThreadFactory(bool detached);
-
- // From ThreadFactory;
- std::shared_ptr<Thread> newThread(std::shared_ptr<Runnable> runnable) const;
-
- // From ThreadFactory;
- Thread::id_t getCurrentThreadId() const;
-
- /**
- * Gets stack size for newly created threads
- *
- * @return int size in megabytes
- */
- virtual int getStackSize() const;
-
- /**
- * Sets stack size for newly created threads
- *
- * @param value size in megabytes
- */
- virtual void setStackSize(int value);
-
- /**
- * Gets priority relative to current policy
- */
- virtual PRIORITY getPriority() const;
-
- /**
- * Sets priority relative to current policy
- */
- virtual void setPriority(PRIORITY priority);
-
-private:
- POLICY policy_;
- PRIORITY priority_;
- int stackSize_;
-};
-}
-}
-} // apache::thrift::concurrency
-
-#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
diff --git a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp b/lib/cpp/src/thrift/concurrency/StdMonitor.cpp
deleted file mode 100644
index 7b3b209..0000000
--- a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Util.h>
-#include <thrift/transport/PlatformSocket.h>
-#include <assert.h>
-
-#include <condition_variable>
-#include <chrono>
-#include <thread>
-#include <mutex>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * Monitor implementation using the std thread library
- *
- * @version $Id:$
- */
-class Monitor::Impl {
-
-public:
- Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); }
-
- Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); }
-
- Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) {
- init(&(monitor->mutex()));
- }
-
- Mutex& mutex() { return *mutex_; }
- void lock() { mutex_->lock(); }
- void unlock() { mutex_->unlock(); }
-
- /**
- * Exception-throwing version of waitForTimeRelative(), called simply
- * wait(int64) for historical reasons. Timeout is in milliseconds.
- *
- * If the condition occurs, this function returns cleanly; on timeout or
- * error an exception is thrown.
- */
- void wait(int64_t timeout_ms) {
- int result = waitForTimeRelative(timeout_ms);
- if (result == THRIFT_ETIMEDOUT) {
- throw TimedOutException();
- } else if (result != 0) {
- throw TException("Monitor::wait() failed");
- }
- }
-
- /**
- * Waits until the specified timeout in milliseconds for the condition to
- * occur, or waits forever if timeout_ms == 0.
- *
- * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
- */
- int waitForTimeRelative(int64_t timeout_ms) {
- if (timeout_ms == 0LL) {
- return waitForever();
- }
-
- assert(mutex_);
- std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
- assert(mutexImpl);
-
- std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms))
- == std::cv_status::timeout);
- lock.release();
- return (timedout ? THRIFT_ETIMEDOUT : 0);
- }
-
- /**
- * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
- * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
- */
- int waitForTime(const THRIFT_TIMESPEC* abstime) {
- struct timeval temp;
- temp.tv_sec = static_cast<long>(abstime->tv_sec);
- temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
- return waitForTime(&temp);
- }
-
- /**
- * Waits until the absolute time specified using struct timeval.
- * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
- */
- int waitForTime(const struct timeval* abstime) {
- assert(mutex_);
- std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
- assert(mutexImpl);
-
- struct timeval currenttime;
- Util::toTimeval(currenttime, Util::currentTime());
-
- long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec);
- long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec);
- if (tv_sec < 0)
- tv_sec = 0;
- if (tv_usec < 0)
- tv_usec = 0;
-
- std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- bool timedout = (conditionVariable_.wait_for(lock,
- std::chrono::seconds(tv_sec)
- + std::chrono::microseconds(tv_usec))
- == std::cv_status::timeout);
- lock.release();
- return (timedout ? THRIFT_ETIMEDOUT : 0);
- }
-
- /**
- * Waits forever until the condition occurs.
- * Returns 0 if condition occurs, or an error code otherwise.
- */
- int waitForever() {
- assert(mutex_);
- std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl());
- assert(mutexImpl);
-
- std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock);
- conditionVariable_.wait(lock);
- lock.release();
- return 0;
- }
-
- void notify() { conditionVariable_.notify_one(); }
-
- void notifyAll() { conditionVariable_.notify_all(); }
-
-private:
- void init(Mutex* mutex) { mutex_ = mutex; }
-
- const std::unique_ptr<Mutex> ownedMutex_;
- std::condition_variable_any conditionVariable_;
- Mutex* mutex_;
-};
-
-Monitor::Monitor() : impl_(new Monitor::Impl()) {
-}
-Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {
-}
-Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {
-}
-
-Monitor::~Monitor() {
- delete impl_;
-}
-
-Mutex& Monitor::mutex() const {
- return const_cast<Monitor::Impl*>(impl_)->mutex();
-}
-
-void Monitor::lock() const {
- const_cast<Monitor::Impl*>(impl_)->lock();
-}
-
-void Monitor::unlock() const {
- const_cast<Monitor::Impl*>(impl_)->unlock();
-}
-
-void Monitor::wait(int64_t timeout) const {
- const_cast<Monitor::Impl*>(impl_)->wait(timeout);
-}
-
-int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
- return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
-}
-
-int Monitor::waitForTime(const timeval* abstime) const {
- return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
-}
-
-int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
- return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
-}
-
-int Monitor::waitForever() const {
- return const_cast<Monitor::Impl*>(impl_)->waitForever();
-}
-
-void Monitor::notify() const {
- const_cast<Monitor::Impl*>(impl_)->notify();
-}
-
-void Monitor::notifyAll() const {
- const_cast<Monitor::Impl*>(impl_)->notifyAll();
-}
-}
-}
-} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/lib/cpp/src/thrift/concurrency/StdMutex.cpp
deleted file mode 100644
index e0f79fa..0000000
--- a/lib/cpp/src/thrift/concurrency/StdMutex.cpp
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#include <thrift/concurrency/Mutex.h>
-#include <thrift/concurrency/Util.h>
-
-#include <cassert>
-#include <chrono>
-#include <mutex>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * Implementation of Mutex class using C++11 std::timed_mutex
- *
- * Methods throw std::system_error on error.
- *
- * @version $Id:$
- */
-class Mutex::impl : public std::timed_mutex {};
-
-Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {
- ((void)init);
-}
-
-void* Mutex::getUnderlyingImpl() const {
- return impl_.get();
-}
-
-void Mutex::lock() const {
- impl_->lock();
-}
-
-bool Mutex::trylock() const {
- return impl_->try_lock();
-}
-
-bool Mutex::timedlock(int64_t ms) const {
- return impl_->try_lock_for(std::chrono::milliseconds(ms));
-}
-
-void Mutex::unlock() const {
- impl_->unlock();
-}
-
-void Mutex::DEFAULT_INITIALIZER(void* arg) {
- ((void)arg);
-}
-}
-}
-} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
deleted file mode 100644
index c885f3a..0000000
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <thrift/thrift-config.h>
-
-#if USE_STD_THREAD
-
-#include <thrift/concurrency/Exception.h>
-#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/StdThreadFactory.h>
-#include <memory>
-
-#include <cassert>
-#include <thread>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * The C++11 thread class.
- *
- * Note that we use boost shared_ptr rather than std shared_ptrs here
- * because the Thread/Runnable classes use those and we don't want to
- * mix them.
- *
- * @version $Id:$
- */
-class StdThread : public Thread, public std::enable_shared_from_this<StdThread> {
-public:
- enum STATE { uninitialized, starting, started, stopping, stopped };
-
- static void threadMain(std::shared_ptr<StdThread> thread);
-
-private:
- std::unique_ptr<std::thread> thread_;
- Monitor monitor_;
- STATE state_;
- bool detached_;
-
-public:
- StdThread(bool detached, std::shared_ptr<Runnable> runnable)
- : state_(uninitialized), detached_(detached) {
- this->Thread::runnable(runnable);
- }
-
- ~StdThread() {
- if (!detached_ && thread_->joinable()) {
- try {
- join();
- } catch (...) {
- // We're really hosed.
- }
- }
- }
-
- STATE getState() const
- {
- Synchronized sync(monitor_);
- return state_;
- }
-
- void setState(STATE newState)
- {
- Synchronized sync(monitor_);
- state_ = newState;
-
- // unblock start() with the knowledge that the thread has actually
- // started running, which avoids a race in detached threads.
- if (newState == started) {
- monitor_.notify();
- }
- }
-
- void start() {
- if (getState() != uninitialized) {
- return;
- }
-
- std::shared_ptr<StdThread> selfRef = shared_from_this();
- setState(starting);
-
- Synchronized sync(monitor_);
- thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));
-
- if (detached_)
- thread_->detach();
-
- // Wait for the thread to start and get far enough to grab everything
- // that it needs from the calling context, thus absolving the caller
- // from being required to hold on to runnable indefinitely.
- monitor_.wait();
- }
-
- void join() {
- if (!detached_ && state_ != uninitialized) {
- thread_->join();
- }
- }
-
- Thread::id_t getId() { return thread_.get() ? thread_->get_id() : std::thread::id(); }
-
- std::shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
-
- void runnable(std::shared_ptr<Runnable> value) { Thread::runnable(value); }
-};
-
-void StdThread::threadMain(std::shared_ptr<StdThread> thread) {
-#if GOOGLE_PERFTOOLS_REGISTER_THREAD
- ProfilerRegisterThread();
-#endif
-
- thread->setState(started);
- thread->runnable()->run();
-
- if (thread->getState() != stopping && thread->getState() != stopped) {
- thread->setState(stopping);
- }
-}
-
-StdThreadFactory::StdThreadFactory(bool detached) : ThreadFactory(detached) {
-}
-
-std::shared_ptr<Thread> StdThreadFactory::newThread(std::shared_ptr<Runnable> runnable) const {
- std::shared_ptr<StdThread> result = std::shared_ptr<StdThread>(new StdThread(isDetached(), runnable));
- runnable->thread(result);
- return result;
-}
-
-Thread::id_t StdThreadFactory::getCurrentThreadId() const {
- return std::this_thread::get_id();
-}
-}
-}
-} // apache::thrift::concurrency
-
-#endif // USE_STD_THREAD
diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
deleted file mode 100644
index e74046b..0000000
--- a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
-#define _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ 1
-
-#include <thrift/concurrency/Thread.h>
-
-#include <memory>
-
-namespace apache {
-namespace thrift {
-namespace concurrency {
-
-/**
- * A thread factory to create std::threads.
- *
- * @version $Id:$
- */
-class StdThreadFactory : public ThreadFactory {
-
-public:
- /**
- * Std thread factory. All threads created by a factory are reference-counted
- * via std::shared_ptr. The factory guarantees that threads and the Runnable tasks
- * they host will be properly cleaned up once the last strong reference
- * to both is given up.
- *
- * By default threads are not joinable.
- */
-
- StdThreadFactory(bool detached = true);
-
- // From ThreadFactory;
- std::shared_ptr<Thread> newThread(std::shared_ptr<Runnable> runnable) const;
-
- // From ThreadFactory;
- Thread::id_t getCurrentThreadId() const;
-};
-
-}
-}
-} // apache::thrift::concurrency
-
-#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
diff --git a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h b/lib/cpp/src/thrift/concurrency/Thread.cpp
similarity index 61%
copy from lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
copy to lib/cpp/src/thrift/concurrency/Thread.cpp
index 99b4403..a2bb127 100644
--- a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.cpp
@@ -17,32 +17,21 @@
* under the License.
*/
-#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
-#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1
-
-// clang-format off
-#include <thrift/thrift-config.h>
-#if USE_STD_THREAD
-# include <thrift/concurrency/StdThreadFactory.h>
-#else
-# include <thrift/concurrency/PosixThreadFactory.h>
-#endif
-// clang-format on
+#include <thrift/concurrency/Thread.h>
namespace apache {
namespace thrift {
namespace concurrency {
-// clang-format off
-#if USE_STD_THREAD
- typedef StdThreadFactory PlatformThreadFactory;
-#else
- typedef PosixThreadFactory PlatformThreadFactory;
-#endif
-// clang-format on
+void Thread::threadMain(std::shared_ptr<Thread> thread) {
+  // Publish `started` first: setState() notifies the monitor that start() waits on.
+  thread->setState(started);
+  thread->runnable()->run();
+  // Move to `stopping` unless the state already says stopping or stopped.
+  const bool windingDown = thread->getState() == stopping || thread->getState() == stopped;
+  if (!windingDown) { thread->setState(stopping); }
+}
}
}
} // apache::thrift::concurrency
-
-#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h
index b2ea4e2..729d11a 100644
--- a/lib/cpp/src/thrift/concurrency/Thread.h
+++ b/lib/cpp/src/thrift/concurrency/Thread.h
@@ -22,16 +22,10 @@
#include <stdint.h>
#include <memory>
+#include <thread>
#include <thrift/thrift-config.h>
-
-#if USE_STD_THREAD
-#include <thread>
-#else
-#ifdef HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
-#endif
+#include <thrift/concurrency/Monitor.h>
namespace apache {
namespace thrift {
@@ -75,94 +69,106 @@
*
* @see apache::thrift::concurrency::ThreadFactory)
*/
-class Thread {
+class Thread final : public std::enable_shared_from_this<Thread> {
public:
-#if USE_STD_THREAD
typedef std::thread::id id_t;
+ enum STATE { uninitialized, starting, started, stopping, stopped };
+
+ static void threadMain(std::shared_ptr<Thread> thread);
+
static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
static inline id_t get_current() { return std::this_thread::get_id(); }
-#else
- typedef pthread_t id_t;
- static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
- static inline id_t get_current() { return pthread_self(); }
-#endif
+  // Binds the runnable and records the detached flag; thread_ stays empty until start().
+  Thread(bool detached, std::shared_ptr<Runnable> runnable)
+    : _runnable(runnable), state_(uninitialized), detached_(detached) {
+  }
- virtual ~Thread(){};
+  ~Thread() {
+    // thread_ is null when start() never ran; check it before dereferencing.
+    if (!detached_ && thread_ && thread_->joinable()) {
+      try {
+        join();
+      } catch (...) { // swallowed: a destructor must not let an exception escape
+      }
+    }
+  }
+
+  // Returns the current lifecycle state, read while holding monitor_.
+  STATE getState() const {
+    Synchronized lockGuard(monitor_);
+    return state_;
+  }
+
+  // Records newState while holding monitor_.
+  void setState(STATE newState) {
+    Synchronized lockGuard(monitor_);
+    state_ = newState;
+    if (started != newState) {
+      return;
+    }
+    // Entering `started` wakes the start() caller waiting on monitor_, so it
+    // knows the thread is really running (avoids a race in detached threads).
+    monitor_.notify();
+  }
/**
* Starts the thread. Does platform specific thread creation and
* configuration then invokes the run method of the Runnable object bound
* to this thread.
*/
- virtual void start() = 0;
+  void start() {
+    // Test and claim under a single lock so two racing start() calls cannot both spawn.
+    Synchronized sync(monitor_);
+    if (state_ != uninitialized) {
+      return;
+    }
+
+    // The spawned thread keeps this object alive through its own shared_ptr copy.
+    std::shared_ptr<Thread> selfRef = shared_from_this();
+    thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef));
+    state_ = starting; // only after the spawn succeeded; a throw leaves us uninitialized
+    if (detached_)
+      thread_->detach();
+
+    // threadMain() cannot report `started` until wait() releases the monitor we hold.
+    // Loop on the state because a condition wait may return spuriously.
+    while (state_ == starting)
+      monitor_.wait();
+  }
/**
* Join this thread. If this thread is joinable, the calling thread blocks
* until this thread completes. If the target thread is not joinable, then
* nothing happens.
*/
- virtual void join() = 0;
+  void join() {
+    // Locked state read; thread_ stays null until start() has spawned the std::thread.
+    // The joinable() test makes a repeated join() a no-op instead of an exception.
+    if (!detached_ && getState() != uninitialized && thread_ && thread_->joinable())
+      thread_->join();
+  }
/**
* Gets the thread's platform-specific ID
*/
- virtual id_t getId() = 0;
+  id_t getId() { return thread_ ? thread_->get_id() : id_t(); } // default id before start()
/**
* Gets the runnable object this thread is hosting
*/
- virtual std::shared_ptr<Runnable> runnable() const { return _runnable; }
-
-protected:
- virtual void runnable(std::shared_ptr<Runnable> value) { _runnable = value; }
+ std::shared_ptr<Runnable> runnable() const { return _runnable; } // _runnable is assigned only in the constructor
private:
std::shared_ptr<Runnable> _runnable;
-};
-
-/**
- * Factory to create platform-specific thread object and bind them to Runnable
- * object for execution
- */
-class ThreadFactory {
-protected:
- ThreadFactory(bool detached) : detached_(detached) { }
-
-public:
- virtual ~ThreadFactory() { }
-
- /**
- * Gets current detached mode
- */
- bool isDetached() const { return detached_; }
-
- /**
- * Sets the detached disposition of newly created threads.
- */
- void setDetached(bool detached) { detached_ = detached; }
-
- /**
- * Create a new thread.
- */
- virtual std::shared_ptr<Thread> newThread(std::shared_ptr<Runnable> runnable) const = 0;
-
- /**
- * Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
- */
- virtual Thread::id_t getCurrentThreadId() const = 0;
-
- /**
- * For code readability define the unknown/undefined thread id
- */
- static const Thread::id_t unknown_thread_id;
-
-private:
+ std::unique_ptr<std::thread> thread_;
+ Monitor monitor_;
+ STATE state_;
bool detached_;
};
+
}
}
} // apache::thrift::concurrency
diff --git a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h b/lib/cpp/src/thrift/concurrency/ThreadFactory.cpp
similarity index 64%
rename from lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
rename to lib/cpp/src/thrift/concurrency/ThreadFactory.cpp
index 99b4403..941b993 100644
--- a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadFactory.cpp
@@ -17,32 +17,24 @@
* under the License.
*/
-#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
-#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1
-
-// clang-format off
#include <thrift/thrift-config.h>
-#if USE_STD_THREAD
-# include <thrift/concurrency/StdThreadFactory.h>
-#else
-# include <thrift/concurrency/PosixThreadFactory.h>
-#endif
-// clang-format on
+
+#include <thrift/concurrency/ThreadFactory.h>
+#include <memory>
namespace apache {
namespace thrift {
namespace concurrency {
-// clang-format off
-#if USE_STD_THREAD
- typedef StdThreadFactory PlatformThreadFactory;
-#else
- typedef PosixThreadFactory PlatformThreadFactory;
-#endif
-// clang-format on
+std::shared_ptr<Thread> ThreadFactory::newThread(std::shared_ptr<Runnable> runnable) const {
+  std::shared_ptr<Thread> created(new Thread(isDetached(), runnable)); // factory's detached mode
+  runnable->thread(created); // hand the runnable the thread that was built around it
+  return created;
+}
+Thread::id_t ThreadFactory::getCurrentThreadId() const {
+  return Thread::get_current(); // Thread.h helper wrapping std::this_thread::get_id()
+}
}
}
} // apache::thrift::concurrency
-
-#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
diff --git a/lib/cpp/src/thrift/concurrency/ThreadFactory.h b/lib/cpp/src/thrift/concurrency/ThreadFactory.h
new file mode 100644
index 0000000..f317afc
--- /dev/null
+++ b/lib/cpp/src/thrift/concurrency/ThreadFactory.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_THREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_THREADFACTORY_H_ 1
+
+#include <thrift/concurrency/Thread.h>
+
+#include <memory>
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+/**
+ * Factory that creates Thread objects, each bound to the Runnable
+ * it will execute.
+ */
+class ThreadFactory final {
+public:
+ /**
+ * Creates a factory whose threads start detached or joinable. Threads are
+ * reference-counted via std::shared_ptr; the factory guarantees that threads
+ * and the Runnable tasks they host are cleaned up once the last strong
+ * reference to both is given up.
+ *
+ * @param detached detached mode for new threads; the default (true) makes them non-joinable.
+ */
+ ThreadFactory(bool detached = true) : detached_(detached) { }
+
+ ~ThreadFactory() = default;
+
+ /**
+ * Gets the detached mode applied to newly created threads.
+ */
+ bool isDetached() const { return detached_; }
+
+ /**
+ * Sets the detached disposition of newly created threads.
+ */
+ void setDetached(bool detached) { detached_ = detached; }
+
+ /**
+ * Creates a new Thread hosting the given runnable, using the current detached mode.
+ */
+ std::shared_ptr<Thread> newThread(std::shared_ptr<Runnable> runnable) const;
+
+ /**
+ * Gets the id of the calling thread (std::this_thread::get_id()).
+ */
+ Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * For code readability, names the unknown/undefined thread id.
+ */
+ static const Thread::id_t unknown_thread_id;
+
+private:
+ bool detached_;
+};
+
+}
+}
+} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_THREADFACTORY_H_
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.h b/lib/cpp/src/thrift/concurrency/ThreadManager.h
index 470fc0a..4b4b3d4 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.h
@@ -23,7 +23,7 @@
#include <functional>
#include <memory>
#include <sys/types.h>
-#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/ThreadFactory.h>
namespace apache {
namespace thrift {
diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.h b/lib/cpp/src/thrift/concurrency/TimerManager.h
index ba79226..4d73b00 100644
--- a/lib/cpp/src/thrift/concurrency/TimerManager.h
+++ b/lib/cpp/src/thrift/concurrency/TimerManager.h
@@ -22,7 +22,7 @@
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <memory>
#include <map>
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index f16fce7..31ff2a9 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -22,7 +22,7 @@
#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>
#include <algorithm>
@@ -1118,12 +1118,7 @@
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
- ioThreadFactory_.reset(new PlatformThreadFactory(
-#if !USE_STD_THREAD
- PlatformThreadFactory::OTHER, // scheduler
- PlatformThreadFactory::NORMAL, // priority
- 1, // stack size (MB)
-#endif
+ ioThreadFactory_.reset(new ThreadFactory(
false // detached
));
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h
index e79c24f..2c2389c 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.h
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.h
@@ -30,7 +30,7 @@
#include <thrift/concurrency/ThreadManager.h>
#include <climits>
#include <thrift/concurrency/Thread.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Mutex.h>
#include <stack>
#include <vector>
@@ -53,7 +53,7 @@
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
-using apache::thrift::concurrency::PlatformThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
@@ -166,7 +166,7 @@
bool threadPoolProcessing_;
// Factory to create the IO threads
- std::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
+ std::shared_ptr<ThreadFactory> ioThreadFactory_;
// Vector of IOThread objects that will handle our IO
std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;
@@ -386,9 +386,7 @@
/**
* Sets the number of IO threads used by this server. Can only be used before
- * the call to serve() and has no effect afterwards. We always use a
- * PosixThreadFactory for the IO worker threads, because they must joinable
- * for clean shutdown.
+ * the call to serve() and has no effect afterwards.
*/
void setNumIOThreads(size_t numThreads) {
numIOThreads_ = numThreads;
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp
index 2264df7..ed2d80d 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp
@@ -19,7 +19,7 @@
#include <string>
#include <memory>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/server/TThreadedServer.h>
namespace apache {
diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h
index c5ccd03..9fc9d11 100644
--- a/lib/cpp/src/thrift/server/TThreadedServer.h
+++ b/lib/cpp/src/thrift/server/TThreadedServer.h
@@ -22,7 +22,7 @@
#include <map>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Thread.h>
#include <thrift/server/TServerFramework.h>
@@ -44,7 +44,7 @@
const std::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
const std::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= std::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory(false)));
+ new apache::thrift::concurrency::ThreadFactory(false)));
TThreadedServer(
const std::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -53,7 +53,7 @@
const std::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory,
const std::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= std::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory(false)));
+ new apache::thrift::concurrency::ThreadFactory(false)));
TThreadedServer(
const std::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory,
@@ -64,7 +64,7 @@
const std::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
const std::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= std::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory(false)));
+ new apache::thrift::concurrency::ThreadFactory(false)));
TThreadedServer(
const std::shared_ptr<apache::thrift::TProcessor>& processor,
@@ -75,7 +75,7 @@
const std::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory,
const std::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory
= std::shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory(false)));
+ new apache::thrift::concurrency::ThreadFactory(false)));
virtual ~TThreadedServer();
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h
index 6cc7bd2..ece271a 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.h
+++ b/lib/cpp/src/thrift/transport/TFileTransport.h
@@ -30,7 +30,7 @@
#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Monitor.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Thread.h>
namespace apache {
@@ -336,7 +336,7 @@
static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
// writer thread
- apache::thrift::concurrency::PlatformThreadFactory threadFactory_;
+ apache::thrift::concurrency::ThreadFactory threadFactory_;
std::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
// buffers to hold data before it is flushed. Each element of the buffer stores a msg that
diff --git a/lib/cpp/src/thrift/windows/config.h b/lib/cpp/src/thrift/windows/config.h
index a5f4457..14a3f4f 100644
--- a/lib/cpp/src/thrift/windows/config.h
+++ b/lib/cpp/src/thrift/windows/config.h
@@ -28,11 +28,6 @@
#error "This is a Windows header only"
#endif
-// use std::thread in MSVC11 (2012) or newer and in MinGW
-#if (_MSC_VER >= 1700) || defined(__MINGW32__)
-#define USE_STD_THREAD 1
-#endif
-
// Something that defines PRId64 is required to build
#define HAVE_INTTYPES_H 1
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index b30ef17..8a8aada 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -84,11 +84,6 @@
TServerTransportTest.cpp
)
-if(NOT WITH_STDTHREADS AND NOT MSVC AND NOT MINGW)
- list(APPEND UnitTest_SOURCES concurrency/MutexTest.cpp)
- list(APPEND UnitTest_SOURCES concurrency/RWMutexStarveTest.cpp)
-endif()
-
add_executable(UnitTests ${UnitTest_SOURCES})
target_link_libraries(UnitTests testgencpp ${Boost_LIBRARIES})
LINK_AGAINST_THRIFT_LIBRARY(UnitTests thrift)
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index d645a65..5bb9eb7 100755
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -135,10 +135,6 @@
TServerTransportTest.cpp \
TTransportCheckThrow.h
-UnitTests_SOURCES += \
- concurrency/MutexTest.cpp \
- concurrency/RWMutexStarveTest.cpp
-
UnitTests_LDADD = \
libtestgencpp.la \
$(BOOST_TEST_LDADD) \
diff --git a/lib/cpp/test/TNonblockingSSLServerTest.cpp b/lib/cpp/test/TNonblockingSSLServerTest.cpp
index 330380b..2111de8 100644
--- a/lib/cpp/test/TNonblockingSSLServerTest.cpp
+++ b/lib/cpp/test/TNonblockingSSLServerTest.cpp
@@ -218,12 +218,7 @@
runner->userEventBase = userEventBase_;
std::unique_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory(
- new apache::thrift::concurrency::PlatformThreadFactory(
-#if !USE_STD_THREAD
- concurrency::PlatformThreadFactory::OTHER, concurrency::PlatformThreadFactory::NORMAL,
- 1,
-#endif
- false));
+ new apache::thrift::concurrency::ThreadFactory(false));
thread = threadFactory->newThread(runner);
thread->start();
runner->readyBarrier();
diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp
index f0bb283..f2f5922 100644
--- a/lib/cpp/test/TNonblockingServerTest.cpp
+++ b/lib/cpp/test/TNonblockingServerTest.cpp
@@ -33,7 +33,7 @@
using apache::thrift::concurrency::Guard;
using apache::thrift::concurrency::Monitor;
using apache::thrift::concurrency::Mutex;
-using apache::thrift::concurrency::PlatformThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::ThreadFactory;
@@ -147,12 +147,7 @@
runner->userEventBase = userEventBase_;
shared_ptr<ThreadFactory> threadFactory(
- new PlatformThreadFactory(
-#if !USE_STD_THREAD
- PlatformThreadFactory::OTHER, PlatformThreadFactory::NORMAL,
- 1,
-#endif
- false));
+ new ThreadFactory(false));
thread = threadFactory->newThread(runner);
thread->start();
runner->readyBarrier();
diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp
index 7976c8b..a7680d8 100644
--- a/lib/cpp/test/TServerIntegrationTest.cpp
+++ b/lib/cpp/test/TServerIntegrationTest.cpp
@@ -379,7 +379,7 @@
TServerIntegrationProcessorFactoryTestFixture<TThreadPoolServer>) {
pServer->getThreadManager()->threadFactory(
shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory));
+ new apache::thrift::concurrency::ThreadFactory));
pServer->getThreadManager()->start();
// thread factory has 4 threads as a default
@@ -394,7 +394,7 @@
TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
pServer->getThreadManager()->threadFactory(
shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory));
+ new apache::thrift::concurrency::ThreadFactory));
pServer->getThreadManager()->start();
// thread factory has 4 threads as a default
@@ -409,7 +409,7 @@
TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
pServer->getThreadManager()->threadFactory(
shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory));
+ new apache::thrift::concurrency::ThreadFactory));
pServer->getThreadManager()->start();
pServer->setConcurrentClientLimit(4);
@@ -420,7 +420,7 @@
TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
pServer->getThreadManager()->threadFactory(
shared_ptr<apache::thrift::concurrency::ThreadFactory>(
- new apache::thrift::concurrency::PlatformThreadFactory));
+ new apache::thrift::concurrency::ThreadFactory));
pServer->getThreadManager()->start();
stress(10, boost::posix_time::seconds(3));
diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp
index ce19544..3872071 100644
--- a/lib/cpp/test/TransportTest.cpp
+++ b/lib/cpp/test/TransportTest.cpp
@@ -1031,7 +1031,7 @@
apache::thrift::transport::TWinsockSingleton::create();
#endif
- apache::thrift::concurrency::PlatformThreadFactory factory;
+ apache::thrift::concurrency::ThreadFactory factory;
factory.setDetached(false);
alarmThread_ = factory.newThread(
diff --git a/lib/cpp/test/concurrency/MutexTest.cpp b/lib/cpp/test/concurrency/MutexTest.cpp
deleted file mode 100644
index 781ec1a..0000000
--- a/lib/cpp/test/concurrency/MutexTest.cpp
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// This is linked into the UnitTests test executable
-
-#include <boost/test/unit_test.hpp>
-
-#include "thrift/concurrency/Exception.h"
-#include "thrift/concurrency/Mutex.h"
-
-using boost::unit_test::test_suite;
-using boost::unit_test::framework::master_test_suite;
-
-using namespace apache::thrift::concurrency;
-
-struct LFAT
-{
- LFAT()
- : uut(Mutex::ERRORCHECK_INITIALIZER)
- {
- BOOST_CHECK_EQUAL(0, pthread_mutex_init(&mx, 0));
- BOOST_CHECK_EQUAL(0, pthread_cond_init(&cv, 0));
- }
-
- Mutex uut;
- pthread_mutex_t mx;
- pthread_cond_t cv;
-};
-
-// Helper for testing mutex behavior when locked by another thread
-void * lockFromAnotherThread(void *ptr)
-{
- struct LFAT *lfat = (LFAT *)ptr;
- BOOST_CHECK_EQUAL (0, pthread_mutex_lock(&lfat->mx)); // synchronize with testing thread
- BOOST_CHECK_NO_THROW( lfat->uut.lock());
- BOOST_CHECK_EQUAL (0, pthread_cond_signal(&lfat->cv)); // tell testing thread we have locked the mutex
- BOOST_CHECK_EQUAL (0, pthread_cond_wait(&lfat->cv, &lfat->mx)); // wait for testing thread to signal condition variable telling us to unlock
- BOOST_CHECK_NO_THROW( lfat->uut.unlock());
- return ptr; // testing thread should join to ensure completeness
-}
-
-BOOST_AUTO_TEST_SUITE(MutexTest)
-
-BOOST_AUTO_TEST_CASE(happy_path)
-{
- Mutex uut(Mutex::ERRORCHECK_INITIALIZER); // needed to test unlocking twice without undefined behavior
-
- BOOST_CHECK_NO_THROW( uut.lock());
- BOOST_CHECK_THROW ( uut.lock(), SystemResourceException); // EDEADLK (this thread owns it)
- BOOST_CHECK_NO_THROW( uut.unlock());
-}
-
-BOOST_AUTO_TEST_CASE(recursive_happy_path)
-{
- Mutex uut(Mutex::RECURSIVE_INITIALIZER);
-
- BOOST_CHECK_NO_THROW( uut.lock());
- BOOST_CHECK_NO_THROW( uut.lock());
- BOOST_CHECK_NO_THROW( uut.unlock());
- BOOST_CHECK_NO_THROW( uut.lock());
- BOOST_CHECK_NO_THROW( uut.lock());
- BOOST_CHECK_NO_THROW( uut.unlock());
- BOOST_CHECK_NO_THROW( uut.lock());
- BOOST_CHECK_NO_THROW( uut.unlock());
- BOOST_CHECK_NO_THROW( uut.unlock());
- BOOST_CHECK_NO_THROW( uut.unlock());
-}
-
-BOOST_AUTO_TEST_CASE(trylock)
-{
- Mutex uut(Mutex::ADAPTIVE_INITIALIZER); // just using another initializer for coverage
-
- BOOST_CHECK ( uut.trylock());
- BOOST_CHECK (!uut.trylock());
- BOOST_CHECK_NO_THROW( uut.unlock());
-}
-
-BOOST_AUTO_TEST_CASE(timedlock)
-{
- pthread_t th;
- struct LFAT lfat;
-
- BOOST_CHECK ( lfat.uut.timedlock(100));
- BOOST_CHECK_THROW ( lfat.uut.timedlock(100),
- SystemResourceException); // EDEADLK (current thread owns mutex - logic error)
- BOOST_CHECK_NO_THROW( lfat.uut.unlock());
-
- BOOST_CHECK_EQUAL (0, pthread_mutex_lock(&lfat.mx)); // synchronize with helper thread
- BOOST_CHECK_EQUAL (0, pthread_create(&th, NULL,
- lockFromAnotherThread, &lfat)); // create helper thread
- BOOST_CHECK_EQUAL (0, pthread_cond_wait(&lfat.cv, &lfat.mx)); // wait for helper thread to lock mutex
-
- BOOST_CHECK (!lfat.uut.timedlock(100)); // false: another thread owns the lock
-
- BOOST_CHECK_EQUAL (0, pthread_cond_signal(&lfat.cv)); // tell helper thread we are done
- BOOST_CHECK_EQUAL (0, pthread_mutex_unlock(&lfat.mx)); // let helper thread clean up
- BOOST_CHECK_EQUAL (0, pthread_join(th, 0)); // wait for testing thread to unlock and be done
-}
-
-BOOST_AUTO_TEST_CASE(underlying)
-{
- Mutex uut;
-
- BOOST_CHECK ( uut.getUnderlyingImpl());
-}
-
-BOOST_AUTO_TEST_SUITE_END()
diff --git a/lib/cpp/test/concurrency/RWMutexStarveTest.cpp b/lib/cpp/test/concurrency/RWMutexStarveTest.cpp
deleted file mode 100644
index 985a230..0000000
--- a/lib/cpp/test/concurrency/RWMutexStarveTest.cpp
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// This is linked into the UnitTests test executable
-
-#include <boost/test/unit_test.hpp>
-
-#include "thrift/concurrency/Mutex.h"
-#include "thrift/concurrency/PosixThreadFactory.h"
-#include <memory>
-
-using std::shared_ptr;
-using boost::unit_test::test_suite;
-using boost::unit_test::framework::master_test_suite;
-
-using namespace apache::thrift::concurrency;
-
-class Locker : public Runnable {
-protected:
- Locker(shared_ptr<ReadWriteMutex> rwlock, bool writer)
- : rwlock_(rwlock), writer_(writer), started_(false), gotLock_(false), signaled_(false) {}
-
-public:
- virtual void run() {
- started_ = true;
- if (writer_) {
- rwlock_->acquireWrite();
- } else {
- rwlock_->acquireRead();
- }
- gotLock_ = true;
- while (!signaled_) {
- usleep(5000);
- }
- rwlock_->release();
- }
-
- bool started() const { return started_; }
- bool gotLock() const { return gotLock_; }
- void signal() { signaled_ = true; }
-
-protected:
- shared_ptr<ReadWriteMutex> rwlock_;
- bool writer_;
- volatile bool started_;
- volatile bool gotLock_;
- volatile bool signaled_;
-};
-
-class Reader : public Locker {
-public:
- Reader(shared_ptr<ReadWriteMutex> rwlock) : Locker(rwlock, false) {}
-};
-
-class Writer : public Locker {
-public:
- Writer(shared_ptr<ReadWriteMutex> rwlock) : Locker(rwlock, true) {}
-};
-
-void test_starve(PosixThreadFactory::POLICY policy) {
- // the man pages for pthread_wrlock_rdlock suggest that any OS guarantee about
- // writer starvation may be influenced by the scheduling policy, so let's try
- // all 3 policies to see if any of them work.
- PosixThreadFactory factory(policy);
- factory.setDetached(false);
-
- shared_ptr<ReadWriteMutex> rwlock(new NoStarveReadWriteMutex());
-
- shared_ptr<Reader> reader1(new Reader(rwlock));
- shared_ptr<Reader> reader2(new Reader(rwlock));
- shared_ptr<Writer> writer(new Writer(rwlock));
-
- shared_ptr<Thread> treader1 = factory.newThread(reader1);
- shared_ptr<Thread> treader2 = factory.newThread(reader2);
- shared_ptr<Thread> twriter = factory.newThread(writer);
-
- // launch a reader and make sure he has the lock
- treader1->start();
- while (!reader1->gotLock()) {
- usleep(2000);
- }
-
- // launch a writer and make sure he's blocked on the lock
- twriter->start();
- while (!writer->started()) {
- usleep(2000);
- }
- // tricky part... we can never be 100% sure that the writer is actually
- // blocked on the lock, but we can pretty reasonably sure because we know
- // he just executed the line immediately before getting the lock, and
- // we'll wait a full second for him to get on it.
- sleep(1);
-
- // launch a second reader... if the RWMutex guarantees that writers won't
- // starve, this reader should not be able to acquire the lock until the writer
- // has acquired and released it.
- treader2->start();
- while (!reader2->started()) {
- usleep(2000);
- }
- // again... can't be 100% sure the reader is waiting on (or has) the lock
- // but we can be close.
- sleep(1);
-
- // tell reader 1 to let go of the lock
- reader1->signal();
-
- // wait for someone to get the lock
- while (!reader2->gotLock() && !writer->gotLock()) {
- usleep(2000);
- }
-
- // the test succeeded if the WRITER got the lock.
- bool success = writer->gotLock();
-
- // tell everyone we're done and wait for them to finish
- reader2->signal();
- writer->signal();
- treader1->join();
- treader2->join();
- twriter->join();
-
- // make sure it worked.
- BOOST_CHECK_MESSAGE(success, "writer is starving");
-}
-
-BOOST_AUTO_TEST_SUITE(RWMutexStarveTest)
-
-BOOST_AUTO_TEST_CASE(test_starve_other) {
- test_starve(PosixThreadFactory::OTHER);
-}
-
-BOOST_AUTO_TEST_CASE(test_starve_rr) {
- test_starve(PosixThreadFactory::ROUND_ROBIN);
-}
-
-BOOST_AUTO_TEST_CASE(test_starve_fifo) {
- test_starve(PosixThreadFactory::FIFO);
-}
-
-BOOST_AUTO_TEST_SUITE_END()
diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h
index ba98502..8ab754c 100644
--- a/lib/cpp/test/concurrency/ThreadFactoryTests.h
+++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h
@@ -19,7 +19,7 @@
#include <thrift/thrift-config.h>
#include <thrift/concurrency/Thread.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Util.h>
@@ -66,7 +66,7 @@
bool reapNThreads(int loop = 1, int count = 10) {
- PlatformThreadFactory threadFactory = PlatformThreadFactory();
+ ThreadFactory threadFactory = ThreadFactory();
shared_ptr<Monitor> monitor(new Monitor);
for (int lix = 0; lix < loop; lix++) {
@@ -159,7 +159,7 @@
shared_ptr<SynchStartTask> task
= shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
- PlatformThreadFactory threadFactory = PlatformThreadFactory();
+ ThreadFactory threadFactory = ThreadFactory();
shared_ptr<Thread> thread = threadFactory.newThread(task);
@@ -265,7 +265,7 @@
Monitor& _mon;
};
- void foo(PlatformThreadFactory* tf) { (void)tf; }
+ void foo(ThreadFactory* tf) { (void)tf; }
bool floodNTest(size_t loop = 1, size_t count = 100000) {
@@ -274,7 +274,7 @@
for (size_t lix = 0; lix < loop; lix++) {
- PlatformThreadFactory threadFactory = PlatformThreadFactory();
+ ThreadFactory threadFactory = ThreadFactory();
threadFactory.setDetached(true);
for (size_t tix = 0; tix < count; tix++) {
diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h
index d6c092d..b3a319a 100644
--- a/lib/cpp/test/concurrency/ThreadManagerTests.h
+++ b/lib/cpp/test/concurrency/ThreadManagerTests.h
@@ -19,7 +19,7 @@
#include <thrift/thrift-config.h>
#include <thrift/concurrency/ThreadManager.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Util.h>
@@ -108,12 +108,9 @@
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
- shared_ptr<PlatformThreadFactory> threadFactory
- = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory(false));
+ shared_ptr<ThreadFactory> threadFactory
+ = shared_ptr<ThreadFactory>(new ThreadFactory(false));
-#if !USE_STD_THREAD
- threadFactory->setPriority(PosixThreadFactory::HIGHEST);
-#endif
threadManager->threadFactory(threadFactory);
threadManager->start();
@@ -257,12 +254,9 @@
shared_ptr<ThreadManager> threadManager
= ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
- shared_ptr<PlatformThreadFactory> threadFactory
- = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
+ shared_ptr<ThreadFactory> threadFactory
+ = shared_ptr<ThreadFactory>(new ThreadFactory());
-#if !USE_STD_THREAD
- threadFactory->setPriority(PosixThreadFactory::HIGHEST);
-#endif
threadManager->threadFactory(threadFactory);
threadManager->start();
@@ -401,54 +395,15 @@
return false;
}
-#if !USE_STD_THREAD
- // test once with a detached thread factory and once with a joinable thread factory
-
- shared_ptr<PosixThreadFactory> threadFactory
- = shared_ptr<PosixThreadFactory>(new PosixThreadFactory(false));
-
- std::cout << "\t\t\tapiTest with joinable thread factory" << std::endl;
- if (!apiTestWithThreadFactory(threadFactory)) {
- return false;
- }
-
- threadFactory.reset(new PosixThreadFactory(true));
- std::cout << "\t\t\tapiTest with detached thread factory" << std::endl;
- return apiTestWithThreadFactory(threadFactory);
-#else
- return apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
-#endif
+ return apiTestWithThreadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
}
- bool apiTestWithThreadFactory(shared_ptr<PlatformThreadFactory> threadFactory)
+ bool apiTestWithThreadFactory(shared_ptr<ThreadFactory> threadFactory)
{
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
threadManager->threadFactory(threadFactory);
-#if !USE_STD_THREAD
- threadFactory->setPriority(PosixThreadFactory::HIGHEST);
-
- // verify we cannot change the thread factory to one with the opposite detached setting
- shared_ptr<PlatformThreadFactory> threadFactory2
- = shared_ptr<PosixThreadFactory>(new PlatformThreadFactory(
- PosixThreadFactory::ROUND_ROBIN,
- PosixThreadFactory::NORMAL,
- 1,
- !threadFactory->isDetached()));
- try {
- threadManager->threadFactory(threadFactory2);
- // if the call succeeded we changed the thread factory to one that had the opposite setting for "isDetached()".
- // this is bad, because the thread manager checks with the thread factory to see if it should join threads
- // as they are leaving - so the detached status of new threads cannot change while there are existing threads.
- std::cerr << "\t\t\tShould not be able to change thread factory detached disposition" << std::endl;
- return false;
- }
- catch (InvalidArgumentException& ex) {
- /* expected */
- }
-#endif
-
std::cout << "\t\t\t\tstarting.. " << std::endl;
threadManager->start();
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
index 1c52c47..c15b14b 100644
--- a/lib/cpp/test/concurrency/TimerManagerTests.h
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -18,7 +18,7 @@
*/
#include <thrift/concurrency/TimerManager.h>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Util.h>
@@ -80,7 +80,7 @@
{
TimerManager timerManager;
- timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
timerManager.start();
if (timerManager.state() != TimerManager::STARTED) {
std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl;
@@ -125,7 +125,7 @@
*/
bool test01(int64_t timeout = 1000LL) {
TimerManager timerManager;
- timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
timerManager.start();
assert(timerManager.state() == TimerManager::STARTED);
@@ -158,7 +158,7 @@
*/
bool test02(int64_t timeout = 1000LL) {
TimerManager timerManager;
- timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
timerManager.start();
assert(timerManager.state() == TimerManager::STARTED);
@@ -191,7 +191,7 @@
*/
bool test03(int64_t timeout = 1000LL) {
TimerManager timerManager;
- timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
timerManager.start();
assert(timerManager.state() == TimerManager::STARTED);
@@ -228,7 +228,7 @@
*/
bool test04(int64_t timeout = 1000LL) {
TimerManager timerManager;
- timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
timerManager.start();
assert(timerManager.state() == TimerManager::STARTED);
diff --git a/lib/cpp/test/processor/ProcessorTest.cpp b/lib/cpp/test/processor/ProcessorTest.cpp
index 36ce013..9483a0e 100644
--- a/lib/cpp/test/processor/ProcessorTest.cpp
+++ b/lib/cpp/test/processor/ProcessorTest.cpp
@@ -25,7 +25,7 @@
#include <boost/test/unit_test.hpp>
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TThreadedServer.h>
@@ -94,7 +94,7 @@
const std::shared_ptr<TProtocolFactory>& protocolFactory) {
std::shared_ptr<TServerSocket> socket(new TServerSocket(port));
- std::shared_ptr<PlatformThreadFactory> threadFactory(new PlatformThreadFactory);
+ std::shared_ptr<ThreadFactory> threadFactory(new ThreadFactory);
std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(8);
threadManager->threadFactory(threadFactory);
threadManager->start();
@@ -123,7 +123,7 @@
}
std::shared_ptr<TNonblockingServerSocket> socket(new TNonblockingServerSocket(port));
- std::shared_ptr<PlatformThreadFactory> threadFactory(new PlatformThreadFactory);
+ std::shared_ptr<ThreadFactory> threadFactory(new ThreadFactory);
std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(8);
threadManager->threadFactory(threadFactory);
threadManager->start();
diff --git a/lib/cpp/test/processor/ServerThread.cpp b/lib/cpp/test/processor/ServerThread.cpp
index 4d1ec4c..b050500 100644
--- a/lib/cpp/test/processor/ServerThread.cpp
+++ b/lib/cpp/test/processor/ServerThread.cpp
@@ -21,7 +21,7 @@
#include "ServerThread.h"
-#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TBufferTransports.h>
@@ -38,7 +38,7 @@
helper_.reset(new Helper(this));
// Start the other thread
- concurrency::PlatformThreadFactory threadFactory;
+ concurrency::ThreadFactory threadFactory;
threadFactory.setDetached(false);
thread_ = threadFactory.newThread(helper_);