THRIFT-1461 Recent TNonblockingServer changes broke --enable-boostthreads=yes, Windows
Patch: Alexandre Parenteau
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1214547 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/compiler.vcxproj b/compiler/cpp/compiler.vcxproj
index 5f43577..a01f3f2 100644
--- a/compiler/cpp/compiler.vcxproj
+++ b/compiler/cpp/compiler.vcxproj
@@ -5,10 +5,18 @@
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
+ <ProjectConfiguration Include="Debug|x64">
+ <Configuration>Debug</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
+ <ProjectConfiguration Include="Release|x64">
+ <Configuration>Release</Configuration>
+ <Platform>x64</Platform>
+ </ProjectConfiguration>
</ItemGroup>
<ItemGroup>
<ClInclude Include="src\generate\t_generator.h" />
@@ -83,31 +91,62 @@
<UseDebugLibraries>true</UseDebugLibraries>
<CharacterSet>MultiByte</CharacterSet>
</PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>MultiByte</CharacterSet>
+ </PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>MultiByte</CharacterSet>
</PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>MultiByte</CharacterSet>
+ </PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
+ <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
+ <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="PropertySheets">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
<IncludePath>$(ProjectDir)\src\;$(ProjectDir)\src\windows\;$(IncludePath)</IncludePath>
<TargetName>thrift</TargetName>
+ <ExecutablePath>$(ExecutablePath);C:\Program Files (x86)\Git\bin</ExecutablePath>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <LinkIncremental>true</LinkIncremental>
+ <IncludePath>$(ProjectDir)\src\;$(ProjectDir)\src\windows\;$(IncludePath)</IncludePath>
+ <TargetName>thrift</TargetName>
+ <ExecutablePath>$(ExecutablePath);C:\Program Files (x86)\Git\bin</ExecutablePath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
<IncludePath>$(ProjectDir)\src\;$(ProjectDir)\src\windows\;$(IncludePath)</IncludePath>
<TargetName>thrift</TargetName>
+ <ExecutablePath>$(ExecutablePath);C:\Program Files (x86)\Git\bin</ExecutablePath>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <LinkIncremental>false</LinkIncremental>
+ <IncludePath>$(ProjectDir)\src\;$(ProjectDir)\src\windows\;$(IncludePath)</IncludePath>
+ <TargetName>thrift</TargetName>
+ <ExecutablePath>$(ExecutablePath);C:\Program Files (x86)\Git\bin</ExecutablePath>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
@@ -128,6 +167,25 @@
bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy</Command>
</PreBuildEvent>
</ItemDefinitionGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
+ <ClCompile>
+ <PrecompiledHeader>
+ </PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions>WIN32;MINGW;YY_NO_UNISTD_H;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <ForcedIncludeFiles>config.h</ForcedIncludeFiles>
+ <CompileAs>CompileAsCpp</CompileAs>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ </Link>
+ <PreBuildEvent>
+ <Command>flex -o "src/thriftl.cc" src/thriftl.ll
+bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy</Command>
+ </PreBuildEvent>
+ </ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
@@ -151,6 +209,29 @@
bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy</Command>
</PreBuildEvent>
</ItemDefinitionGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
+ <ClCompile>
+ <WarningLevel>Level3</WarningLevel>
+ <PrecompiledHeader>
+ </PrecompiledHeader>
+ <Optimization>MaxSpeed</Optimization>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <PreprocessorDefinitions>WIN32;MINGW;YY_NO_UNISTD_H;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <ForcedIncludeFiles>config.h</ForcedIncludeFiles>
+ <CompileAs>CompileAsCpp</CompileAs>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ </Link>
+ <PreBuildEvent>
+ <Command>flex -o "src/thriftl.cc" src/thriftl.ll
+bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy</Command>
+ </PreBuildEvent>
+ </ItemDefinitionGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
diff --git a/compiler/cpp/src/thrifty.yy b/compiler/cpp/src/thrifty.yy
index cc024a1..ef53cc3 100644
--- a/compiler/cpp/src/thrifty.yy
+++ b/compiler/cpp/src/thrifty.yy
@@ -28,7 +28,11 @@
#define __STDC_LIMIT_MACROS
#define __STDC_FORMAT_MACROS
#include <stdio.h>
+#ifndef _MSC_VER
#include <inttypes.h>
+#else
+#include <stdint.h>
+#endif
#include <limits.h>
#include "main.h"
#include "globals.h"
diff --git a/configure.ac b/configure.ac
index aa8a16e..8943cdb 100644
--- a/configure.ac
+++ b/configure.ac
@@ -325,6 +325,7 @@
AC_CHECK_HEADERS([openssl/ssl.h])
AC_CHECK_HEADERS([openssl/rand.h])
AC_CHECK_HEADERS([openssl/x509v3.h])
+AC_CHECK_HEADERS([sched.h])
AC_CHECK_LIB(pthread, pthread_create)
dnl NOTE(dreiss): I haven't been able to find any really solid docs
diff --git a/lib/cpp/src/concurrency/PlatformThreadFactory.h b/lib/cpp/src/concurrency/PlatformThreadFactory.h
index 9f053a0..04fdc5b 100644
--- a/lib/cpp/src/concurrency/PlatformThreadFactory.h
+++ b/lib/cpp/src/concurrency/PlatformThreadFactory.h
@@ -30,7 +30,6 @@
#ifndef USE_BOOST_THREAD
typedef PosixThreadFactory PlatformThreadFactory;
-#include <concurrency/PosixThreadFactory.h>
#else
typedef BoostThreadFactory PlatformThreadFactory;
#endif
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 6924aa6..2019353 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -17,6 +17,9 @@
* under the License.
*/
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
#include "PosixThreadFactory.h"
#include "Exception.h"
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
old mode 100644
new mode 100755
index a9e15af..654778c
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -24,10 +24,18 @@
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
#ifdef USE_BOOST_THREAD
#include <boost/thread.hpp>
#endif
+#ifdef HAVE_PTHREAD_H
+#include <pthread.h>
+#endif
+
namespace apache { namespace thrift { namespace concurrency {
class Thread;
@@ -74,8 +82,13 @@
#ifdef USE_BOOST_THREAD
typedef boost::thread::id id_t;
+
+ static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
+ static inline id_t get_current() { return boost::this_thread::get_id(); }
#else
typedef uint64_t id_t;
+ static inline bool is_current(pthread_t t) { return pthread_equal(pthread_self(), t); }
+ static inline id_t get_current() { return pthread_self(); }
#endif
virtual ~Thread() {};
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index e56a9b5..b756bf1 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -17,6 +17,10 @@
* under the License.
*/
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
#include "ThreadManager.h"
#include "Exception.h"
#include "Monitor.h"
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 0e44ab2..7d42a2e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -26,7 +26,7 @@
#include "TNonblockingServer.h"
#include <concurrency/Exception.h>
#include <transport/TSocket.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <iostream>
@@ -53,12 +53,19 @@
#include <errno.h>
#include <assert.h>
+
+#ifdef HAVE_SCHED_H
#include <sched.h>
+#endif
#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif
+#ifdef _MSC_VER
+#define PRIu32 "I32u"
+#endif
+
namespace apache { namespace thrift { namespace server {
using namespace apache::thrift::protocol;
@@ -1208,10 +1215,12 @@
// Launch all the secondary IO threads in separate threads
if (ioThreads_.size() > 1) {
- ioThreadFactory_.reset(new PosixThreadFactory(
- PosixThreadFactory::OTHER, // scheduler
- PosixThreadFactory::NORMAL, // priority
+ ioThreadFactory_.reset(new PlatformThreadFactory(
+#ifndef USE_BOOST_THREAD
+ PlatformThreadFactory::OTHER, // scheduler
+ PlatformThreadFactory::NORMAL, // priority
1, // stack size (MB)
+#endif
false // detached
));
@@ -1262,7 +1271,7 @@
GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
errno);
}
- listenSocket_ = TNonblockingServer::INVALID_SOCKET;
+ listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
}
for (int i = 0; i < 2; ++i) {
@@ -1271,26 +1280,30 @@
GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
errno);
}
- notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET;
+ notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
}
}
}
void TNonblockingIOThread::createNotificationPipe() {
- if (pipe(notificationPipeFDs_) != 0) {
- GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+ if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+ GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
throw TException("can't create notification pipe");
}
- int flags;
- if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
- fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+ if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
+ evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
close(notificationPipeFDs_[0]);
close(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
}
for (int i = 0; i < 2; ++i) {
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+ int flags;
if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+#else
+ if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
+#endif
close(notificationPipeFDs_[0]);
close(notificationPipeFDs_[1]);
throw TException("TNonblockingServer::createNotificationPipe() "
@@ -1349,7 +1362,7 @@
}
const int kSize = sizeof(conn);
- if (write(fd, &conn, kSize) != kSize) {
+ if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
return false;
}
@@ -1357,7 +1370,7 @@
}
/* static */
-void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) {
+void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
assert(ioThread);
(void)which;
@@ -1365,7 +1378,7 @@
while (true) {
TNonblockingServer::TConnection* connection = 0;
const int kSize = sizeof(connection);
- int nBytes = read(fd, &connection, kSize);
+ int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
if (nBytes == kSize) {
if (connection == NULL) {
// this is the command to stop our thread, exit the handler!
@@ -1416,12 +1429,13 @@
// mechanism to stop the thread, but happily if we're running in the
// same thread, this means the thread can't be blocking in the event
// loop either.
- if (!pthread_equal(pthread_self(), threadId_)) {
+ if (!Thread::is_current(threadId_)) {
notify(NULL);
}
}
void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+#ifdef HAVE_SCHED_H
// Start out with a standard, low-priority setup for the sched params.
struct sched_param sp;
bzero((void*) &sp, sizeof(sp));
@@ -1446,10 +1460,11 @@
} else {
GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
}
+#endif
}
void TNonblockingIOThread::run() {
- threadId_ = pthread_self();
+ threadId_ = Thread::get_current();
assert(eventBase_ == 0);
eventBase_ = event_base_new();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index e5d3311..5f5ea11 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -27,7 +27,7 @@
#include <concurrency/ThreadManager.h>
#include <climits>
#include <concurrency/Thread.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
#include <concurrency/Mutex.h>
#include <stack>
#include <vector>
@@ -48,7 +48,7 @@
using apache::thrift::protocol::TProtocol;
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
-using apache::thrift::concurrency::PosixThreadFactory;
+using apache::thrift::concurrency::PlatformThreadFactory;
using apache::thrift::concurrency::ThreadFactory;
using apache::thrift::concurrency::Thread;
using apache::thrift::concurrency::Mutex;
@@ -146,7 +146,7 @@
static const int DEFAULT_IO_THREADS = 1;
/// File descriptor of an invalid socket
- static const int INVALID_SOCKET = -1;
+ static const int INVALID_SOCKET_VALUE = -1;
/// # of IO threads this server will use
size_t numIOThreads_;
@@ -167,7 +167,7 @@
bool threadPoolProcessing_;
// Factory to create the IO threads
- boost::shared_ptr<PosixThreadFactory> ioThreadFactory_;
+ boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
// Vector of IOThread objects that will handle our IO
std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
@@ -804,13 +804,13 @@
// Returns the thread id associated with this object. This should
// only be called after the thread has been started.
- pthread_t getThreadId() const { return threadId_; }
+ Thread::id_t getThreadId() const { return threadId_; }
// Returns the send-fd for task complete notifications.
- int getNotificationSendFD() const { return notificationPipeFDs_[1]; }
+ evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
// Returns the read-fd for task complete notifications.
- int getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
+ evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
// Returns the actual thread object associated with this IO thread.
boost::shared_ptr<Thread> getThread() const { return thread_; }
@@ -839,7 +839,7 @@
*
* @param fd the descriptor the event occurred on.
*/
- static void notifyHandler(int fd, short which, void* v);
+ static void notifyHandler(evutil_socket_t fd, short which, void* v);
/**
* C-callable event handler for listener events. Provides a callback
@@ -876,7 +876,7 @@
const int number_;
/// The actual physical thread id.
- pthread_t threadId_;
+ Thread::id_t threadId_;
/// If listenSocket_ >= 0, adds an event on the event_base to accept conns
int listenSocket_;
@@ -894,7 +894,7 @@
struct event notificationEvent_;
/// File descriptors for pipe used for task completion notification.
- int notificationPipeFDs_[2];
+ evutil_socket_t notificationPipeFDs_[2];
/// Actual IO Thread
boost::shared_ptr<Thread> thread_;
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index c16e32f..ec30d76 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -17,6 +17,10 @@
* under the License.
*/
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
#include "server/TThreadPoolServer.h"
#include "transport/TTransportException.h"
#include "concurrency/Thread.h"
diff --git a/lib/cpp/src/windows/config.h b/lib/cpp/src/windows/config.h
index 3af158a..95a5676 100644
--- a/lib/cpp/src/windows/config.h
+++ b/lib/cpp/src/windows/config.h
@@ -30,6 +30,8 @@
#pragma warning(disable: 4996) // Depreciated posix name.
#pragma warning(disable: 4250) // Inherits via dominance.
+#pragma warning(disable: 4244) // conversion from '...' to '...', possible loss of data.
+#pragma warning(disable: 4267) // conversion from '...' to '...', possible loss of data.
#define VERSION "0.9.0-dev"
#define HAVE_GETTIMEOFDAY 1
@@ -105,9 +107,9 @@
}
#endif // WINVER
-inline void close(SOCKET socket)
+inline int close(SOCKET socket)
{
- ::closesocket(socket);
+ return ::closesocket(socket);
}
#endif // _THRIFT_WINDOWS_CONFIG_H_
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
index 1626c6e..bf41935 100644
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -65,8 +65,12 @@
UnitTests_SOURCES = \
UnitTestMain.cpp \
TMemoryBufferTest.cpp \
- TBufferBaseTest.cpp \
+ TBufferBaseTest.cpp
+
+if !WITH_BOOSTTHREADS
+UnitTests_SOURCES += \
RWMutexStarveTest.cpp
+endif
UnitTests_LDADD = \
libtestgencpp.la \