THRIFT-1361 Optional replacement of pthread by boost::thread
Patch: alexandre parenteau

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1178176 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index d5bc489..593ef9e 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -41,14 +41,12 @@
 
 AM_CXXFLAGS = -Wall
 AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(srcdir)/src
+AM_LDFLAGS = $(BOOST_LDFLAGS)
 
 # Define the source files for the module
 
 libthrift_la_SOURCES = src/Thrift.cpp \
                        src/TApplicationException.cpp \
-                       src/concurrency/Mutex.cpp \
-                       src/concurrency/Monitor.cpp \
-                       src/concurrency/PosixThreadFactory.cpp \
                        src/concurrency/ThreadManager.cpp \
                        src/concurrency/TimerManager.cpp \
                        src/concurrency/Util.cpp \
@@ -77,6 +75,17 @@
                        src/async/TAsyncChannel.cpp \
                        src/processor/PeekProcessor.cpp
 
+if WITH_BOOSTTHREADS
+libthrift_la_SOURCES += src/concurrency/BoostThreadFactory.cpp \
+                        src/concurrency/BoostMonitor.cpp \
+                        src/concurrency/BoostMutex.cpp
+else
+libthrift_la_SOURCES += src/concurrency/Mutex.cpp \
+                        src/concurrency/Monitor.cpp \
+                        src/concurrency/PosixThreadFactory.cpp
+endif
+
+
 libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp \
                          src/async/TAsyncProtocolProcessor.cpp \
                          src/async/TEvhttpServer.cpp \
@@ -91,6 +100,10 @@
 libthriftnb_la_CXXFLAGS = $(AM_CXXFLAGS)
 libthriftz_la_CXXFLAGS  = $(AM_CXXFLAGS)
 
+if WITH_BOOSTTHREADS
+libthrift_la_LIBADD = -lboost_thread
+endif
+
 include_thriftdir = $(includedir)/thrift
 include_thrift_HEADERS = \
                          $(top_builddir)/config.h \
