THRIFT-1217 Use evutil_socketpair instead of pipe
Patch: alexandre parenteau

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1144286 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 69ae235..4774b36 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -22,10 +22,20 @@
 #include <transport/TSocket.h>
 
 #include <iostream>
+
+#ifdef HAVE_SYS_SOCKET_H
 #include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
 #include <netinet/in.h>
 #include <netinet/tcp.h>
+#endif
+
+#ifdef HAVE_NETDB_H
 #include <netdb.h>
+#endif
+
 #include <fcntl.h>
 #include <errno.h>
 #include <assert.h>
@@ -708,7 +718,7 @@
   #ifdef IPV6_V6ONLY
   if (res->ai_family == AF_INET6) {
     int zero = 0;
-    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
+    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
       GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
     }
   }
@@ -718,9 +728,9 @@
   int one = 1;
 
   // Set reuseaddr to avoid 2MSL delay on server restart
-  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));
 
-  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
+  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
     close(s);
     freeaddrinfo(res0);
     throw TException("TNonblockingServer::serve() bind");
@@ -750,20 +760,20 @@
   struct linger ling = {0, 0};
 
   // Keepalive to ensure full result flushing
-  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
+  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
 
   // Turn linger off to avoid hung sockets
-  setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
 
   // Set TCP nodelay if available, MAC OS X Hack
   // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
   #ifndef TCP_NOPUSH
-  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
+  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
   #endif
 
   #ifdef TCP_LOW_MIN_RTO
   if (TSocket::getUseLowMinRto()) {
-    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
+    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
   }
   #endif
 
@@ -777,13 +787,12 @@
 }
 
 void TNonblockingServer::createNotificationPipe() {
-  if (pipe(notificationPipeFDs_) != 0) {
-    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
-      throw TException("can't create notification pipe");
+  if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
+    throw TException("can't create notification pipe");
   }
-  int flags;
-  if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
-      fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+  if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
+     evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
     close(notificationPipeFDs_[0]);
     close(notificationPipeFDs_[1]);
     throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 0252f10..7b1cf4d 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -44,6 +44,36 @@
 // Forward declaration of class
 class TConnection;
 
+#ifdef LIBEVENT_VERSION_NUMBER
+#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
+#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
+#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
+#else
+// assume latest version 1 series
+#define LIBEVENT_VERSION_MAJOR 1
+#define LIBEVENT_VERSION_MINOR 14
+#define LIBEVENT_VERSION_REL 13
+#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
+#endif
+
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+ typedef int evutil_socket_t;
+#endif
+
+#ifndef SOCKOPT_CAST_T
+#define SOCKOPT_CAST_T void
+#endif
+
+template<class T>
+inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
+  return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
+}
+
+template<class T>
+inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
+  return reinterpret_cast<SOCKOPT_CAST_T*>(v);
+}
+
 /**
  * This is a non-blocking server in C++ for high performance that operates a
  * single IO thread. It assumes that all incoming requests are framed with a
@@ -176,7 +206,7 @@
   uint64_t nTotalConnectionsDropped_;
 
   /// File descriptors for pipe used for task completion notification.
-  int notificationPipeFDs_[2];
+  evutil_socket_t notificationPipeFDs_[2];
 
   /**
    * This is a stack of all the objects that have been created but that
@@ -634,7 +664,7 @@
    * @param which the flags associated with the event.
    * @param v void* callback arg where we placed TNonblockingServer's "this".
    */
-  static void eventHandler(int fd, short which, void* v) {
+  static void eventHandler(evutil_socket_t fd, short which, void* v) {
     ((TNonblockingServer*)v)->handleEvent(fd, which);
   }
 
@@ -874,7 +904,7 @@
    * @param which the flags associated with the event.
    * @param v void* callback arg where we placed TConnection's "this".
    */
-  static void eventHandler(int fd, short /* which */, void* v) {
+  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
     assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
     ((TConnection*)v)->workSocket();
   }
@@ -887,17 +917,17 @@
    *
    * @param fd the descriptor the event occured on.
    */
-  static void taskHandler(int fd, short /* which */, void* /* v */) {
+  static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
     TConnection* connection;
     ssize_t nBytes;
-    while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
+    while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
         == sizeof(TConnection*)) {
       connection->transition();
     }
     if (nBytes > 0) {
       throw TException("TConnection::taskHandler unexpected partial read");
     }
-    if (errno != EWOULDBLOCK && errno != EAGAIN) {
+    if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
       GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
     }
   }
@@ -911,8 +941,8 @@
    */
   bool notifyServer() {
     TConnection* connection = this;
-    if (write(server_->getNotificationSendFD(), (const void*)&connection,
-             sizeof(TConnection*)) != sizeof(TConnection*)) {
+    if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
+             sizeof(TConnection*), 0) != sizeof(TConnection*)) {
       return false;
     }