THRIFT-1461 Recent TNonblockingServer changes broke the --enable-boostthreads=yes and Windows builds
Patch: Alexandre Parenteau
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1214547 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PlatformThreadFactory.h b/lib/cpp/src/concurrency/PlatformThreadFactory.h
index 9f053a0..04fdc5b 100644
--- a/lib/cpp/src/concurrency/PlatformThreadFactory.h
+++ b/lib/cpp/src/concurrency/PlatformThreadFactory.h
@@ -30,7 +30,6 @@
#ifndef USE_BOOST_THREAD
typedef PosixThreadFactory PlatformThreadFactory;
-#include <concurrency/PosixThreadFactory.h>
#else
typedef BoostThreadFactory PlatformThreadFactory;
#endif
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 6924aa6..2019353 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -17,6 +17,9 @@
* under the License.
*/
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
#include "PosixThreadFactory.h"
#include "Exception.h"
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
old mode 100644
new mode 100755
index a9e15af..654778c
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -24,10 +24,18 @@
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
#ifdef USE_BOOST_THREAD
#include <boost/thread.hpp>
#endif
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
+
namespace apache { namespace thrift { namespace concurrency {
class Thread;
@@ -74,8 +82,13 @@
#ifdef USE_BOOST_THREAD
typedef boost::thread::id id_t;
+
+ static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
+ static inline id_t get_current() { return boost::this_thread::get_id(); }
#else
typedef uint64_t id_t;
+ static inline bool is_current(pthread_t t) { return pthread_equal(pthread_self(), t); }
+ static inline id_t get_current() { return pthread_self(); }
#endif
virtual ~Thread() {};
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index e56a9b5..b756bf1 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -17,6 +17,10 @@
* under the License.
*/
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
#include "ThreadManager.h"
#include "Exception.h"
#include "Monitor.h"
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 0e44ab2..7d42a2e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -26,7 +26,7 @@
#include "TNonblockingServer.h"
#include <concurrency/Exception.h>
#include <transport/TSocket.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <iostream>
@@ -53,12 +53,19 @@
#include <errno.h>
#include <assert.h>
+
+#ifdef HAVE_SCHED_H
#include <sched.h>
+#endif
#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif
+#ifdef _MSC_VER
+#define PRIu32 "I32u"
+#endif
+
namespace apache { namespace thrift { namespace server {
using namespace apache::thrift::protocol;
@@ -1208,10 +1215,12 @@
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
- ioThreadFactory_.reset(new PosixThreadFactory(
- PosixThreadFactory::OTHER, // scheduler
- PosixThreadFactory::NORMAL, // priority
+ ioThreadFactory_.reset(new PlatformThreadFactory(
+#ifndef USE_BOOST_THREAD
+ PlatformThreadFactory::OTHER, // scheduler
+ PlatformThreadFactory::NORMAL, // priority
1, // stack size (MB)
+#endif
false // detached
));
@@ -1262,7 +1271,7 @@
GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
errno);
}
- listenSocket_ = TNonblockingServer::INVALID_SOCKET;
+ listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
}
for (int i = 0; i < 2; ++i) {
@@ -1271,26 +1280,30 @@
GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
errno);
}
- notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET;
+ notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
}
}
}
void TNonblockingIOThread::createNotificationPipe() {
- if (pipe(notificationPipeFDs_) != 0) {
- GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+ if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+ GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
throw TException("can't create notification pipe");
}
- int flags;
- if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
- fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+ if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
+ evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
close(notificationPipeFDs_[0]);
close(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
}
for (int i = 0; i < 2; ++i) {
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+ int flags;
if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+#else
+ if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
+#endif
close(notificationPipeFDs_[0]);
close(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() "
@@ -1349,7 +1362,7 @@
}
const int kSize = sizeof(conn);
- if (write(fd, &conn, kSize) != kSize) {
+ if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
return false;
}
@@ -1357,7 +1370,7 @@
}
/* static */
-void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) {
+void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
assert(ioThread);
(void)which;
@@ -1365,7 +1378,7 @@
while (true) {
TNonblockingServer::TConnection* connection = 0;
const int kSize = sizeof(connection);
- int nBytes = read(fd, &connection, kSize);
+ int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
if (nBytes == kSize) {
if (connection == NULL) {
// this is the command to stop our thread, exit the handler!
@@ -1416,12 +1429,13 @@
// mechanism to stop the thread, but happily if we're running in the
// same thread, this means the thread can't be blocking in the event
// loop either.
- if (!pthread_equal(pthread_self(), threadId_)) {
+ if (!Thread::is_current(threadId_)) {
notify(NULL);
}
}
void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+#ifdef HAVE_SCHED_H
// Start out with a standard, low-priority setup for the sched params.
struct sched_param sp;
bzero((void*) &sp, sizeof(sp));
@@ -1446,10 +1460,11 @@
} else {
GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
}
+#endif
}
void TNonblockingIOThread::run() {
- threadId_ = pthread_self();
+ threadId_ = Thread::get_current();
assert(eventBase_ == 0);
eventBase_ = event_base_new();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index e5d3311..5f5ea11 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -27,7 +27,7 @@
#include <concurrency/ThreadManager.h>
#include <climits>
#include <concurrency/Thread.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <concurrency/Mutex.h>
#include <stack>
#include <vector>
@@ -48,7 +48,7 @@
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
-using apache::thrift::concurrency::PosixThreadFactory;
+using apache::thrift::concurrency::PlatformThreadFactory;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
@@ -146,7 +146,7 @@
static const int DEFAULT_IO_THREADS = 1;
/// File descriptor of an invalid socket
- static const int INVALID_SOCKET = -1;
+ static const int INVALID_SOCKET_VALUE = -1;
/// # of IO threads this server will use
size_t numIOThreads_;
@@ -167,7 +167,7 @@
bool threadPoolProcessing_;
// Factory to create the IO threads
- boost::shared_ptr<PosixThreadFactory> ioThreadFactory_;
+ boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
// Vector of IOThread objects that will handle our IO
std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
@@ -804,13 +804,13 @@
// Returns the thread id associated with this object. This should
// only be called after the thread has been started.
- pthread_t getThreadId() const { return threadId_; }
+ Thread::id_t getThreadId() const { return threadId_; }
// Returns the send-fd for task complete notifications.
- int getNotificationSendFD() const { return notificationPipeFDs_[1]; }
+ evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
// Returns the read-fd for task complete notifications.
- int getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
+ evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
// Returns the actual thread object associated with this IO thread.
boost::shared_ptr<Thread> getThread() const { return thread_; }
@@ -839,7 +839,7 @@
*
* @param fd the descriptor the event occurred on.
*/
- static void notifyHandler(int fd, short which, void* v);
+ static void notifyHandler(evutil_socket_t fd, short which, void* v);
/**
* C-callable event handler for listener events. Provides a callback
@@ -876,7 +876,7 @@
const int number_;
/// The actual physical thread id.
- pthread_t threadId_;
+ Thread::id_t threadId_;
/// If listenSocket_ >= 0, adds an event on the event_base to accept conns
int listenSocket_;
@@ -894,7 +894,7 @@
struct event notificationEvent_;
/// File descriptors for pipe used for task completion notification.
- int notificationPipeFDs_[2];
+ evutil_socket_t notificationPipeFDs_[2];
/// Actual IO Thread
boost::shared_ptr<Thread> thread_;
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index c16e32f..ec30d76 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -17,6 +17,10 @@
* under the License.
*/
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
#include "server/TThreadPoolServer.h"
#include "transport/TTransportException.h"
#include "concurrency/Thread.h"
diff --git a/lib/cpp/src/windows/config.h b/lib/cpp/src/windows/config.h
index 3af158a..95a5676 100644
--- a/lib/cpp/src/windows/config.h
+++ b/lib/cpp/src/windows/config.h
@@ -30,6 +30,8 @@
#pragma warning(disable: 4996) // Depreciated posix name.
#pragma warning(disable: 4250) // Inherits via dominance.
+#pragma warning(disable: 4244) // conversion from '...' to '...', possible loss of data.
+#pragma warning(disable: 4267) // conversion from 'size_t' to '...', possible loss of data.
#define VERSION "0.9.0-dev"
#define HAVE_GETTIMEOFDAY 1
@@ -105,9 +107,9 @@
}
#endif // WINVER
-inline void close(SOCKET socket)
+inline int close(SOCKET socket)
{
- ::closesocket(socket);
+ return ::closesocket(socket);
}
#endif // _THRIFT_WINDOWS_CONFIG_H_
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 1626c6e..bf41935 100644
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -65,8 +65,12 @@
UnitTests_SOURCES = \
UnitTestMain.cpp \
TMemoryBufferTest.cpp \
- TBufferBaseTest.cpp \
+ TBufferBaseTest.cpp
+
+if !WITH_BOOSTTHREADS
+UnitTests_SOURCES += \
RWMutexStarveTest.cpp
+endif
UnitTests_LDADD = \
libtestgencpp.la \