THRIFT-2577 C++ TFileTransport misuse of closesocket on Windows platform
Patch: suuyaoo
This closes #142
---
diff --git a/lib/cpp/src/thrift/transport/PlatformSocket.h b/lib/cpp/src/thrift/transport/PlatformSocket.h
index 58d68a4..019f4cb 100644
--- a/lib/cpp/src/thrift/transport/PlatformSocket.h
+++ b/lib/cpp/src/thrift/transport/PlatformSocket.h
@@ -22,6 +22,7 @@
#ifdef _WIN32
# define THRIFT_GET_SOCKET_ERROR ::WSAGetLastError()
+# define THRIFT_ERRNO (*_errno())
# define THRIFT_EINPROGRESS WSAEINPROGRESS
# define THRIFT_EAGAIN WSAEWOULDBLOCK
# define THRIFT_EINTR WSAEINTR
@@ -40,6 +41,15 @@
# define THRIFT_F_SETFL 1
# define THRIFT_GETTIMEOFDAY thrift_gettimeofday
# define THRIFT_CLOSESOCKET closesocket
+# define THRIFT_CLOSE _close
+# define THRIFT_OPEN _open
+# define THRIFT_FTRUNCATE _chsize_s
+# define THRIFT_FSYNC _commit
+# define THRIFT_LSEEK _lseek
+# define THRIFT_WRITE _write
+# define THRIFT_READ _read
+# define THRIFT_FSTAT _fstat
+# define THRIFT_STAT _stat
# define THRIFT_GAI_STRERROR gai_strerrorA
# define THRIFT_SSIZET ptrdiff_t
# define THRIFT_SNPRINTF _snprintf
@@ -61,6 +71,7 @@
#else //not _WIN32
# include <errno.h>
# define THRIFT_GET_SOCKET_ERROR errno
+# define THRIFT_ERRNO errno
# define THRIFT_EINTR EINTR
# define THRIFT_EINPROGRESS EINPROGRESS
# define THRIFT_ECONNRESET ECONNRESET
@@ -79,6 +90,15 @@
# define THRIFT_F_SETFL F_SETFL
# define THRIFT_GETTIMEOFDAY gettimeofday
# define THRIFT_CLOSESOCKET close
+# define THRIFT_CLOSE close
+# define THRIFT_OPEN open
+# define THRIFT_FTRUNCATE ftruncate
+# define THRIFT_FSYNC fsync
+# define THRIFT_LSEEK lseek
+# define THRIFT_WRITE write
+# define THRIFT_READ read
+# define THRIFT_STAT stat
+# define THRIFT_FSTAT fstat
# define THRIFT_GAI_STRERROR gai_strerror
# define THRIFT_SSIZET ssize_t
# define THRIFT_SNPRINTF snprintf
diff --git a/lib/cpp/src/thrift/transport/TFDTransport.cpp b/lib/cpp/src/thrift/transport/TFDTransport.cpp
index 3b72de5..26365f0 100644
--- a/lib/cpp/src/thrift/transport/TFDTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TFDTransport.cpp
@@ -40,8 +40,8 @@
return;
}
- int rv = ::THRIFT_CLOSESOCKET(fd_);
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int rv = ::THRIFT_CLOSE(fd_);
+ int errno_copy = THRIFT_ERRNO;
fd_ = -1;
// Have to check uncaught_exception because this is called in the destructor.
if (rv < 0 && !std::uncaught_exception()) {
@@ -55,14 +55,14 @@
unsigned int maxRetries = 5; // same as the TSocket default
unsigned int retries = 0;
while (true) {
- THRIFT_SSIZET rv = ::read(fd_, buf, len);
+ THRIFT_SSIZET rv = ::THRIFT_READ(fd_, buf, len);
if (rv < 0) {
- if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && retries < maxRetries) {
+ if (THRIFT_ERRNO == THRIFT_EINTR && retries < maxRetries) {
// If interrupted, try again
++retries;
continue;
}
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
throw TTransportException(TTransportException::UNKNOWN,
"TFDTransport::read()",
errno_copy);
@@ -75,10 +75,10 @@
void TFDTransport::write(const uint8_t* buf, uint32_t len) {
while (len > 0) {
- THRIFT_SSIZET rv = ::write(fd_, buf, len);
+ THRIFT_SSIZET rv = ::THRIFT_WRITE(fd_, buf, len);
if (rv < 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
throw TTransportException(TTransportException::UNKNOWN,
"TFDTransport::write()",
errno_copy);
diff --git a/lib/cpp/src/thrift/transport/TFileTransport.cpp b/lib/cpp/src/thrift/transport/TFileTransport.cpp
index c94ecd2..c267f2e 100644
--- a/lib/cpp/src/thrift/transport/TFileTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TFileTransport.cpp
@@ -99,8 +99,8 @@
// flush any events in the queue
flush();
GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
- if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ if (-1 == ::THRIFT_CLOSE(fd_)) {
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
} else {
@@ -154,8 +154,8 @@
// close logfile
if (fd_ > 0) {
- if(-1 == ::THRIFT_CLOSESOCKET(fd_)) {
- GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_GET_SOCKET_ERROR);
+ if(-1 == ::THRIFT_CLOSE(fd_)) {
+ GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
} else {
//successfully closed fd
fd_ = 0;
@@ -300,7 +300,7 @@
try {
openLogFile();
} catch (...) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
fd_ = 0;
hasIOError = true;
@@ -313,14 +313,10 @@
seekToEnd();
// throw away any partial events
offset_ += readState_.lastDispatchPtr_;
-#ifndef _WIN32
- ftruncate(fd_, offset_);
-#else
- _chsize_s(fd_, offset_);
-#endif
+ THRIFT_FTRUNCATE(fd_, offset_);
readState_.resetAllValues();
} catch (...) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
hasIOError = true;
}
@@ -340,11 +336,9 @@
// Try to empty buffers before exit
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
-#ifndef _WIN32
- fsync(fd_);
-#endif
- if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ ::THRIFT_FSYNC(fd_);
+ if (-1 == ::THRIFT_CLOSE(fd_)) {
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
} else {
//fd successfully closed
@@ -369,7 +363,7 @@
return;
}
if (!fd_) {
- ::THRIFT_CLOSESOCKET(fd_);
+ ::THRIFT_CLOSE(fd_);
fd_ = 0;
}
try {
@@ -403,14 +397,14 @@
// if adding this event will cross a chunk boundary, pad the chunk with zeros
if (chunk1 != chunk2) {
// refetch the offset to keep in sync
- offset_ = lseek(fd_, 0, SEEK_CUR);
+ offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
uint8_t* zeros = new uint8_t[padding];
memset(zeros, '\0', padding);
boost::scoped_array<uint8_t> array(zeros);
if (-1 == ::write(fd_, zeros, padding)) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
hasIOError = true;
continue;
@@ -422,8 +416,8 @@
// write the dequeued event to the file
if (outEvent->eventSize_ > 0) {
- if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
hasIOError = true;
continue;
@@ -487,9 +481,7 @@
if (flush) {
// sync (force flush) file to disk
-#ifndef _WIN32
- fsync(fd_);
-#endif
+ THRIFT_FSYNC(fd_);
unflushed = 0;
getNextFlushTime(&ts_next_flush);
@@ -600,7 +592,7 @@
if (readState_.bufferPtr_ == readState_.bufferLen_) {
// advance the offset pointer
offset_ += readState_.bufferLen_;
- readState_.bufferLen_ = static_cast<uint32_t>(::read(fd_, readBuff_, readBuffSize_));
+ readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
// if (readState_.bufferLen_) {
// T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
// }
@@ -809,11 +801,11 @@
seekToEnd = true;
chunk = numChunks - 1;
// this is the min offset to process events till
- minEndOffset = lseek(fd_, 0, SEEK_END);
+ minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END);
}
off_t newOffset = off_t(chunk) * chunkSize_;
- offset_ = lseek(fd_, newOffset, SEEK_SET);
+ offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET);
readState_.resetAllValues();
currentEvent_ = NULL;
if (offset_ == -1) {
@@ -847,11 +839,11 @@
return 0;
}
- struct stat f_info;
- int rv = fstat(fd_, &f_info);
+ struct THRIFT_STAT f_info;
+ int rv = ::THRIFT_FSTAT(fd_, &f_info);
if (rv < 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
throw TTransportException(TTransportException::UNKNOWN,
"TFileTransport::getNumChunks() (fstat)",
errno_copy);
@@ -877,17 +869,16 @@
#ifndef _WIN32
mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
- fd_ = ::open(filename_.c_str(), flags, mode);
#else
int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
- fd_ = ::_open(filename_.c_str(), flags, mode);
#endif
+ fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode);
offset_ = 0;
// make sure open call was successful
if(fd_ == -1) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
}
diff --git a/lib/cpp/src/thrift/transport/TSimpleFileTransport.cpp b/lib/cpp/src/thrift/transport/TSimpleFileTransport.cpp
index 9af1445..6bd716e 100644
--- a/lib/cpp/src/thrift/transport/TSimpleFileTransport.cpp
+++ b/lib/cpp/src/thrift/transport/TSimpleFileTransport.cpp
@@ -54,7 +54,7 @@
#else
int mode = _S_IREAD | _S_IWRITE;
#endif
- int fd = ::open(path.c_str(),
+ int fd = ::THRIFT_OPEN(path.c_str(),
flags,
mode);
if (fd < 0) {
diff --git a/lib/cpp/test/TFDTransportTest.cpp b/lib/cpp/test/TFDTransportTest.cpp
index e30d9c0..7b962d8 100644
--- a/lib/cpp/test/TFDTransportTest.cpp
+++ b/lib/cpp/test/TFDTransportTest.cpp
@@ -19,6 +19,7 @@
#include <cstdlib>
#include <stdexcept>
+#include <iostream>
#include <thrift/Thrift.h>
#include <thrift/transport/TFDTransport.h>
using apache::thrift::transport::TTransportException;
@@ -37,8 +38,10 @@
TFDTransport t(256, TFDTransport::CLOSE_ON_DESTROY);
t.close();
}
+ std::cout << "NOT OK 0!" << std::endl;
std::abort();
} catch (TTransportException) {
+ std::cout << "OK!" << std::endl;
}
try {