blob: 2c7cf56d25e8b1c56462f881bc1e9317f59b8c46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifdef _WIN32
#include "TTransportException.h"
#include "TPipe.h"
namespace apache { namespace thrift { namespace transport {
using namespace std;
/**
* TPipe implementation.
*/
//---- Constructors ----
TPipe::TPipe(HANDLE hpipe) :
pipename_(""),
hPipe_(hpipe),
TimeoutSeconds_(3),
isAnonymous(false)
{}
TPipe::TPipe(string pipename) :
pipename_(pipename),
hPipe_(INVALID_HANDLE_VALUE),
TimeoutSeconds_(3),
isAnonymous(false)
{}
TPipe::TPipe(HANDLE hPipeRd, HANDLE hPipeWrt) :
pipename_(""),
hPipe_(hPipeRd),
hPipeWrt_(hPipeWrt),
TimeoutSeconds_(3),
isAnonymous(true)
{}
TPipe::TPipe() :
pipename_(""),
hPipe_(INVALID_HANDLE_VALUE),
TimeoutSeconds_(3)
{}
//---- Destructor ----
TPipe::~TPipe() {
close();
}
bool TPipe::isOpen() {
return (hPipe_ != INVALID_HANDLE_VALUE);
}
//---------------------------------------------------------
// Transport callbacks
//---------------------------------------------------------
bool TPipe::peek() {
if (!isOpen()) {
return false;
}
DWORD bytesavail = 0;
int PeekRet = 0;
PeekRet = PeekNamedPipe(hPipe_, NULL, 0, NULL, &bytesavail, NULL);
return (PeekRet != 0 && bytesavail > 0);
}
void TPipe::open() {
if (isOpen()) {
return;
}
int SleepInterval = 500; //ms
int retries = TimeoutSeconds_ * 1000 / SleepInterval;
for(int i=0; i<retries; i++)
{
hPipe_ = CreateFile(
pipename_.c_str(),
GENERIC_READ | GENERIC_WRITE,
0, // no sharing
NULL, // default security attributes
OPEN_EXISTING, // opens existing pipe
0, // default attributes
NULL); // no template file
if (hPipe_ == INVALID_HANDLE_VALUE)
sleep(SleepInterval);
else
break;
}
if (hPipe_ == INVALID_HANDLE_VALUE)
throw TTransportException(TTransportException::NOT_OPEN, "Unable to open pipe");
// The pipe connected; change to message-read mode.
DWORD dwMode = PIPE_READMODE_MESSAGE;
int fSuccess = SetNamedPipeHandleState(
hPipe_, // pipe handle
&dwMode, // new pipe mode
NULL, // don't set maximum bytes
NULL); // don't set maximum time
if (fSuccess == 0)
{
throw TTransportException(TTransportException::NOT_OPEN, "SetNamedPipeHandleState failed");
close();
}
}
void TPipe::close() {
if (isOpen())
{
CloseHandle(hPipe_);
hPipe_ = INVALID_HANDLE_VALUE;
}
}
uint32_t TPipe::read(uint8_t* buf, uint32_t len) {
if (!isOpen())
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open pipe");
DWORD cbRead;
int fSuccess = ReadFile(
hPipe_, // pipe handle
buf, // buffer to receive reply
len, // size of buffer
&cbRead, // number of bytes read
NULL); // not overlapped
if ( !fSuccess && GetLastError() != ERROR_MORE_DATA )
return 0; // No more data, possibly because client disconnected.
return cbRead;
}
void TPipe::write(const uint8_t* buf, uint32_t len) {
if (!isOpen())
throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open pipe");
HANDLE WritePipe = isAnonymous? hPipeWrt_: hPipe_;
DWORD cbWritten;
int fSuccess = WriteFile(
WritePipe, // pipe handle
buf, // message
len, // message length
&cbWritten, // bytes written
NULL); // not overlapped
if ( !fSuccess)
throw TTransportException(TTransportException::NOT_OPEN, "Write to pipe failed");
}
//---------------------------------------------------------
// Accessors
//---------------------------------------------------------
string TPipe::getPipename() {
return pipename_;
}
void TPipe::setPipename(std::string pipename) {
pipename_ = pipename;
}
HANDLE TPipe::getPipeHandle() {
return hPipe_;
}
void TPipe::setPipeHandle(HANDLE pipehandle) {
hPipe_ = pipehandle;
}
HANDLE TPipe::getWrtPipeHandle() {
return hPipeWrt_;
}
void TPipe::setWrtPipeHandle(HANDLE pipehandle) {
hPipeWrt_ = pipehandle;
}
long TPipe::getConnectTimeout() {
return TimeoutSeconds_;
}
void TPipe::setConnectTimeout(long seconds) {
TimeoutSeconds_ = seconds;
}
}}} // apache::thrift::transport
#endif //_WIN32