THRIFT-3496 C++: Cob style client fails when sending a consecutive request
This closes #752
diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc
index 8770ade..5056d87 100644
--- a/compiler/cpp/src/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/generate/t_cpp_generator.cc
@@ -2437,6 +2437,10 @@
out <<
indent() << "::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);" << endl;
}
+ if (style == "Cob") {
+ out <<
+ indent() << _this << "otrans_->resetBuffer();" << endl;
+ }
out <<
indent() << _this << "oprot_->writeMessageBegin(\"" <<
(*f_iter)->get_name() <<
diff --git a/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp b/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp
index 1279bc6..74acfaa 100644
--- a/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp
+++ b/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp
@@ -39,7 +39,7 @@
const char* address,
int port,
struct event_base* eb)
- : host_(host), path_(path), recvBuf_(NULL), conn_(NULL) {
+ : host_(host), path_(path), conn_(NULL) {
conn_ = evhttp_connection_new(address, port);
if (conn_ == NULL) {
throw TException("evhttp_connection_new failed");
@@ -56,9 +56,6 @@
void TEvhttpClientChannel::sendAndRecvMessage(const VoidCallback& cob,
apache::thrift::transport::TMemoryBuffer* sendBuf,
apache::thrift::transport::TMemoryBuffer* recvBuf) {
- cob_ = cob;
- recvBuf_ = recvBuf;
-
struct evhttp_request* req = evhttp_request_new(response, this);
if (req == NULL) {
throw TException("evhttp_request_new failed");
@@ -88,6 +85,8 @@
if (rv != 0) {
throw TException("evhttp_make_request failed");
}
+
+ completionQueue_.push(Completion(cob, recvBuf));
}
void TEvhttpClientChannel::sendMessage(const VoidCallback& cob,
@@ -107,9 +106,12 @@
}
void TEvhttpClientChannel::finish(struct evhttp_request* req) {
+ assert(!completionQueue_.empty());
+ Completion completion = completionQueue_.front();
+ completionQueue_.pop();
if (req == NULL) {
try {
- cob_();
+ completion.first();
} catch (const TTransportException& e) {
if (e.getType() == TTransportException::END_OF_FILE)
throw TException("connect failed");
@@ -119,7 +121,7 @@
return;
} else if (req->response_code != 200) {
try {
- cob_();
+ completion.first();
} catch (const TTransportException& e) {
std::stringstream ss;
ss << "server returned code " << req->response_code;
@@ -132,9 +134,9 @@
}
return;
}
- recvBuf_->resetBuffer(EVBUFFER_DATA(req->input_buffer),
+ completion.second->resetBuffer(EVBUFFER_DATA(req->input_buffer),
static_cast<uint32_t>(EVBUFFER_LENGTH(req->input_buffer)));
- cob_();
+ completion.first();
return;
}
diff --git a/lib/cpp/src/thrift/async/TEvhttpClientChannel.h b/lib/cpp/src/thrift/async/TEvhttpClientChannel.h
index 72ed40f..72eb32d 100644
--- a/lib/cpp/src/thrift/async/TEvhttpClientChannel.h
+++ b/lib/cpp/src/thrift/async/TEvhttpClientChannel.h
@@ -20,7 +20,9 @@
#ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_
#define _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ 1
+#include <queue>
#include <string>
+#include <utility>
#include <boost/shared_ptr.hpp>
#include <thrift/async/TAsyncChannel.h>
@@ -72,8 +74,9 @@
std::string host_;
std::string path_;
- VoidCallback cob_;
- apache::thrift::transport::TMemoryBuffer* recvBuf_;
+ typedef std::pair<VoidCallback, apache::thrift::transport::TMemoryBuffer*> Completion;
+ typedef std::queue<Completion> CompletionQueue;
+ CompletionQueue completionQueue_;
struct evhttp_connection* conn_;
};
}
diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp
index 5d06c56..6aebcdc 100644
--- a/test/cpp/src/TestClient.cpp
+++ b/test/cpp/src/TestClient.cpp
@@ -23,6 +23,7 @@
#include <locale>
#include <ios>
#include <iostream>
+#include <sstream>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/THeaderProtocol.h>
@@ -65,45 +66,38 @@
return ret;
}
-static void testString_clientReturn(const char* host,
- int port,
- event_base* base,
- TProtocolFactory* protocolFactory,
+static void testString_clientReturn(event_base* base,
+ int testNr,
ThriftTestCobClient* client) {
- (void)host;
- (void)port;
- (void)protocolFactory;
try {
string s;
client->recv_testString(s);
- cout << "testString: " << s << endl;
+ std::ostringstream os;
+ os << "test" << testNr;
+ const bool ok = (s == os.str());
+ cout << "testString: " << s << " " << ((ok) ? "ok" : "failed") << endl;
} catch (TException& exn) {
cout << "Error: " << exn.what() << endl;
}
- event_base_loopbreak(base); // end test
+ if (testNr == 9)
+ event_base_loopbreak(base); // end test
}
-static void testVoid_clientReturn(const char* host,
- int port,
- event_base* base,
- TProtocolFactory* protocolFactory,
- ThriftTestCobClient* client) {
+static void testVoid_clientReturn(event_base* base, ThriftTestCobClient* client) {
try {
client->recv_testVoid();
cout << "testVoid" << endl;
- // next test
- delete client;
- boost::shared_ptr<TAsyncChannel> channel(new TEvhttpClientChannel(host, "/", host, port, base));
- client = new ThriftTestCobClient(channel, protocolFactory);
- client->testString(tcxx::bind(testString_clientReturn,
- host,
- port,
- base,
- protocolFactory,
- tcxx::placeholders::_1),
- "Test");
+ for (int testNr = 0; testNr < 10; ++testNr) {
+ std::ostringstream os;
+ os << "test" << testNr;
+ client->testString(tcxx::bind(testString_clientReturn,
+ base,
+ testNr,
+ tcxx::placeholders::_1),
+ os.str());
+ }
} catch (TException& exn) {
cout << "Error: " << exn.what() << endl;
}
@@ -306,10 +300,7 @@
new TEvhttpClientChannel(host.c_str(), "/", host.c_str(), port, base));
ThriftTestCobClient* client = new ThriftTestCobClient(channel, protocolFactory.get());
client->testVoid(tcxx::bind(testVoid_clientReturn,
- host.c_str(),
- port,
base,
- protocolFactory.get(),
tcxx::placeholders::_1));
event_base_loop(base, 0);