diff --git a/lib/cpp/src/concurrency/BoostMonitor.cpp b/lib/cpp/src/concurrency/BoostMonitor.cpp
new file mode 100644
index 0000000..7a9b589
--- /dev/null
+++ b/lib/cpp/src/concurrency/BoostMonitor.cpp
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include "Monitor.h"
+#include "Exception.h"
+#include "Util.h"
+
+#include <assert.h>
+#include <errno.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/interprocess/sync/interprocess_mutex.hpp>
+#include <boost/interprocess/sync/interprocess_condition.hpp>
+#include <boost/interprocess/sync/scoped_lock.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using namespace boost::interprocess;
+
+/**
+ * Monitor implementation using the boost interprocess library
+ *
+ * @version $Id:$
+ */
+class Monitor::Impl : public interprocess_condition {
+
+ public:
+
+  Impl()
+     : ownedMutex_(new Mutex()),
+       mutex_(NULL) {
+    init(ownedMutex_.get());
+  }
+
+  Impl(Mutex* mutex)
+     : mutex_(NULL) {
+    init(mutex);
+  }
+
+  Impl(Monitor* monitor)
+     : mutex_(NULL) {
+    init(&(monitor->mutex()));
+  }
+
+  Mutex& mutex() { return *mutex_; }
+  void lock() { mutex().lock(); }
+  void unlock() { mutex().unlock(); }
+
+  /**
+   * Exception-throwing version of waitForTimeRelative(), called simply
+   * wait(int64) for historical reasons.  Timeout is in milliseconds.
+   *
+   * If the condition occurs,  this function returns cleanly; on timeout or
+   * error an exception is thrown.
+   */
+  void wait(int64_t timeout_ms) {
+    int result = waitForTimeRelative(timeout_ms);
+    if (result == ETIMEDOUT) {
+      throw TimedOutException();
+    } else if (result != 0) {
+      throw TException(
+        "Monitor::wait() failed");
+    }
+  }
+
+  /**
+   * Waits until the specified timeout in milliseconds for the condition to
+   * occur, or waits forever if timeout_ms == 0.
+   *
+   * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
+   */
+  int waitForTimeRelative(int64_t timeout_ms) {
+    if (timeout_ms == 0LL) {
+      return waitForever();
+    }
+
+    assert(mutex_);
+    interprocess_mutex* mutexImpl =
+      reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
+    assert(mutexImpl);
+
+	scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
+	int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT;
+	lock.release();
+	return res;
+  }
+
+  /**
+   * Waits until the absolute time specified using struct timespec.
+   * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code.
+   */
+  int waitForTime(const timespec* abstime) {
+    assert(mutex_);
+    interprocess_mutex* mutexImpl =
+      reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
+    assert(mutexImpl);
+
+    struct timespec currenttime;
+    Util::toTimespec(currenttime, Util::currentTime());
+
+	long tv_sec = abstime->tv_sec - currenttime.tv_sec;
+	long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec;
+	if(tv_sec < 0)
+		tv_sec = 0;
+	if(tv_nsec < 0)
+		tv_nsec = 0;
+
+	scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
+	int res = timed_wait(lock, boost::get_system_time() +
+		boost::posix_time::seconds(tv_sec) +
+		boost::posix_time::microseconds(tv_nsec / 1000)
+		) ? 0 : ETIMEDOUT;
+	lock.release();
+	return res;
+  }
+
+  /**
+   * Waits forever until the condition occurs.
+   * Returns 0 if condition occurs, or an error code otherwise.
+   */
+  int waitForever() {
+    assert(mutex_);
+    interprocess_mutex* mutexImpl =
+      reinterpret_cast<interprocess_mutex*>(mutex_->getUnderlyingImpl());
+    assert(mutexImpl);
+
+	scoped_lock<interprocess_mutex> lock(*mutexImpl, accept_ownership_type());
+	((interprocess_condition*)this)->wait(lock);
+	lock.release();
+    return 0;
+  }
+
+
+  void notify() {
+	  notify_one();
+  }
+
+  void notifyAll() {
+	  notify_all();
+  }
+
+ private:
+
+  void init(Mutex* mutex) {
+    mutex_ = mutex;
+  }
+
+  boost::scoped_ptr<Mutex> ownedMutex_;
+  Mutex* mutex_;
+};
+
+Monitor::Monitor() : impl_(new Monitor::Impl()) {}
+Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
+Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
+
+Monitor::~Monitor() { delete impl_; }
+
+Mutex& Monitor::mutex() const { return const_cast<Monitor::Impl*>(impl_)->mutex(); }
+
+void Monitor::lock() const { const_cast<Monitor::Impl*>(impl_)->lock(); }
+
+void Monitor::unlock() const { const_cast<Monitor::Impl*>(impl_)->unlock(); }
+
+void Monitor::wait(int64_t timeout) const { const_cast<Monitor::Impl*>(impl_)->wait(timeout); }
+
+int Monitor::waitForTime(const timespec* abstime) const {
+  return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
+}
+
+int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
+  return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
+}
+
+int Monitor::waitForever() const {
+  return const_cast<Monitor::Impl*>(impl_)->waitForever();
+}
+
+void Monitor::notify() const { const_cast<Monitor::Impl*>(impl_)->notify(); }
+
+void Monitor::notifyAll() const { const_cast<Monitor::Impl*>(impl_)->notifyAll(); }
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/BoostMutex.cpp b/lib/cpp/src/concurrency/BoostMutex.cpp
new file mode 100644
index 0000000..2277f61
--- /dev/null
+++ b/lib/cpp/src/concurrency/BoostMutex.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include "Mutex.h"
+#include "Util.h"
+
+#include <cassert>
+#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/interprocess/sync/interprocess_mutex.hpp>
+
+using namespace boost::interprocess;
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Implementation of Mutex class using boost interprocess mutex
+ *
+ * @version $Id:$
+ */
+class Mutex::impl : public interprocess_mutex {
+};
+
+Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {}
+
+void* Mutex::getUnderlyingImpl() const { return impl_.get(); }
+
+void Mutex::lock() const { impl_->lock(); }
+
+bool Mutex::trylock() const { return impl_->try_lock(); }
+
+bool Mutex::timedlock(int64_t ms) const { return impl_->timed_lock(boost::get_system_time()+boost::posix_time::milliseconds(ms)); }
+
+void Mutex::unlock() const { impl_->unlock(); }
+
+void Mutex::DEFAULT_INITIALIZER(void* arg) {
+}
+
+}}} // apache::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/concurrency/BoostThreadFactory.cpp
new file mode 100644
index 0000000..5551528
--- /dev/null
+++ b/lib/cpp/src/concurrency/BoostThreadFactory.cpp
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include "BoostThreadFactory.h"
+#include "Exception.h"
+
+#include <cassert>
+
+#include <boost/weak_ptr.hpp>
+#include <boost/thread.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+using boost::weak_ptr;
+
+/**
+ * The boost thread class.
+ *
+ * @version $Id:$
+ */
+class BoostThread: public Thread {
+ public:
+
+  enum STATE {
+    uninitialized,
+    starting,
+    started,
+    stopping,
+    stopped
+  };
+
+  static void* threadMain(void* arg);
+
+ private:
+  std::auto_ptr<boost::thread> thread_;
+  STATE state_;
+  weak_ptr<BoostThread> self_;
+  bool detached_;
+
+ public:
+
+  BoostThread(bool detached, shared_ptr<Runnable> runnable) :
+	 state_(uninitialized),
+	 detached_(detached) {
+    this->Thread::runnable(runnable);
+  }
+
+  ~BoostThread() {
+    if(!detached_) {
+      try {
+        join();
+      } catch(...) {
+        // We're really hosed.
+      }
+    }
+  }
+
+  void start() {
+    if (state_ != uninitialized) {
+      return;
+    }
+	
+	// Create reference
+    shared_ptr<BoostThread>* selfRef = new shared_ptr<BoostThread>();
+    *selfRef = self_.lock();
+
+	thread_ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(threadMain, (void*)selfRef)));
+
+	if(detached_)
+		thread_->detach();
+
+	state_ = starting;
+  }
+
+  void join() {
+    if (!detached_ && state_ != uninitialized) {
+	  thread_->join();
+    }
+  }
+
+  Thread::id_t getId() {
+	  return thread_.get() ? thread_->get_id() : boost::thread::id();
+  }
+
+  shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
+
+  void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
+
+  void weakRef(shared_ptr<BoostThread> self) {
+    assert(self.get() == this);
+    self_ = weak_ptr<BoostThread>(self);
+  }
+};
+
+void* BoostThread::threadMain(void* arg) {
+  shared_ptr<BoostThread> thread = *(shared_ptr<BoostThread>*)arg;
+  delete reinterpret_cast<shared_ptr<BoostThread>*>(arg);
+
+  if (thread == NULL) {
+    return (void*)0;
+  }
+
+  if (thread->state_ != starting) {
+    return (void*)0;
+  }
+
+  thread->state_ = started;
+  thread->runnable()->run();
+
+  if (thread->state_ != stopping && thread->state_ != stopped) {
+    thread->state_ = stopping;
+  }
+  return (void*)0;
+}
+
+/**
+ * POSIX Thread factory implementation
+ */
+class BoostThreadFactory::Impl {
+
+ private:
+  bool detached_;
+
+ public:
+
+  Impl(bool detached) :
+    detached_(detached) {}
+
+  /**
+   * Creates a new POSIX thread to run the runnable object
+   *
+   * @param runnable A runnable object
+   */
+  shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
+    shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(detached_, runnable));
+    result->weakRef(result);
+    runnable->thread(result);
+    return result;
+  }
+
+  bool isDetached() const { return detached_; }
+
+  void setDetached(bool value) { detached_ = value; }
+
+  Thread::id_t getCurrentThreadId() const {
+	  return boost::this_thread::get_id();
+  }
+
+};
+
+BoostThreadFactory::BoostThreadFactory(bool detached) :
+  impl_(new BoostThreadFactory::Impl(detached)) {}
+
+shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
+
+bool BoostThreadFactory::isDetached() const { return impl_->isDetached(); }
+
+void BoostThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+
+Thread::id_t BoostThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/BoostThreadFactory.h b/lib/cpp/src/concurrency/BoostThreadFactory.h
new file mode 100644
index 0000000..a466705
--- /dev/null
+++ b/lib/cpp/src/concurrency/BoostThreadFactory.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ 1
+
+#include "Thread.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A thread factory to create posix threads
+ *
+ * @version $Id:$
+ */
+class BoostThreadFactory : public ThreadFactory {
+
+ public:
+
+  /**
+   * Boost thread factory.  All threads created by a factory are reference-counted
+   * via boost::shared_ptr and boost::weak_ptr.  The factory guarantees that threads and
+   * the Runnable tasks they host will be properly cleaned up once the last strong reference
+   * to both is given up.
+   *
+   * Threads are created with the specified boost policy, priority, stack-size. A detachable thread is not
+   * joinable.
+   *
+   * By default threads are not joinable.
+   */
+
+  BoostThreadFactory(bool detached=true);
+
+  // From ThreadFactory;
+  boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+
+  // From ThreadFactory;
+  Thread::id_t getCurrentThreadId() const;
+
+  /**
+   * Sets detached mode of threads
+   */
+  virtual void setDetached(bool detached);
+
+  /**
+   * Gets current detached mode
+   */
+  virtual bool isDetached() const;
+
+private:
+  class Impl;
+  boost::shared_ptr<Impl> impl_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index 4b1c3bf..847aaf6 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -167,7 +167,7 @@
 
 // A little hack to prevent someone from trying to do "Guard(m);"
 // Such a use is invalid because the temporary Guard object is
-// destoryed at the end of the line, releasing the lock.
+// destroyed at the end of the line, releasing the lock.
 // Sorry for polluting the global namespace, but I think it's worth it.
 #define Guard(m) incorrect_use_of_Guard(m)
 #define RWGuard(m) incorrect_use_of_RWGuard(m)
diff --git a/lib/cpp/src/concurrency/PlatformThreadFactory.h b/lib/cpp/src/concurrency/PlatformThreadFactory.h
new file mode 100644
index 0000000..9f053a0
--- /dev/null
+++ b/lib/cpp/src/concurrency/PlatformThreadFactory.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1
+
+#ifndef USE_BOOST_THREAD
+#  include <concurrency/PosixThreadFactory.h>
+#else
+#  include <concurrency/BoostThreadFactory.h>
+#endif
+
+namespace apache { namespace thrift { namespace concurrency {
+
+#ifndef USE_BOOST_THREAD
+  typedef PosixThreadFactory PlatformThreadFactory;
+#include <concurrency/PosixThreadFactory.h>
+#else
+  typedef BoostThreadFactory PlatformThreadFactory;
+#endif
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index fe5ba123..70204f1 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -195,7 +195,7 @@
   ProfilerRegisterThread();
 #endif
 
-  thread->state_ = starting;
+  thread->state_ = started;
   thread->runnable()->run();
   if (thread->state_ != stopping && thread->state_ != stopped) {
     thread->state_ = stopping;
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index d4282ad..a9e15af 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -24,6 +24,10 @@
 #include <boost/shared_ptr.hpp>
 #include <boost/weak_ptr.hpp>
 
+#ifdef USE_BOOST_THREAD
+#include <boost/thread.hpp>
+#endif
+
 namespace apache { namespace thrift { namespace concurrency {
 
 class Thread;
@@ -68,7 +72,11 @@
 
  public:
 
+#ifdef USE_BOOST_THREAD
+  typedef boost::thread::id id_t;
+#else
   typedef uint64_t id_t;
+#endif
 
   virtual ~Thread() {};
 
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index 2d7976e..d9066b5 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -19,7 +19,7 @@
 
 #include <config.h>
 #include <concurrency/Thread.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
 #include <concurrency/Monitor.h>
 #include <concurrency/Util.h>
 
@@ -60,7 +60,7 @@
    */
   bool helloWorldTest() {
 
-    PosixThreadFactory threadFactory = PosixThreadFactory();
+    PlatformThreadFactory threadFactory = PlatformThreadFactory();
 
     shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
 
@@ -105,7 +105,7 @@
 
   bool reapNThreads(int loop=1, int count=10) {
 
-    PosixThreadFactory threadFactory =  PosixThreadFactory();
+    PlatformThreadFactory threadFactory =  PlatformThreadFactory();
 
     Monitor* monitor = new Monitor();
 
@@ -203,7 +203,7 @@
 
     shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
 
-    PosixThreadFactory threadFactory =  PosixThreadFactory();
+    PlatformThreadFactory threadFactory =  PlatformThreadFactory();
 
     shared_ptr<Thread> thread = threadFactory.newThread(task);
 
@@ -307,7 +307,7 @@
     const size_t _id;
   };
 
-  void foo(PosixThreadFactory *tf) {
+  void foo(PlatformThreadFactory *tf) {
     (void) tf;
   }
 
@@ -317,7 +317,7 @@
 
     for(size_t lix = 0; lix < loop; lix++) {
 
-      PosixThreadFactory threadFactory = PosixThreadFactory();
+      PlatformThreadFactory threadFactory = PlatformThreadFactory();
       threadFactory.setDetached(true);
 
         for(size_t tix = 0; tix < count; tix++) {
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index b6b5c3e..e12201c 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -19,7 +19,7 @@
 
 #include <config.h>
 #include <concurrency/ThreadManager.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
 #include <concurrency/Monitor.h>
 #include <concurrency/Util.h>
 
@@ -110,10 +110,11 @@
 
     shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
 
-    shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+    shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
 
+#ifndef USE_BOOST_THREAD
     threadFactory->setPriority(PosixThreadFactory::HIGHEST);
-
+#endif
     threadManager->threadFactory(threadFactory);
 
     threadManager->start();
@@ -249,10 +250,11 @@
 
       shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
 
-      shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+      shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
 
+#ifndef USE_BOOST_THREAD
       threadFactory->setPriority(PosixThreadFactory::HIGHEST);
-
+#endif
       threadManager->threadFactory(threadFactory);
 
       threadManager->start();
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
index b89074c..41f1674 100644
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.h
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -18,7 +18,7 @@
  */
 
 #include <concurrency/TimerManager.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
 #include <concurrency/Monitor.h>
 #include <concurrency/Util.h>
 
@@ -100,7 +100,7 @@
 
       TimerManager timerManager;
 
-      timerManager.threadFactory(shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
+      timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
 
       timerManager.start();
 
diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp
index f40135c..6b816a4 100644
--- a/lib/cpp/src/server/TThreadedServer.cpp
+++ b/lib/cpp/src/server/TThreadedServer.cpp
@@ -19,12 +19,14 @@
 
 #include "server/TThreadedServer.h"
 #include "transport/TTransportException.h"
-#include "concurrency/PosixThreadFactory.h"
+#include <concurrency/PlatformThreadFactory.h>
 
 #include <string>
 #include <iostream>
-#include <pthread.h>
+
+#ifdef HAVE_UNISTD_H
 #include <unistd.h>
+#endif
 
 namespace apache { namespace thrift { namespace server {
 
@@ -123,7 +125,7 @@
   stop_ = false;
 
   if (!threadFactory_) {
-    threadFactory_.reset(new PosixThreadFactory);
+    threadFactory_.reset(new PlatformThreadFactory);
   }
 }
 
diff --git a/lib/cpp/src/transport/TFDTransport.cpp b/lib/cpp/src/transport/TFDTransport.cpp
index b1479fa..8a448fa 100644
--- a/lib/cpp/src/transport/TFDTransport.cpp
+++ b/lib/cpp/src/transport/TFDTransport.cpp
@@ -22,7 +22,9 @@
 
 #include <transport/TFDTransport.h>
 
+#ifdef HAVE_UNISTD_H
 #include <unistd.h>
+#endif
 
 using namespace std;
 
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index c6c3155..405c162 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -47,12 +47,17 @@
 #include <sys/stat.h>
 #endif
 
+#ifdef _WIN32
+#include <io.h>
+#endif
+
 namespace apache { namespace thrift { namespace transport {
 
 using boost::scoped_ptr;
 using boost::shared_ptr;
 using namespace std;
 using namespace apache::thrift::protocol;
+using namespace apache::thrift::concurrency;
 
 #ifndef HAVE_CLOCK_GETTIME
 
@@ -102,13 +107,10 @@
   , lastBadChunk_(0)
   , numCorruptedEventsInChunk_(0)
   , readOnly_(readOnly)
+  , notFull_(&mutex_)
+  , notEmpty_(&mutex_)
+  , flushed_(&mutex_)
 {
-  // initialize all the condition vars/mutexes
-  pthread_mutex_init(&mutex_, NULL);
-  pthread_cond_init(&notFull_, NULL);
-  pthread_cond_init(&notEmpty_, NULL);
-  pthread_cond_init(&flushed_, NULL);
-
   openLogFile();
 }
 
@@ -142,16 +144,25 @@
 
 TFileTransport::~TFileTransport() {
   // flush the buffer if a writer thread is active
+#ifdef USE_BOOST_THREAD
+  if(writerThreadId_.get()) {
+#else
   if (writerThreadId_ > 0) {
+#endif
     // set state to closing
     closing_ = true;
 
     // wake up the writer thread
     // Since closing_ is true, it will attempt to flush all data, then exit.
-    pthread_cond_signal(&notEmpty_);
+	notEmpty_.notify();
 
+#ifdef USE_BOOST_THREAD
+    writerThreadId_->join();
+	writerThreadId_.reset();
+#else
     pthread_join(writerThreadId_, NULL);
     writerThreadId_ = 0;
+#endif
   }
 
   if (dequeueBuffer_) {
@@ -191,12 +202,18 @@
     return false;
   }
 
+#ifdef USE_BOOST_THREAD
+  if(!writerThreadId_.get()) {
+    writerThreadId_ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(startWriterThread, (void *)this)));
+  }
+#else
   if (writerThreadId_ == 0) {
     if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
       T_ERROR("%s", "Could not create writer thread");
       return false;
     }
   }
+#endif
 
   dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
   enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
@@ -242,20 +259,19 @@
   toEnqueue->eventSize_ = eventLen + 4;
 
   // lock mutex
-  pthread_mutex_lock(&mutex_);
+  Guard g(mutex_);
 
   // make sure that enqueue buffer is initialized and writer thread is running
   if (!bufferAndThreadInitialized_) {
     if (!initBufferAndWriteThread()) {
       delete toEnqueue;
-      pthread_mutex_unlock(&mutex_);
       return;
     }
   }
 
   // Can't enqueue while buffer is full
   while (enqueueBuffer_->isFull()) {
-    pthread_cond_wait(&notFull_, &mutex_);
+	  notFull_.wait();
   }
 
   // We shouldn't be trying to enqueue new data while a forced flush is
@@ -266,23 +282,21 @@
   // add to the buffer
   if (!enqueueBuffer_->addEvent(toEnqueue)) {
     delete toEnqueue;
-    pthread_mutex_unlock(&mutex_);
     return;
   }
 
   // signal anybody who's waiting for the buffer to be non-empty
-  pthread_cond_signal(&notEmpty_);
+  notEmpty_.notify();
 
   // this really should be a loop where it makes sure it got flushed
   // because condition variables can get triggered by the os for no reason
   // it is probably a non-factor for the time being
-  pthread_mutex_unlock(&mutex_);
 }
 
 bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
-  pthread_mutex_lock(&mutex_);
-
   bool swap;
+  Guard g(mutex_);
+
   if (!enqueueBuffer_->isEmpty()) {
     swap = true;
   } else if (closing_) {
@@ -292,10 +306,10 @@
   } else {
     if (deadline != NULL) {
       // if we were handed a deadline time struct, do a timed wait
-      pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
+      notEmpty_.waitForTime(deadline);
     } else {
       // just wait until the buffer gets an item
-      pthread_cond_wait(&notEmpty_, &mutex_);
+      notEmpty_.wait();
     }
 
     // could be empty if we timed out
@@ -308,11 +322,9 @@
     dequeueBuffer_ = temp;
   }
 
-  // unlock the mutex and signal if required
-  pthread_mutex_unlock(&mutex_);
 
   if (swap) {
-    pthread_cond_signal(&notFull_);
+	  notFull_.notify();
   }
 
   return swap;
@@ -340,7 +352,11 @@
       seekToEnd();
       // throw away any partial events
       offset_ += readState_.lastDispatchPtr_;
+#ifndef _WIN32
       ftruncate(fd_, offset_);
+#else
+      _chsize_s(fd_, offset_);
+#endif
       readState_.resetAllValues();
     } catch (...) {
       int errno_copy = errno;
@@ -358,12 +374,18 @@
     // this will only be true when the destructor is being invoked
     if (closing_) {
       if (hasIOError) {
-        pthread_exit(NULL);
+#ifndef USE_BOOST_THREAD
+		  pthread_exit(NULL);
+#else
+		  return;
+#endif
       }
 
       // Try to empty buffers before exit
       if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+#ifndef _WIN32
         fsync(fd_);
+#endif
         if (-1 == ::close(fd_)) {
           int errno_copy = errno;
           GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
@@ -371,8 +393,12 @@
           //fd successfully closed
           fd_ = 0;
         }
+#ifndef USE_BOOST_THREAD
         pthread_exit(NULL);
-      }
+#else
+        return;
+#endif
+	  }
     }
 
     if (swapEventBuffers(&ts_next_flush)) {
@@ -387,7 +413,11 @@
           T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
           usleep(writerThreadIOErrorSleepTime_);
           if (closing_) {
+#ifndef USE_BOOST_THREAD
             pthread_exit(NULL);
+#else
+            return;
+#endif
           }
           if (!fd_) {
             ::close(fd_);
@@ -467,7 +497,8 @@
     // time, it could have changed state in between.  This will result in us
     // making inconsistent decisions.
     bool forced_flush = false;
-    pthread_mutex_lock(&mutex_);
+	{
+    Guard g(mutex_);
     if (forceFlush_) {
       if (!enqueueBuffer_->isEmpty()) {
         // If forceFlush_ is true, we need to flush all available data.
@@ -479,12 +510,11 @@
         // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
         // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
         // and clear forceFlush_ the next time around the loop.)
-        pthread_mutex_unlock(&mutex_);
         continue;
       }
       forced_flush = true;
-    }
-    pthread_mutex_unlock(&mutex_);
+	}
+	}
 
     // determine if we need to perform an fsync
     bool flush = false;
@@ -508,18 +538,19 @@
 
     if (flush) {
       // sync (force flush) file to disk
+#ifndef _WIN32
       fsync(fd_);
+#endif
       unflushed = 0;
       getNextFlushTime(&ts_next_flush);
 
       // notify anybody waiting for flush completion
       if (forced_flush) {
-        pthread_mutex_lock(&mutex_);
+        Guard g(mutex_);
         forceFlush_ = false;
         assert(enqueueBuffer_->isEmpty());
         assert(dequeueBuffer_->isEmpty());
-        pthread_cond_broadcast(&flushed_);
-        pthread_mutex_unlock(&mutex_);
+		flushed_.notifyAll();
       }
     }
   }
@@ -527,22 +558,26 @@
 
 void TFileTransport::flush() {
   // file must be open for writing for any flushing to take place
+#ifdef USE_BOOST_THREAD
+  if (!writerThreadId_.get()) {
+    return;
+  }
+#else
   if (writerThreadId_ <= 0) {
     return;
   }
+#endif
   // wait for flush to take place
-  pthread_mutex_lock(&mutex_);
+  Guard g(mutex_);
 
   // Indicate that we are requesting a flush
   forceFlush_ = true;
   // Wake up the writer thread so it will perform the flush immediately
-  pthread_cond_signal(&notEmpty_);
+  notEmpty_.notify();
 
   while (forceFlush_) {
-    pthread_cond_wait(&flushed_, &mutex_);
+    flushed_.wait();
   }
-
-  pthread_mutex_unlock(&mutex_);
 }
 
 
@@ -892,9 +927,15 @@
 
 // Utility Functions
 void TFileTransport::openLogFile() {
+#ifndef _WIN32
   mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
   int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
   fd_ = ::open(filename_.c_str(), flags, mode);
+#else
+  int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
+  int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
+  fd_ = ::_open(filename_.c_str(), flags, mode);
+#endif
   offset_ = 0;
 
   // make sure open call was successful
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index 2ea8c9a..b0e48d1 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -27,15 +27,26 @@
 #include <string>
 #include <stdio.h>
 
+#ifdef HAVE_PTHREAD_H
 #include <pthread.h>
+#endif
+
+#ifdef USE_BOOST_THREAD
+#include <boost/thread.hpp>
+#endif
 
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 
+#include "concurrency/Mutex.h"
+#include "concurrency/Monitor.h"
+
 namespace apache { namespace thrift { namespace transport {
 
 using apache::thrift::TProcessor;
 using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Monitor;
 
 // Data pertaining to a single event
 typedef struct eventInfo {
@@ -360,7 +371,11 @@
   static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
 
   // writer thread id
-  pthread_t writerThreadId_;
+#ifdef USE_BOOST_THREAD
+	std::auto_ptr<boost::thread> writerThreadId_;
+#else
+	pthread_t writerThreadId_;
+#endif
 
   // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
   // needs to be written to the file.  The buffers are swapped by the writer thread.
@@ -368,15 +383,15 @@
   TFileTransportBuffer *enqueueBuffer_;
 
   // conditions used to block when the buffer is full or empty
-  pthread_cond_t notFull_, notEmpty_;
+  Monitor notFull_, notEmpty_;
   volatile bool closing_;
 
   // To keep track of whether the buffer has been flushed
-  pthread_cond_t flushed_;
+  Monitor flushed_;
   volatile bool forceFlush_;
 
   // Mutex that is grabbed when enqueueing and swapping the read/write buffers
-  pthread_mutex_t mutex_;
+  Mutex mutex_;
 
   // File information
   std::string filename_;
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 2db8f8b..a0cc77a 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -496,6 +496,12 @@
     }
     #endif
 
+#ifdef _WIN32
+    if(errno_copy == WSAECONNRESET) {
+      return 0; // EOF
+    }
+#endif
+
     // Now it's not a try again case, but a real probblez
     GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
 
diff --git a/lib/cpp/src/windows/config.h b/lib/cpp/src/windows/config.h
index 0f9a304..2db2596 100644
--- a/lib/cpp/src/windows/config.h
+++ b/lib/cpp/src/windows/config.h
@@ -32,6 +32,7 @@
 #pragma warning(disable: 4250) // Inherits via dominance.
 
 #define HAVE_GETTIMEOFDAY 1
+#define HAVE_SYS_STAT_H 1
 
 #include "TargetVersion.h"
 #include "GetTimeOfDay.h"
@@ -53,13 +54,25 @@
 #pragma comment(lib, "Ws2_32.lib")
 
 // pthreads
-#include <pthread.h>
+#if 0
+#	include <pthread.h>
+#else
+struct timespec {
+	long tv_sec;
+	long tv_nsec;
+};
+#	define USE_BOOST_THREAD 1
+#	define ctime_r( _clock, _buf ) \
+        ( strcpy( (_buf), ctime( (_clock) ) ),  \
+          (_buf) )
+#endif
 
 typedef ptrdiff_t ssize_t;
 
 // Missing functions.
 #define usleep(ms) Sleep(ms)
 
+#if WINVER <= 0x0502
 #define poll(fds, nfds, timeout) \
     poll_win32(fds, nfds, timeout)
 
@@ -80,6 +93,10 @@
     timeval time_out = {timeout * 0.001, timeout * 1000};
     return select(1, &read_fds, &write_fds, &except_fds, &time_out);
 }
+#else
+#   define poll(fds, nfds, timeout) \
+    WSAPoll(fds, nfds, timeout)
+#endif // WINVER
 
 inline void close(SOCKET socket)
 {