THRIFT-923. cpp: Implement a fully nonblocking server and client
There are three major parts of this:
1/ New callback-style interfaces for for a few key Thrift components:
TAsyncProcessor for servers and TAsyncChannel for clients.
2/ Concrete implementations of TAsyncChannel and a server for
TAsyncProcessor based on evhttp.
3/ Async-style code generation for C++
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005127 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc
index ef0dd36..0ba4540 100644
--- a/compiler/cpp/src/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/generate/t_cpp_generator.cc
@@ -59,6 +59,9 @@
iter = parsed_options.find("include_prefix");
use_include_prefix_ = (iter != parsed_options.end());
+ iter = parsed_options.find("cob_style");
+ gen_cob_style_ = (iter != parsed_options.end());
+
out_dir_base_ = "gen-cpp";
}
@@ -100,15 +103,16 @@
* Service-level generation functions
*/
- void generate_service_interface (t_service* tservice);
- void generate_service_null (t_service* tservice);
+ void generate_service_interface (t_service* tservice, string style);
+ void generate_service_null (t_service* tservice, string style);
void generate_service_multiface (t_service* tservice);
void generate_service_helpers (t_service* tservice);
- void generate_service_client (t_service* tservice);
- void generate_service_processor (t_service* tservice);
+ void generate_service_client (t_service* tservice, string style);
+ void generate_service_processor (t_service* tservice, string style);
void generate_service_skeleton (t_service* tservice);
- void generate_process_function (t_service* tservice, t_function* tfunction);
+ void generate_process_function (t_service* tservice, t_function* tfunction, string style);
void generate_function_helpers (t_service* tservice, t_function* tfunction);
+ void generate_service_async_skeleton (t_service* tservice);
/**
* Serialization constructs
@@ -166,7 +170,12 @@
t_list* tlist,
std::string iter);
- /**
+ void generate_function_call (ostream& out,
+ t_function* tfunction,
+ string target,
+ string iface,
+ string arg_prefix);
+ /*
* Helper rendering functions
*/
@@ -176,8 +185,9 @@
std::string type_name(t_type* ttype, bool in_typedef=false, bool arg=false);
std::string base_type_name(t_base_type::t_base tbase);
std::string declare_field(t_field* tfield, bool init=false, bool pointer=false, bool constant=false, bool reference=false);
- std::string function_signature(t_function* tfunction, std::string prefix="", bool name_params=true);
- std::string argument_list(t_struct* tstruct, bool name_params=true);
+ std::string function_signature(t_function* tfunction, std::string style, std::string prefix="", bool name_params=true);
+ std::string cob_function_signature(t_function* tfunction, std::string prefix="", bool name_params=true);
+ std::string argument_list(t_struct* tstruct, bool name_params=true, bool start_comma=false);
std::string type_to_enum(t_type* ttype);
std::string local_reflection_name(const char*, t_type* ttype, bool external=false);
@@ -223,6 +233,11 @@
bool use_include_prefix_;
/**
+ * True iff we should generate "Continuation OBject"-style classes as well.
+ */
+ bool gen_cob_style_;
+
+ /**
* Strings for namespace, computed once up front then used directly
*/
@@ -1172,7 +1187,7 @@
type_to_enum((*f_iter)->get_type()) << ", " <<
(*f_iter)->get_key() << ");" << endl;
// Write field contents
- if (pointers) {
+ if (pointers && !(*f_iter)->get_type()->is_xception()) {
generate_serialize_field(out, *f_iter, "(*(this->", "))");
} else {
generate_serialize_field(out, *f_iter, "this->");
@@ -1294,8 +1309,23 @@
f_header_ <<
"#ifndef " << svcname << "_H" << endl <<
"#define " << svcname << "_H" << endl <<
- endl <<
- "#include <TProcessor.h>" << endl <<
+ endl;
+ if (gen_cob_style_) {
+ f_header_ <<
+ "#include <tr1/functional>" << endl <<
+ // TODO(dreiss): Libify the base client so we don't have to include this.
+ "#include <transport/TTransportUtils.h>" << endl <<
+ "namespace apache { namespace thrift { namespace async {" << endl <<
+ "class TAsyncChannel;" << endl <<
+ "}}}" << endl;
+ }
+ f_header_ <<
+ "#include <TProcessor.h>" << endl;
+ if (gen_cob_style_) {
+ f_header_ <<
+ "#include <async/TAsyncProcessor.h>" << endl;
+ }
+ f_header_ <<
"#include \"" << get_include_prefix(*get_program()) << program_name_ <<
"_types.h\"" << endl;
@@ -1317,21 +1347,34 @@
f_service_ <<
autogen_comment();
f_service_ <<
- "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" <<
- endl <<
- endl <<
+ "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << endl;
+ if (gen_cob_style_) {
+ f_service_ <<
+ "#include \"async/TAsyncChannel.h\"" << endl;
+ }
+ f_service_ << endl <<
ns_open_ << endl <<
endl;
// Generate all the components
- generate_service_interface(tservice);
- generate_service_null(tservice);
+ generate_service_interface(tservice, "");
+ generate_service_null(tservice, "");
generate_service_helpers(tservice);
- generate_service_client(tservice);
- generate_service_processor(tservice);
+ generate_service_client(tservice, "");
+ generate_service_processor(tservice, "");
generate_service_multiface(tservice);
generate_service_skeleton(tservice);
+ // Generate all the cob components
+ if (gen_cob_style_) {
+ generate_service_interface(tservice, "CobCl");
+ generate_service_interface(tservice, "CobSv");
+ generate_service_null(tservice, "CobSv");
+ generate_service_client(tservice, "Cob");
+ generate_service_processor(tservice, "Cob");
+ generate_service_async_skeleton(tservice);
+ }
+
// Close the namespace
f_service_ <<
ns_close_ << endl <<
@@ -1360,6 +1403,7 @@
t_struct* ts = (*f_iter)->get_arglist();
string name_orig = ts->get_name();
+ // TODO(dreiss): Why is this stuff not in generate_function_helpers?
ts->set_name(tservice->get_name() + "_" + (*f_iter)->get_name() + "_args");
generate_struct_definition(f_header_, ts, false);
generate_struct_reader(f_service_, ts);
@@ -1378,23 +1422,29 @@
*
* @param tservice The service to generate a header definition for
*/
-void t_cpp_generator::generate_service_interface(t_service* tservice) {
+void t_cpp_generator::generate_service_interface(t_service* tservice, string style) {
+
+ if (style == "CobCl") {
+ // Forward declare the client.
+ indent(f_header_) << "class " << service_name_ << "CobClient;" << endl << endl;
+ }
+
string extends = "";
if (tservice->get_extends() != NULL) {
- extends = " : virtual public " + type_name(tservice->get_extends()) + "If";
+ extends = " : virtual public " + type_name(tservice->get_extends()) + style + "If";
}
f_header_ <<
- "class " << service_name_ << "If" << extends << " {" << endl <<
+ "class " << service_name_ << style << "If" << extends << " {" << endl <<
" public:" << endl;
indent_up();
f_header_ <<
- indent() << "virtual ~" << service_name_ << "If() {}" << endl;
+ indent() << "virtual ~" << service_name_ << style << "If" << "() {}" << endl;
vector<t_function*> functions = tservice->get_functions();
vector<t_function*>::iterator f_iter;
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
f_header_ <<
- indent() << "virtual " << function_signature(*f_iter) << " = 0;" << endl;
+ indent() << "virtual " << function_signature(*f_iter, style) << " = 0;" << endl;
}
indent_down();
f_header_ <<
@@ -1406,36 +1456,49 @@
*
* @param tservice The service to generate a header definition for
*/
-void t_cpp_generator::generate_service_null(t_service* tservice) {
+void t_cpp_generator::generate_service_null(t_service* tservice, string style) {
string extends = "";
if (tservice->get_extends() != NULL) {
- extends = " , virtual public " + type_name(tservice->get_extends()) + "Null";
+ extends = " , virtual public " + type_name(tservice->get_extends()) + style + "Null";
}
f_header_ <<
- "class " << service_name_ << "Null : virtual public " << service_name_ << "If" << extends << " {" << endl <<
+ "class " << service_name_ << style << "Null : virtual public " << service_name_ << style << "If" << extends << " {" << endl <<
" public:" << endl;
indent_up();
f_header_ <<
- indent() << "virtual ~" << service_name_ << "Null() {}" << endl;
+ indent() << "virtual ~" << service_name_ << style << "Null() {}" << endl;
vector<t_function*> functions = tservice->get_functions();
vector<t_function*>::iterator f_iter;
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
f_header_ <<
- indent() << function_signature(*f_iter, "", false) << " {" << endl;
+ indent() << function_signature(*f_iter, style, "", false) << " {" << endl;
indent_up();
+
t_type* returntype = (*f_iter)->get_returntype();
- if (returntype->is_void()) {
- f_header_ <<
- indent() << "return;" << endl;
- } else if (is_complex_type(returntype)) {
- f_header_ <<
- indent() << "return;" << endl;
+ t_field returnfield(returntype, "_return");
+
+ if (style == "") {
+ if (returntype->is_void() || is_complex_type(returntype)) {
+ f_header_ << indent() << "return;" << endl;
+ } else {
+ f_header_ <<
+ indent() << declare_field(&returnfield, true) << endl <<
+ indent() << "return _return;" << endl;
+ }
+ } else if (style == "CobSv") {
+ if (returntype->is_void()) {
+ f_header_ << indent() << "return cob();" << endl;
} else {
t_field returnfield(returntype, "_return");
f_header_ <<
indent() << declare_field(&returnfield, true) << endl <<
- indent() << "return _return;" << endl;
+ indent() << "return cob(_return);" << endl;
}
+
+ } else {
+ throw "UNKNOWN STYLE";
+ }
+
indent_down();
f_header_ <<
indent() << "}" << endl;
@@ -1445,6 +1508,109 @@
"};" << endl << endl;
}
+void t_cpp_generator::generate_function_call(ostream& out, t_function* tfunction, string target, string iface, string arg_prefix) {
+ bool first = true;
+ t_type* ret_type = get_true_type(tfunction->get_returntype());
+ out << indent();
+ if (!tfunction->is_oneway() && !ret_type->is_void()) {
+ if (is_complex_type(ret_type)) {
+ first = false;
+ out << iface << "->" << tfunction->get_name() << "(" << target;
+ } else {
+ out << target << " = " << iface << "->" << tfunction->get_name() << "(";
+ }
+ } else {
+ out << iface << "->" << tfunction->get_name() << "(";
+ }
+ const std::vector<t_field*>& fields = tfunction->get_arglist()->get_members();
+ vector<t_field*>::const_iterator f_iter;
+ for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+ if (first) {
+ first = false;
+ } else {
+ out << ", ";
+ }
+ out << arg_prefix << (*f_iter)->get_name();
+ }
+ out << ");" << endl;
+}
+
+void t_cpp_generator::generate_service_async_skeleton(t_service* tservice) {
+ string svcname = tservice->get_name();
+
+ // Service implementation file includes
+ string f_skeleton_name = get_out_dir()+svcname+"_async_server.skeleton.cpp";
+
+ string ns = namespace_prefix(tservice->get_program()->get_namespace("cpp"));
+
+ ofstream f_skeleton;
+ f_skeleton.open(f_skeleton_name.c_str());
+ f_skeleton <<
+ "// This autogenerated skeleton file illustrates one way to adapt a synchronous" << endl <<
+ "// interface into an asynchronous interface. You should copy it to another" << endl <<
+ "// filename to avoid overwriting it and rewrite as asynchronous any functions" << endl <<
+ "// that would otherwise introduce unwanted latency." << endl <<
+ endl <<
+ "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << endl <<
+ "#include <protocol/TBinaryProtocol.h>" << endl <<
+ "#include <async/TEventServer.h>" << endl <<
+ endl <<
+ "using namespace ::apache::thrift;" << endl <<
+ "using namespace ::apache::thrift::protocol;" << endl <<
+ "using namespace ::apache::thrift::transport;" << endl <<
+ "using namespace ::apache::thrift::async;" << endl <<
+ endl <<
+ "using boost::shared_ptr;" << endl <<
+ endl;
+
+ if (!ns.empty()) {
+ f_skeleton <<
+ "using namespace " << string(ns, 0, ns.size()-2) << ";" << endl <<
+ endl;
+ }
+
+ f_skeleton <<
+ "class " << svcname << "AsyncHandler : " <<
+ "public " << svcname << "CobSvIf {" << endl <<
+ " public:" << endl;
+ indent_up();
+ f_skeleton <<
+ indent() << svcname << "AsyncHandler() {" << endl <<
+ indent() << " syncHandler_ = std::auto_ptr<" << svcname <<
+ "Handler>(new " << svcname << "Handler);" << endl <<
+ indent() << " // Your initialization goes here" << endl <<
+ indent() << "}" << endl;
+ f_skeleton <<
+ indent() << "virtual ~" << service_name_ << "AsyncHandler();" << endl;
+
+ vector<t_function*> functions = tservice->get_functions();
+ vector<t_function*>::iterator f_iter;
+ for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+ f_skeleton <<
+ endl <<
+ indent() << function_signature(*f_iter, "CobSv", "", true) << " {" << endl;
+ indent_up();
+
+ t_type* returntype = (*f_iter)->get_returntype();
+ t_field returnfield(returntype, "_return");
+
+ string target = returntype->is_void() ? "" : "_return";
+ if (!returntype->is_void()) {
+ f_skeleton <<
+ indent() << declare_field(&returnfield, true) << endl;
+ }
+ generate_function_call(f_skeleton, *f_iter, target, "syncHandler_", "");
+ f_skeleton << indent() << "return cob(" << target << ");" << endl;
+
+ scope_down(f_skeleton);
+ }
+ f_skeleton << endl <<
+ " protected:" << endl <<
+ indent() << "std::auto_ptr<" << svcname << "Handler> syncHandler_;" << endl;
+ indent_down();
+ f_skeleton <<
+ "};" << endl << endl;
+}
/**
* Generates a multiface, which is a single server that just takes a set
@@ -1531,7 +1697,7 @@
call += ")";
f_header_ <<
- indent() << function_signature(*f_iter) << " {" << endl;
+ indent() << function_signature(*f_iter, "") << " {" << endl;
indent_up();
f_header_ <<
indent() << "uint32_t sz = ifaces_.size();" << endl <<
@@ -1576,75 +1742,115 @@
*
* @param tservice The service to generate a server for.
*/
-void t_cpp_generator::generate_service_client(t_service* tservice) {
+void t_cpp_generator::generate_service_client(t_service* tservice, string style) {
+ string ifstyle;
+ if (style == "Cob") {
+ ifstyle = "CobCl";
+ }
+
string extends = "";
string extends_client = "";
if (tservice->get_extends() != NULL) {
extends = type_name(tservice->get_extends());
- extends_client = ", public " + extends + "Client";
+ extends_client = ", public " + extends + style + "Client";
}
// Generate the header portion
f_header_ <<
- "class " << service_name_ << "Client : " <<
- "virtual public " << service_name_ << "If" <<
+ "class " << service_name_ << style << "Client : " <<
+ "virtual public " << service_name_ << ifstyle << "If" <<
extends_client << " {" << endl <<
" public:" << endl;
indent_up();
- f_header_ <<
- indent() << service_name_ << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :" << endl;
- if (extends.empty()) {
+ if (style != "Cob") {
f_header_ <<
- indent() << " piprot_(prot)," << endl <<
- indent() << " poprot_(prot) {" << endl <<
- indent() << " iprot_ = prot.get();" << endl <<
- indent() << " oprot_ = prot.get();" << endl <<
- indent() << "}" << endl;
- } else {
- f_header_ <<
- indent() << " " << extends << "Client(prot, prot) {}" << endl;
- }
+ indent() << service_name_ << style << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :" << endl;
+ if (extends.empty()) {
+ f_header_ <<
+ indent() << " piprot_(prot)," << endl <<
+ indent() << " poprot_(prot) {" << endl <<
+ indent() << " iprot_ = prot.get();" << endl <<
+ indent() << " oprot_ = prot.get();" << endl <<
+ indent() << "}" << endl;
+ } else {
+ f_header_ <<
+ indent() << " " << extends << style << "Client(prot, prot) {}" << endl;
+ }
- f_header_ <<
- indent() << service_name_ << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) :" << endl;
- if (extends.empty()) {
f_header_ <<
- indent() << " piprot_(iprot)," << endl <<
- indent() << " poprot_(oprot) {" << endl <<
- indent() << " iprot_ = iprot.get();" << endl <<
- indent() << " oprot_ = oprot.get();" << endl <<
+ indent() << service_name_ << style << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) :" << endl;
+ if (extends.empty()) {
+ if (style == "Cob") {
+ f_header_ <<
+ indent() << " rpc_ctx_(ctx)," << endl;
+ }
+ f_header_ <<
+ indent() << " piprot_(iprot)," << endl <<
+ indent() << " poprot_(oprot) {" << endl <<
+ indent() << " iprot_ = iprot.get();" << endl <<
+ indent() << " oprot_ = oprot.get();" << endl <<
+ indent() << "}" << endl;
+ } else {
+ f_header_ <<
+ indent() << " " << extends << style << "Client(iprot, oprot) {}" << endl;
+ }
+
+ // Generate getters for the protocols.
+ f_header_ <<
+ indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {" << endl <<
+ indent() << " return piprot_;" << endl <<
indent() << "}" << endl;
- } else {
+
f_header_ <<
- indent() << " " << extends << "Client(iprot, oprot) {}" << endl;
- }
+ indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {" << endl <<
+ indent() << " return poprot_;" << endl <<
+ indent() << "}" << endl;
+ } else /* if (style == "Cob") */ {
// Generate getters for the protocols.
- f_header_ <<
- indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {" << endl <<
- indent() << " return piprot_;" << endl <<
- indent() << "}" << endl;
+ f_header_ <<
+ indent() << service_name_ << style << "Client("
+ << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, "
+ << "::apache::thrift::protocol::TProtocolFactory* protocolFactory) :" << endl;
+ if (extends.empty()) {
+ f_header_ <<
+ indent() << " channel_(channel)," << endl <<
+ indent() << " itrans_(new ::apache::thrift::transport::TMemoryBuffer())," << endl <<
+ indent() << " otrans_(new ::apache::thrift::transport::TMemoryBuffer())," << endl <<
+ indent() << " piprot_(protocolFactory->getProtocol(itrans_))," << endl <<
+ indent() << " poprot_(protocolFactory->getProtocol(otrans_)) {" << endl <<
+ indent() << " iprot_ = piprot_.get();" << endl <<
+ indent() << " oprot_ = poprot_.get();" << endl <<
+ indent() << "}" << endl;
+ } else {
+ f_header_ <<
+ indent() << " " << extends << style << "Client(channel, protocolFactory) {}" << endl;
+ }
+ }
- f_header_ <<
- indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {" << endl <<
- indent() << " return poprot_;" << endl <<
- indent() << "}" << endl;
+ if (style == "Cob") {
+ f_header_ <<
+ indent() << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {" << endl <<
+ indent() << " return channel_;" << endl <<
+ indent() << "}" << endl;
+ }
vector<t_function*> functions = tservice->get_functions();
vector<t_function*>::const_iterator f_iter;
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+ indent(f_header_) << function_signature(*f_iter, ifstyle) << ";" << endl;
+ // TODO(dreiss): Use private inheritance to avoid generating thise in cob-style.
t_function send_function(g_type_void,
- string("send_") + (*f_iter)->get_name(),
- (*f_iter)->get_arglist());
- indent(f_header_) << function_signature(*f_iter) << ";" << endl;
- indent(f_header_) << function_signature(&send_function) << ";" << endl;
+ string("send_") + (*f_iter)->get_name(),
+ (*f_iter)->get_arglist());
+ indent(f_header_) << function_signature(&send_function, "") << ";" << endl;
if (!(*f_iter)->is_oneway()) {
t_struct noargs(program_);
t_function recv_function((*f_iter)->get_returntype(),
- string("recv_") + (*f_iter)->get_name(),
- &noargs);
- indent(f_header_) << function_signature(&recv_function) << ";" << endl;
+ string("recv_") + (*f_iter)->get_name(),
+ &noargs);
+ indent(f_header_) << function_signature(&recv_function, "") << ";" << endl;
}
}
indent_down();
@@ -1653,11 +1859,19 @@
f_header_ <<
" protected:" << endl;
indent_up();
+
+ if (style == "Cob") {
+ f_header_ <<
+ indent() << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;" << endl <<
+ indent() << "boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;" << endl <<
+ indent() << "boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;" << endl;
+ }
f_header_ <<
indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;" << endl <<
indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;" << endl <<
indent() << "::apache::thrift::protocol::TProtocol* iprot_;" << endl <<
indent() << "::apache::thrift::protocol::TProtocol* oprot_;" << endl;
+
indent_down();
}
@@ -1665,7 +1879,7 @@
"};" << endl <<
endl;
- string scope = service_name_ + "Client::";
+ string scope = service_name_ + style + "Client::";
// Generate client method implementations
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
@@ -1673,7 +1887,7 @@
// Open function
indent(f_service_) <<
- function_signature(*f_iter, scope) << endl;
+ function_signature(*f_iter, ifstyle, scope) << endl;
scope_up(f_service_);
indent(f_service_) <<
"send_" << funname << "(";
@@ -1695,155 +1909,171 @@
}
f_service_ << ");" << endl;
- if (!(*f_iter)->is_oneway()) {
- f_service_ << indent();
- if (!(*f_iter)->get_returntype()->is_void()) {
- if (is_complex_type((*f_iter)->get_returntype())) {
- f_service_ << "recv_" << funname << "(_return);" << endl;
+ if (style != "Cob") {
+ if (!(*f_iter)->is_oneway()) {
+ f_service_ << indent();
+ if (!(*f_iter)->get_returntype()->is_void()) {
+ if (is_complex_type((*f_iter)->get_returntype())) {
+ f_service_ << "recv_" << funname << "(_return);" << endl;
+ } else {
+ f_service_ << "return recv_" << funname << "();" << endl;
+ }
} else {
- f_service_ << "return recv_" << funname << "();" << endl;
+ f_service_ <<
+ "recv_" << funname << "();" << endl;
}
+ }
+ } else {
+ if (!(*f_iter)->is_oneway()) {
+ f_service_ <<
+ indent() << _this << "channel_->sendAndRecvMessage(" <<
+ "std::tr1::bind(cob, this), " << _this << "otrans_.get(), " <<
+ _this << "itrans_.get());" << endl;
} else {
f_service_ <<
- "recv_" << funname << "();" << endl;
+ indent() << _this << "channel_->sendMessage(" <<
+ "std::tr1::bind(cob, this), " << _this << "otrans_.get());" << endl;
}
}
scope_down(f_service_);
f_service_ << endl;
- // Function for sending
- t_function send_function(g_type_void,
- string("send_") + (*f_iter)->get_name(),
- (*f_iter)->get_arglist());
+ //if (style != "Cob") // TODO(dreiss): Libify the client and don't generate this for cob-style
+ if (true) {
+ // Function for sending
+ t_function send_function(g_type_void,
+ string("send_") + (*f_iter)->get_name(),
+ (*f_iter)->get_arglist());
- // Open the send function
- indent(f_service_) <<
- function_signature(&send_function, scope) << endl;
- scope_up(f_service_);
-
- // Function arguments and results
- string argsname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_pargs";
- string resultname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_presult";
-
- // Serialize the request
- f_service_ <<
- indent() << "int32_t cseqid = 0;" << endl <<
- indent() << "oprot_->writeMessageBegin(\"" << (*f_iter)->get_name() << "\", ::apache::thrift::protocol::T_CALL, cseqid);" << endl <<
- endl <<
- indent() << argsname << " args;" << endl;
-
- for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) {
- f_service_ <<
- indent() << "args." << (*fld_iter)->get_name() << " = &" << (*fld_iter)->get_name() << ";" << endl;
- }
-
- f_service_ <<
- indent() << "args.write(oprot_);" << endl <<
- endl <<
- indent() << "oprot_->writeMessageEnd();" << endl <<
- indent() << "oprot_->getTransport()->flush();" << endl <<
- indent() << "oprot_->getTransport()->writeEnd();" << endl;
-
- scope_down(f_service_);
- f_service_ << endl;
-
- // Generate recv function only if not an oneway function
- if (!(*f_iter)->is_oneway()) {
- t_struct noargs(program_);
- t_function recv_function((*f_iter)->get_returntype(),
- string("recv_") + (*f_iter)->get_name(),
- &noargs);
- // Open function
+ // Open the send function
indent(f_service_) <<
- function_signature(&recv_function, scope) << endl;
+ function_signature(&send_function, "", scope) << endl;
scope_up(f_service_);
+ // Function arguments and results
+ string argsname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_pargs";
+ string resultname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_presult";
+
+ // Serialize the request
f_service_ <<
+ indent() << "int32_t cseqid = 0;" << endl <<
+ indent() << "oprot_->writeMessageBegin(\"" << (*f_iter)->get_name() << "\", ::apache::thrift::protocol::T_CALL, cseqid);" << endl <<
endl <<
- indent() << "int32_t rseqid = 0;" << endl <<
- indent() << "std::string fname;" << endl <<
- indent() << "::apache::thrift::protocol::TMessageType mtype;" << endl <<
+ indent() << argsname << " args;" << endl;
+
+ for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) {
+ f_service_ <<
+ indent() << "args." << (*fld_iter)->get_name() << " = &" << (*fld_iter)->get_name() << ";" << endl;
+ }
+
+ f_service_ <<
+ indent() << "args.write(oprot_);" << endl <<
endl <<
- indent() << "iprot_->readMessageBegin(fname, mtype, rseqid);" << endl <<
- indent() << "if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {" << endl <<
- indent() << " ::apache::thrift::TApplicationException x;" << endl <<
- indent() << " x.read(iprot_);" << endl <<
- indent() << " iprot_->readMessageEnd();" << endl <<
- indent() << " iprot_->getTransport()->readEnd();" << endl <<
- indent() << " throw x;" << endl <<
- indent() << "}" << endl <<
- indent() << "if (mtype != ::apache::thrift::protocol::T_REPLY) {" << endl <<
- indent() << " iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
- indent() << " iprot_->readMessageEnd();" << endl <<
- indent() << " iprot_->getTransport()->readEnd();" << endl <<
- indent() << " throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);" << endl <<
- indent() << "}" << endl <<
- indent() << "if (fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl <<
- indent() << " iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
- indent() << " iprot_->readMessageEnd();" << endl <<
- indent() << " iprot_->getTransport()->readEnd();" << endl <<
- indent() << " throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);" << endl <<
- indent() << "}" << endl;
+ indent() << "oprot_->writeMessageEnd();" << endl <<
+ indent() << "oprot_->getTransport()->flush();" << endl <<
+ indent() << "oprot_->getTransport()->writeEnd();" << endl;
- if (!(*f_iter)->get_returntype()->is_void() &&
- !is_complex_type((*f_iter)->get_returntype())) {
- t_field returnfield((*f_iter)->get_returntype(), "_return");
- f_service_ <<
- indent() << declare_field(&returnfield) << endl;
- }
-
- f_service_ <<
- indent() << resultname << " result;" << endl;
-
- if (!(*f_iter)->get_returntype()->is_void()) {
- f_service_ <<
- indent() << "result.success = &_return;" << endl;
- }
-
- f_service_ <<
- indent() << "result.read(iprot_);" << endl <<
- indent() << "iprot_->readMessageEnd();" << endl <<
- indent() << "iprot_->getTransport()->readEnd();" << endl <<
- endl;
-
- // Careful, only look for _result if not a void function
- if (!(*f_iter)->get_returntype()->is_void()) {
- if (is_complex_type((*f_iter)->get_returntype())) {
- f_service_ <<
- indent() << "if (result.__isset.success) {" << endl <<
- indent() << " // _return pointer has now been filled" << endl <<
- indent() << " return;" << endl <<
- indent() << "}" << endl;
- } else {
- f_service_ <<
- indent() << "if (result.__isset.success) {" << endl <<
- indent() << " return _return;" << endl <<
- indent() << "}" << endl;
- }
- }
-
- t_struct* xs = (*f_iter)->get_xceptions();
- const std::vector<t_field*>& xceptions = xs->get_members();
- vector<t_field*>::const_iterator x_iter;
- for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
- f_service_ <<
- indent() << "if (result.__isset." << (*x_iter)->get_name() << ") {" << endl <<
- indent() << " throw result." << (*x_iter)->get_name() << ";" << endl <<
- indent() << "}" << endl;
- }
-
- // We only get here if we are a void function
- if ((*f_iter)->get_returntype()->is_void()) {
- indent(f_service_) <<
- "return;" << endl;
- } else {
- f_service_ <<
- indent() << "throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
- }
-
- // Close function
scope_down(f_service_);
f_service_ << endl;
+
+ // Generate recv function only if not an oneway function
+ if (!(*f_iter)->is_oneway()) {
+ t_struct noargs(program_);
+ t_function recv_function((*f_iter)->get_returntype(),
+ string("recv_") + (*f_iter)->get_name(),
+ &noargs);
+ // Open function
+ indent(f_service_) <<
+ function_signature(&recv_function, "", scope) << endl;
+ scope_up(f_service_);
+
+ f_service_ <<
+ endl <<
+ indent() << "int32_t rseqid = 0;" << endl <<
+ indent() << "std::string fname;" << endl <<
+ indent() << "::apache::thrift::protocol::TMessageType mtype;" << endl <<
+ endl <<
+ indent() << "iprot_->readMessageBegin(fname, mtype, rseqid);" << endl <<
+ indent() << "if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {" << endl <<
+ indent() << " ::apache::thrift::TApplicationException x;" << endl <<
+ indent() << " x.read(iprot_);" << endl <<
+ indent() << " iprot_->readMessageEnd();" << endl <<
+ indent() << " iprot_->getTransport()->readEnd();" << endl <<
+ indent() << " throw x;" << endl <<
+ indent() << "}" << endl <<
+ indent() << "if (mtype != ::apache::thrift::protocol::T_REPLY) {" << endl <<
+ indent() << " iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
+ indent() << " iprot_->readMessageEnd();" << endl <<
+ indent() << " iprot_->getTransport()->readEnd();" << endl <<
+ indent() << " throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);" << endl <<
+ indent() << "}" << endl <<
+ indent() << "if (fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl <<
+ indent() << " iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
+ indent() << " iprot_->readMessageEnd();" << endl <<
+ indent() << " iprot_->getTransport()->readEnd();" << endl <<
+ indent() << " throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);" << endl <<
+ indent() << "}" << endl;
+
+ if (!(*f_iter)->get_returntype()->is_void() &&
+ !is_complex_type((*f_iter)->get_returntype())) {
+ t_field returnfield((*f_iter)->get_returntype(), "_return");
+ f_service_ <<
+ indent() << declare_field(&returnfield) << endl;
+ }
+
+ f_service_ <<
+ indent() << resultname << " result;" << endl;
+
+ if (!(*f_iter)->get_returntype()->is_void()) {
+ f_service_ <<
+ indent() << "result.success = &_return;" << endl;
+ }
+
+ f_service_ <<
+ indent() << "result.read(iprot_);" << endl <<
+ indent() << "iprot_->readMessageEnd();" << endl <<
+ indent() << "iprot_->getTransport()->readEnd();" << endl <<
+ endl;
+
+ // Careful, only look for _result if not a void function
+ if (!(*f_iter)->get_returntype()->is_void()) {
+ if (is_complex_type((*f_iter)->get_returntype())) {
+ f_service_ <<
+ indent() << "if (result.__isset.success) {" << endl <<
+ indent() << " // _return pointer has now been filled" << endl <<
+ indent() << " return;" << endl <<
+ indent() << "}" << endl;
+ } else {
+ f_service_ <<
+ indent() << "if (result.__isset.success) {" << endl <<
+ indent() << " return _return;" << endl <<
+ indent() << "}" << endl;
+ }
+ }
+
+ t_struct* xs = (*f_iter)->get_xceptions();
+ const std::vector<t_field*>& xceptions = xs->get_members();
+ vector<t_field*>::const_iterator x_iter;
+ for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+ f_service_ <<
+ indent() << "if (result.__isset." << (*x_iter)->get_name() << ") {" << endl <<
+ indent() << " throw result." << (*x_iter)->get_name() << ";" << endl <<
+ indent() << "}" << endl;
+ }
+
+ // We only get here if we are a void function
+ if ((*f_iter)->get_returntype()->is_void()) {
+ indent(f_service_) <<
+ "return;" << endl;
+ } else {
+ f_service_ <<
+ indent() << "throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
+ }
+
+ // Close function
+ scope_down(f_service_);
+ f_service_ << endl;
+ }
}
}
}
@@ -1853,7 +2083,22 @@
*
* @param tservice The service to generate a server for.
*/
-void t_cpp_generator::generate_service_processor(t_service* tservice) {
+void t_cpp_generator::generate_service_processor(t_service* tservice, string style) {
+ string ifstyle;
+ string pstyle;
+ string finish_cob;
+ string finish_cob_decl;
+ string cob_arg;
+ string ret_type = "bool ";
+ if (style == "Cob") {
+ ifstyle = "CobSv";
+ pstyle = "Async";
+ finish_cob = "std::tr1::function<void(bool ok)> cob, ";
+ finish_cob_decl = "std::tr1::function<void(bool ok)>, ";
+ cob_arg = "cob, ";
+ ret_type = "void ";
+ }
+
// Generate the dispatch methods
vector<t_function*> functions = tservice->get_functions();
vector<t_function*>::iterator f_iter;
@@ -1862,13 +2107,13 @@
string extends_processor = "";
if (tservice->get_extends() != NULL) {
extends = type_name(tservice->get_extends());
- extends_processor = ", public " + extends + "Processor";
+ extends_processor = ", public " + extends + pstyle + "Processor";
}
// Generate the header portion
f_header_ <<
- "class " << service_name_ << "Processor : " <<
- "virtual public ::apache::thrift::TProcessor" <<
+ "class " << service_name_ << pstyle << "Processor : " <<
+ "virtual public ::apache::thrift::T" << pstyle << "Processor" <<
extends_processor << " {" << endl;
// Protected data members
@@ -1876,9 +2121,9 @@
" protected:" << endl;
indent_up();
f_header_ <<
- indent() << "boost::shared_ptr<" << service_name_ << "If> iface_;" << endl;
+ indent() << "boost::shared_ptr<" << service_name_ << ifstyle << "If> iface_;" << endl;
f_header_ <<
- indent() << "virtual bool process_fn(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl;
+ indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl;
indent_down();
// Process function declarations
@@ -1886,10 +2131,20 @@
" private:" << endl;
indent_up();
f_header_ <<
- indent() << "std::map<std::string, void (" << service_name_ << "Processor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)> processMap_;" << endl;
+ indent() << "std::map<std::string, void (" << service_name_ << pstyle << "Processor::*)(" << finish_cob_decl << "int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)> processMap_;" << endl;
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
indent(f_header_) <<
- "void process_" << (*f_iter)->get_name() << "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl;
+ "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl;
+ if (style == "Cob") {
+ // XXX Factor this out, even if it is a pain.
+ string ret_arg = ((*f_iter)->get_returntype()->is_void()
+ ? ""
+ : ", const " + type_name((*f_iter)->get_returntype()) + "& _return");
+ // XXX Don't declare throw if it doesn't exist
+ f_header_ <<
+ "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ");" << endl <<
+ "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw);" << endl;
+ }
}
indent_down();
@@ -1903,6 +2158,7 @@
declare_map += (*f_iter)->get_name();
declare_map += "\"] = &";
declare_map += service_name_;
+ declare_map += pstyle;
declare_map += "Processor::process_";
declare_map += (*f_iter)->get_name();
declare_map += ";\n";
@@ -1911,28 +2167,28 @@
f_header_ <<
" public:" << endl <<
- indent() << service_name_ << "Processor(boost::shared_ptr<" << service_name_ << "If> iface) :" << endl;
+ indent() << service_name_ << pstyle << "Processor(boost::shared_ptr<" << service_name_ << ifstyle << "If> iface) :" << endl;
if (extends.empty()) {
f_header_ <<
indent() << " iface_(iface) {" << endl;
} else {
f_header_ <<
- indent() << " " << extends << "Processor(iface)," << endl <<
+ indent() << " " << extends << pstyle << "Processor(iface)," << endl <<
indent() << " iface_(iface) {" << endl;
}
f_header_ <<
declare_map <<
indent() << "}" << endl <<
endl <<
- indent() << "virtual bool process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl <<
- indent() << "virtual ~" << service_name_ << "Processor() {}" << endl;
+ indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl <<
+ indent() << "virtual ~" << service_name_ << pstyle << "Processor() {}" << endl;
indent_down();
f_header_ <<
"};" << endl << endl;
// Generate the server implementation
f_service_ <<
- "bool " << service_name_ << "Processor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << endl;
+ ret_type << service_name_ << pstyle << "Processor::process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << endl;
indent_up();
f_service_ <<
@@ -1955,10 +2211,11 @@
indent() << " oprot->writeMessageEnd();" << endl <<
indent() << " oprot->getTransport()->flush();" << endl <<
indent() << " oprot->getTransport()->writeEnd();" << endl <<
- indent() << " return true;" << endl <<
+ indent() << (style == "Cob" ? " return cob(true);" : " return true;") << endl <<
indent() << "}" << endl <<
endl <<
- indent() << "return process_fn(iprot, oprot, fname, seqid);" <<
+ indent() << "return process_fn(" << (style == "Cob" ? "cob, " : "")
+ << "iprot, oprot, fname, seqid);" <<
endl;
indent_down();
@@ -1967,12 +2224,12 @@
endl;
f_service_ <<
- "bool " << service_name_ << "Processor::process_fn(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid) {" << endl;
+ ret_type << service_name_ << pstyle << "Processor::process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid) {" << endl;
indent_up();
// HOT: member function pointer map
f_service_ <<
- indent() << "std::map<std::string, void (" << service_name_ << "Processor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)>::iterator pfn;" << endl <<
+ indent() << "std::map<std::string, void (" << service_name_ << pstyle << "Processor::*)(" << finish_cob_decl << "int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)>::iterator pfn;" << endl <<
indent() << "pfn = processMap_.find(fname);" << endl <<
indent() << "if (pfn == processMap_.end()) {" << endl;
if (extends.empty()) {
@@ -1986,15 +2243,26 @@
indent() << " oprot->writeMessageEnd();" << endl <<
indent() << " oprot->getTransport()->flush();" << endl <<
indent() << " oprot->getTransport()->writeEnd();" << endl <<
- indent() << " return true;" << endl;
+ indent() << (style == "Cob" ? " return cob(true);" : " return true;") << endl;
} else {
f_service_ <<
- indent() << " return " << extends << "Processor::process_fn(iprot, oprot, fname, seqid);" << endl;
+ indent() << " return "
+ << extends << pstyle << "Processor::process_fn("
+ << (style == "Cob" ? "cob, " : "")
+ << "iprot, oprot, fname, seqid);" << endl;
}
f_service_ <<
indent() << "}" << endl <<
- indent() << "(this->*(pfn->second))(seqid, iprot, oprot);" << endl <<
- indent() << "return true;" << endl;
+ indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot);" << endl;
+
+ // TODO(dreiss): return pfn ret?
+ if (style == "Cob") {
+ f_service_ <<
+ indent() << "return;" << endl;
+ } else {
+ f_service_ <<
+ indent() << "return true;" << endl;
+ }
indent_down();
f_service_ <<
@@ -2003,7 +2271,7 @@
// Generate the process subfunctions
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
- generate_process_function(tservice, *f_iter);
+ generate_process_function(tservice, *f_iter, style);
}
}
@@ -2036,8 +2304,11 @@
generate_struct_result_writer(f_service_, &result);
result.set_name(tservice->get_name() + "_" + tfunction->get_name() + "_presult");
- generate_struct_definition(f_header_, &result, false, true, true, false);
+ generate_struct_definition(f_header_, &result, false, true, true, gen_cob_style_);
generate_struct_reader(f_service_, &result, true);
+ if (gen_cob_style_) {
+ generate_struct_writer(f_service_, &result, true);
+ }
}
@@ -2047,165 +2318,303 @@
* @param tfunction The function to write a dispatcher for
*/
void t_cpp_generator::generate_process_function(t_service* tservice,
- t_function* tfunction) {
- // Open function
- f_service_ <<
- "void " << tservice->get_name() << "Processor::" <<
- "process_" << tfunction->get_name() <<
- "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
- scope_up(f_service_);
-
- string argsname = tservice->get_name() + "_" + tfunction->get_name() + "_args";
- string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result";
-
- f_service_ <<
- indent() << "void* ctx = NULL;" << endl <<
- indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
- indent() << "}" << endl <<
- indent() << "// A glorified finally block since ctx is a void*" << endl <<
- indent() << "class ContextFreer {" << endl <<
- indent() << " public:" << endl <<
- indent() << " ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
- indent() << " handler_(handler), context_(context) {}" << endl <<
- indent() << " ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
- indent() << " private:" << endl <<
- indent() << " ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
- indent() << " void* context_;" << endl <<
- indent() << "};" << endl <<
- indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
- indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
- indent() << "}" << endl << endl <<
- indent() << argsname << " args;" << endl <<
- indent() << "args.read(iprot);" << endl <<
- indent() << "iprot->readMessageEnd();" << endl <<
- indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
- indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
- indent() << "}" << endl <<
- endl;
+ t_function* tfunction,
+ string style) {
+ t_struct* arg_struct = tfunction->get_arglist();
+ const std::vector<t_field*>& fields = arg_struct->get_members();
+ vector<t_field*>::const_iterator f_iter;
t_struct* xs = tfunction->get_xceptions();
const std::vector<t_field*>& xceptions = xs->get_members();
vector<t_field*>::const_iterator x_iter;
- // Declare result
- if (!tfunction->is_oneway()) {
+ // I tried to do this as one function. I really did. But it was too hard.
+ if (style != "Cob") {
+ // Open function
f_service_ <<
- indent() << resultname << " result;" << endl;
- }
+ "void " << tservice->get_name() << "Processor::" <<
+ "process_" << tfunction->get_name() << "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
+ scope_up(f_service_);
- // Try block for functions with exceptions
- f_service_ <<
- indent() << "try {" << endl;
- indent_up();
+ string argsname = tservice->get_name() + "_" + tfunction->get_name() + "_args";
+ string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result";
- // Generate the function call
- t_struct* arg_struct = tfunction->get_arglist();
- const std::vector<t_field*>& fields = arg_struct->get_members();
- vector<t_field*>::const_iterator f_iter;
+ f_service_ <<
+ indent() << "void* ctx = NULL;" << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl <<
+ indent() << "// A glorified finally block since ctx is a void*" << endl <<
+ indent() << "class ContextFreer {" << endl <<
+ indent() << " public:" << endl <<
+ indent() << " ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
+ indent() << " handler_(handler), context_(context) {}" << endl <<
+ indent() << " ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
+ indent() << " private:" << endl <<
+ indent() << " ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
+ indent() << " void* context_;" << endl <<
+ indent() << "};" << endl <<
+ indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl << endl <<
+ indent() << argsname << " args;" << endl <<
+ indent() << "args.read(iprot);" << endl <<
+ indent() << "iprot->readMessageEnd();" << endl <<
+ indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl <<
+ endl;
- bool first = true;
- f_service_ << indent();
- if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
- if (is_complex_type(tfunction->get_returntype())) {
- first = false;
- f_service_ << "iface_->" << tfunction->get_name() << "(result.success";
- } else {
- f_service_ << "result.success = iface_->" << tfunction->get_name() << "(";
+ // Declare result
+ if (!tfunction->is_oneway()) {
+ f_service_ <<
+ indent() << resultname << " result;" << endl;
}
- } else {
+
+ // Try block for functions with exceptions
f_service_ <<
- "iface_->" << tfunction->get_name() << "(";
- }
- for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
- if (first) {
- first = false;
- } else {
- f_service_ << ", ";
- }
- f_service_ << "args." << (*f_iter)->get_name();
- }
- f_service_ << ");" << endl;
+ indent() << "try {" << endl;
+ indent_up();
- // Set isset on success field
- if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
- f_service_ <<
- indent() << "result.__isset.success = true;" << endl;
- }
-
- indent_down();
- f_service_ << indent() << "}";
-
- if (!tfunction->is_oneway()) {
- for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
- f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
- if (!tfunction->is_oneway()) {
- indent_up();
- f_service_ <<
- indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
- indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
- indent_down();
- f_service_ << indent() << "}";
+ // Generate the function call
+ bool first = true;
+ f_service_ << indent();
+ if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
+ if (is_complex_type(tfunction->get_returntype())) {
+ first = false;
+ f_service_ << "iface_->" << tfunction->get_name() << "(result.success";
} else {
- f_service_ << "}";
+ f_service_ << "result.success = iface_->" << tfunction->get_name() << "(";
+ }
+ } else {
+ f_service_ <<
+ "iface_->" << tfunction->get_name() << "(";
+ }
+ for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+ if (first) {
+ first = false;
+ } else {
+ f_service_ << ", ";
+ }
+ f_service_ << "args." << (*f_iter)->get_name();
+ }
+ f_service_ << ");" << endl;
+
+ // Set isset on success field
+ if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
+ f_service_ <<
+ indent() << "result.__isset.success = true;" << endl;
+ }
+
+ indent_down();
+ f_service_ << indent() << "}";
+
+ if (!tfunction->is_oneway()) {
+ for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+ f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
+ if (!tfunction->is_oneway()) {
+ indent_up();
+ f_service_ <<
+ indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
+ indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
+ indent_down();
+ f_service_ << indent() << "}";
+ } else {
+ f_service_ << "}";
+ }
}
}
- }
- f_service_ << " catch (const std::exception& e) {" << endl;
+ f_service_ << " catch (const std::exception& e) {" << endl;
- indent_up();
- f_service_ <<
- indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
- indent() << "}" << endl;
+ indent_up();
+ f_service_ <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl;
if (!tfunction->is_oneway()) {
f_service_ <<
endl <<
- indent() << "::apache::thrift::TApplicationException x(e.what());" << endl <<
- indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
- indent() << "x.write(oprot);" << endl <<
- indent() << "oprot->writeMessageEnd();" << endl <<
- indent() << "oprot->getTransport()->flush();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl;
+ indent() << "::apache::thrift::TApplicationException x(e.what());" << endl <<
+ indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
+ indent() << "x.write(oprot);" << endl <<
+ indent() << "oprot->writeMessageEnd();" << endl <<
+ indent() << "oprot->getTransport()->flush();" << endl <<
+ indent() << "oprot->getTransport()->writeEnd();" << endl;
}
f_service_ << indent() << "return;" << endl;
indent_down();
f_service_ << indent() << "}" << endl << endl;
- // Shortcut out here for oneway functions
- if (tfunction->is_oneway()) {
- f_service_ <<
- indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
- indent() << "}" << endl << endl <<
- indent() << "return;" << endl;
- // Close function
- scope_down(f_service_);
- f_service_ << endl;
- return;
- }
+ // Shortcut out here for oneway functions
+ if (tfunction->is_oneway()) {
+ f_service_ <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl << endl <<
+ indent() << "return;" << endl;
+ indent_down();
+ f_service_ << "}" << endl <<
+ endl;
+ return;
+ }
- // Serialize the result into a struct
- f_service_ <<
+ // Serialize the result into a struct
+ f_service_ <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
indent() << "}" << endl << endl <<
- indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
- indent() << "result.write(oprot);" << endl <<
- indent() << "oprot->writeMessageEnd();" << endl <<
- indent() << "oprot->getTransport()->flush();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
- indent() << "if (eventHandler_.get() != NULL) {" << endl <<
- indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
- indent() << "}" << endl;
+ indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+ indent() << "result.write(oprot);" << endl <<
+ indent() << "oprot->writeMessageEnd();" << endl <<
+ indent() << "oprot->getTransport()->flush();" << endl <<
+ indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl;
- // Close function
- scope_down(f_service_);
- f_service_ << endl;
+ // Close function
+ scope_down(f_service_);
+ f_service_ << endl;
+ }
+
+ // Cob style.
+ else {
+ // Processor entry point.
+ f_service_ <<
+ "void " << tservice->get_name() << "AsyncProcessor::" <<
+ "process_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
+ scope_up(f_service_);
+
+ f_service_ <<
+ indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl <<
+ indent() << "try {" << endl;
+ indent_up();
+ f_service_ <<
+ indent() << "args.read(iprot);" << endl <<
+ indent() << "iprot->readMessageEnd();" << endl <<
+ indent() << "iprot->getTransport()->readEnd();" << endl;
+ scope_down(f_service_);
+
+ // TODO(dreiss): Handle TExceptions? Expose to server?
+ f_service_ <<
+ indent() << "catch (const std::exception& exn) {" << endl <<
+ indent() << " return cob(false);" << endl <<
+ indent() << "}" << endl;
+
+ // TODO(dreiss): Figure out a strategy for exceptions in async handlers.
+ f_service_ <<
+ indent() << "iface_->" << tfunction->get_name() << "(";
+ indent_up(); indent_up();
+ if (tfunction->is_oneway()) {
+ // No return. Just hand off our cob.
+ // TODO(dreiss): Call the cob immediately?
+ f_service_ <<
+ "std::tr1::bind(cob, true)" << endl;
+ } else {
+ f_service_ << endl;
+ string ret_placeholder = ", std::tr1::placeholders::_1";
+ string comma = "";
+ if (tfunction->get_returntype()->is_void()) {
+ ret_placeholder = "";
+ }
+ f_service_ <<
+ indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
+ << "return_" << tfunction->get_name()
+ << ", this, cob, seqid, oprot" << ret_placeholder << ")";
+ if (!xceptions.empty()) {
+ f_service_
+ << ',' << endl <<
+ indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
+ << "throw_" << tfunction->get_name()
+ << ", this, cob, seqid, oprot, std::tr1::placeholders::_1)";
+ }
+ }
+
+ // XXX Whitespace cleanup.
+ for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+ f_service_
+ << ',' << endl <<
+ indent() << "args." << (*f_iter)->get_name();
+ }
+ f_service_ << ");" << endl;
+ indent_down(); indent_down();
+ scope_down(f_service_);
+ f_service_ << endl;
+
+ // Normal return.
+ if (!tfunction->is_oneway()) {
+ string ret_arg = (tfunction->get_returntype()->is_void()
+ ? ""
+ : ", const " + type_name(tfunction->get_returntype()) + "& _return");
+ f_service_ <<
+ "void " << tservice->get_name() << "AsyncProcessor::" <<
+ "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ')' << endl;
+ scope_up(f_service_);
+ f_service_ <<
+ indent() << tservice->get_name() << "_" << tfunction->get_name() << "_presult result;" << endl;
+ if (!tfunction->get_returntype()->is_void()) {
+ // The const_cast here is unfortunate, but it would be a pain to avoid,
+ // and we only do a write with this struct, which is const-safe.
+ f_service_ <<
+ indent() << "result.success = const_cast<" << type_name(tfunction->get_returntype()) << "*>(&_return);" << endl <<
+ indent() << "result.__isset.success = true;" << endl;
+ }
+ // Serialize the result into a struct
+ f_service_ <<
+ endl <<
+ indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+ indent() << "result.write(oprot);" << endl <<
+ indent() << "oprot->writeMessageEnd();" << endl <<
+ indent() << "oprot->getTransport()->flush();" << endl <<
+ indent() << "oprot->getTransport()->writeEnd();" << endl <<
+ indent() << "return cob(true);" << endl;
+ scope_down(f_service_);
+ f_service_ << endl;
+ }
+
+ // Exception return.
+ if (!tfunction->is_oneway() && !xceptions.empty()) {
+ f_service_ <<
+ "void " << tservice->get_name() << "AsyncProcessor::" <<
+ "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw)" << endl;
+ scope_up(f_service_);
+ f_service_ <<
+ indent() << tservice->get_name() << "_" << tfunction->get_name() << "_result result;" << endl << endl <<
+ indent() << "try {" << endl;
+ indent_up();
+ f_service_ <<
+ indent() << "_throw->throw_it();" << endl <<
+ indent() << "return cob(false);" << endl; // Is this possible? TBD.
+ indent_down();
+ f_service_ <<
+ indent() << '}';
+ for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+ f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
+ indent_up();
+ f_service_ <<
+ indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
+ indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
+ scope_down(f_service_);
+ }
+
+ // Serialize the result into a struct
+ f_service_ <<
+ endl <<
+ indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+ indent() << "result.write(oprot);" << endl <<
+ indent() << "oprot->writeMessageEnd();" << endl <<
+ indent() << "oprot->getTransport()->flush();" << endl <<
+ indent() << "oprot->getTransport()->writeEnd();" << endl <<
+ indent() << "return cob(true);" << endl;
+
+ scope_down(f_service_);
+ f_service_ << endl;
+ } // for each function
+ } // cob style
}
/**
@@ -2261,7 +2670,7 @@
vector<t_function*>::iterator f_iter;
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
f_skeleton <<
- indent() << function_signature(*f_iter) << " {" << endl <<
+ indent() << function_signature(*f_iter, "") << " {" << endl <<
indent() << " // Your implementation goes here" << endl <<
indent() << " printf(\"" << (*f_iter)->get_name() << "\\n\");" << endl <<
indent() << "}" << endl <<
@@ -2924,21 +3333,46 @@
* @return String of rendered function definition
*/
string t_cpp_generator::function_signature(t_function* tfunction,
+ string style,
string prefix,
bool name_params) {
t_type* ttype = tfunction->get_returntype();
t_struct* arglist = tfunction->get_arglist();
+ bool has_xceptions = !tfunction->get_xceptions()->get_members().empty();
- if (is_complex_type(ttype)) {
- bool empty = arglist->get_members().size() == 0;
+ if (style == "") {
+ if (is_complex_type(ttype)) {
+ return
+ "void " + prefix + tfunction->get_name() +
+ "(" + type_name(ttype) + (name_params ? "& _return" : "& /* _return */") +
+ argument_list(arglist, name_params, true) + ")";
+ } else {
+ return
+ type_name(ttype) + " " + prefix + tfunction->get_name() +
+ "(" + argument_list(arglist, name_params) + ")";
+ }
+ } else if (style.substr(0,3) == "Cob") {
+ string cob_type;
+ string exn_cob;
+ if (style == "CobCl") {
+ cob_type = "(" + service_name_ + "CobClient* client)";
+ } else if (style =="CobSv") {
+ cob_type = (ttype->is_void()
+ ? "()"
+ : ("(" + type_name(ttype) + " const& _return)"));
+ if (has_xceptions) {
+ exn_cob = ", std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob";
+ }
+ } else {
+ throw "UNKNOWN STYLE";
+ }
+
return
"void " + prefix + tfunction->get_name() +
- "(" + type_name(ttype) + (name_params ? "& _return" : "& /* _return */") +
- (empty ? "" : (", " + argument_list(arglist, name_params))) + ")";
+ "(std::tr1::function<void" + cob_type + "> cob" + exn_cob +
+ argument_list(arglist, name_params, true) + ")";
} else {
- return
- type_name(ttype) + " " + prefix + tfunction->get_name() +
- "(" + argument_list(arglist, name_params) + ")";
+ throw "UNKNOWN STYLE";
}
}
@@ -2948,12 +3382,12 @@
* @param tstruct The struct definition
* @return Comma sepearated list of all field names in that struct
*/
-string t_cpp_generator::argument_list(t_struct* tstruct, bool name_params) {
+string t_cpp_generator::argument_list(t_struct* tstruct, bool name_params, bool start_comma) {
string result = "";
const vector<t_field*>& fields = tstruct->get_members();
vector<t_field*>::const_iterator f_iter;
- bool first = true;
+ bool first = !start_comma;
for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
if (first) {
first = false;
diff --git a/configure.ac b/configure.ac
index bf8ef6d..320c250 100644
--- a/configure.ac
+++ b/configure.ac
@@ -311,6 +311,14 @@
AX_SIGNED_RIGHT_SHIFT
fi
+dnl autoscan thinks we need this macro because we have a member function
+dnl called "error". Invoke the macro but don't run the check so autoscan
+dnl thinks we are in the clear. It's highly unlikely that we will ever
+dnl actually use the function that this checks for.
+if false ; then
+ AC_FUNC_ERROR_AT_LINE
+fi
+
AX_THRIFT_GEN(cpp, [C++], yes)
AM_CONDITIONAL([THRIFT_GEN_cpp], [test "$ax_thrift_gen_cpp" = "yes"])
AX_THRIFT_GEN(java, [Java], yes)
diff --git a/contrib/async-test/Makefile b/contrib/async-test/Makefile
new file mode 100644
index 0000000..33e7f8a
--- /dev/null
+++ b/contrib/async-test/Makefile
@@ -0,0 +1,33 @@
+THRIFT = thrift
+CXXFLAGS = `pkg-config --cflags thrift thrift-nb` -levent
+LDLIBS = `pkg-config --libs thrift thrift-nb` -levent
+CXXFLAGS += -g -O0
+
+GENNAMES = Aggr aggr_types
+GENHEADERS = $(addsuffix .h, $(GENNAMES))
+GENSRCS = $(addsuffix .cpp, $(GENNAMES))
+GENOBJS = $(addsuffix .o, $(GENNAMES))
+
+PYLIBS = aggr/__init__.py
+
+PROGS = test-server
+
+all: $(PYLIBS) $(PROGS)
+
+test-server: test-server.o $(GENOBJS)
+
+test-server.o: $(GENSRCS)
+
+aggr/__init__.py: aggr.thrift
+ $(RM) $(dir $@)
+ $(THRIFT) --gen py:newstyle $<
+ mv gen-py/$(dir $@) .
+
+$(GENSRCS): aggr.thrift
+ $(THRIFT) --gen cpp:cob_style $<
+ mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) .
+
+clean:
+ $(RM) -r *.o $(PROGS) $(GENSRCS) $(GENHEADERS) gen-* aggr
+
+.PHONY: clean
diff --git a/contrib/async-test/aggr.thrift b/contrib/async-test/aggr.thrift
new file mode 100644
index 0000000..c016a65
--- /dev/null
+++ b/contrib/async-test/aggr.thrift
@@ -0,0 +1,8 @@
+exception Error {
+ 1: string desc;
+}
+
+service Aggr {
+ void addValue(1: i32 value);
+ list<i32> getValues() throws (1: Error err);
+}
diff --git a/contrib/async-test/test-leaf.py b/contrib/async-test/test-leaf.py
new file mode 100755
index 0000000..8b7c3e3
--- /dev/null
+++ b/contrib/async-test/test-leaf.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+import sys
+import time
+from thrift.transport import TTransport
+from thrift.transport import TSocket
+from thrift.protocol import TBinaryProtocol
+from thrift.server import THttpServer
+from aggr import Aggr
+
+class AggrHandler(Aggr.Iface):
+ def __init__(self):
+ self.values = []
+
+ def addValue(self, value):
+ self.values.append(value)
+
+ def getValues(self, ):
+ time.sleep(1)
+ return self.values
+
+processor = Aggr.Processor(AggrHandler())
+pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+THttpServer.THttpServer(processor, ('', int(sys.argv[1])), pfactory).serve()
diff --git a/contrib/async-test/test-server.cpp b/contrib/async-test/test-server.cpp
new file mode 100644
index 0000000..a55c348
--- /dev/null
+++ b/contrib/async-test/test-server.cpp
@@ -0,0 +1,97 @@
+#include <tr1/functional>
+#include "protocol/TBinaryProtocol.h"
+#include "async/TAsyncProtocolProcessor.h"
+#include "async/TEvhttpServer.h"
+#include "async/TEvhttpClientChannel.h"
+#include "Aggr.h"
+
+using std::tr1::bind;
+using std::tr1::placeholders::_1;
+
+using apache::thrift::TException;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::async::TEvhttpServer;
+using apache::thrift::async::TAsyncProcessor;
+using apache::thrift::async::TAsyncBufferProcessor;
+using apache::thrift::async::TAsyncProtocolProcessor;
+using apache::thrift::async::TAsyncChannel;
+using apache::thrift::async::TEvhttpClientChannel;
+
+class AggrAsyncHandler : public AggrCobSvIf {
+ protected:
+ struct RequestContext {
+ std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
+ std::vector<int32_t> ret;
+ int pending_calls;
+ };
+
+ public:
+ AggrAsyncHandler()
+ : eb_(NULL)
+ , pfact_(new TBinaryProtocolFactory())
+ {
+ leaf_ports_.push_back(8081);
+ leaf_ports_.push_back(8082);
+ }
+
+ void addValue(std::tr1::function<void()> cob, const int32_t value) {
+ // Silently drop writes to the aggrgator.
+ return cob();
+ }
+
+ void getValues(std::tr1::function<void(
+ std::vector<int32_t> const& _return)> cob,
+ std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
+ RequestContext* ctx = new RequestContext();
+ ctx->cob = cob;
+ ctx->pending_calls = leaf_ports_.size();
+ for (std::vector<int>::iterator it = leaf_ports_.begin();
+ it != leaf_ports_.end(); ++it) {
+ boost::shared_ptr<TAsyncChannel> channel(
+ new TEvhttpClientChannel(
+ "localhost", "/", "127.0.0.1", *it, eb_));
+ AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
+ client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
+ }
+ }
+
+ void setEventBase(struct event_base* eb) {
+ eb_ = eb;
+ }
+
+ void clientReturn(RequestContext* ctx, AggrCobClient* client) {
+ ctx->pending_calls -= 1;
+
+ try {
+ std::vector<int32_t> subret;
+ client->recv_getValues(subret);
+ ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
+ } catch (TException& exn) {
+ // TODO: Log error
+ }
+
+ delete client;
+
+ if (ctx->pending_calls == 0) {
+ ctx->cob(ctx->ret);
+ delete ctx;
+ }
+ }
+
+ protected:
+ struct event_base* eb_;
+ std::vector<int> leaf_ports_;
+ boost::shared_ptr<TProtocolFactory> pfact_;
+};
+
+
+int main() {
+ boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler());
+ boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler));
+ boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory());
+ boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact));
+ boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080));
+ handler->setEventBase(server->getEventBase());
+ server->serve();
+}
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 664a58a..39382c8 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -70,9 +70,11 @@
src/server/TSimpleServer.cpp \
src/server/TThreadPoolServer.cpp \
src/server/TThreadedServer.cpp \
+ src/async/TAsyncChannel.cpp \
src/processor/PeekProcessor.cpp
-libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp
+libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp \
+ src/async/TAsyncProtocolProcessor.cpp
libthriftz_la_SOURCES = src/transport/TZlibTransport.cpp
@@ -148,6 +150,17 @@
src/processor/PeekProcessor.h \
src/processor/StatsProcessor.h
+include_asyncdir = $(include_thriftdir)/async
+include_async_HEADERS = \
+ src/async/TAsyncChannel.h \
+ src/async/TAsyncProcessor.h \
+ src/async/TAsyncBufferProcessor.h \
+ src/async/TAsyncProtocolProcessor.h \
+ src/async/TEvhttpClientChannel.h \
+ src/async/TEvhttpServer.h \
+ src/async/SimpleCallback.h
+
+
noinst_PROGRAMS = concurrency_test
concurrency_test_SOURCES = src/concurrency/test/Tests.cpp \
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index 22c10f1..f71a50b 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -119,4 +119,4 @@
}} // apache::thrift
-#endif // #ifndef _THRIFT_PROCESSOR_H_
+#endif // #ifndef _THRIFT_TPROCESSOR_H_
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 1bce23f..cb7d55a 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -110,6 +110,29 @@
struct TypeSpec;
}}
+class TDelayedException {
+ public:
+ template <class E> static TDelayedException* delayException(const E& e);
+ virtual void throw_it() = 0;
+ virtual ~TDelayedException() {};
+};
+
+template <class E> class TExceptionWrapper : public TDelayedException {
+ public:
+ TExceptionWrapper(const E& e) : e_(e) {}
+ virtual void throw_it() {
+ E temp(e_);
+ delete this;
+ throw temp;
+ }
+ private:
+ E e_;
+};
+
+template <class E>
+TDelayedException* TDelayedException::delayException(const E& e) {
+ return new TExceptionWrapper<E>(e);
+}
}} // apache::thrift
diff --git a/lib/cpp/src/async/SimpleCallback.h b/lib/cpp/src/async/SimpleCallback.h
new file mode 100644
index 0000000..4218328
--- /dev/null
+++ b/lib/cpp/src/async/SimpleCallback.h
@@ -0,0 +1,98 @@
+#ifndef _THRIFT_ASYNC_SIMPLECALLBACK_H_
+#define _THRIFT_ASYNC_SIMPLECALLBACK_H_ 1
+
+#include <Thrift.h>
+namespace apache { namespace thrift {
+
+/**
+ * A template class for forming simple method callbacks with either an empty
+ * argument list or one argument of known type.
+ *
+ * For more efficiency where tr1::function is overkill.
+ */
+
+template<typename C, ///< class whose method we wish to wrap
+ typename A = void, ///< type of argument
+ typename R = void> ///< type of return value
+class SimpleCallback {
+ typedef R (C::*cfptr_t)(A); ///< pointer-to-member-function type
+ cfptr_t fptr_; ///< the embedded function pointer
+ C* obj_; ///< object whose function we're wrapping
+ public:
+ /**
+ * Constructor for empty callback object.
+ */
+ SimpleCallback() :
+ fptr_(NULL), obj_(NULL) {}
+ /**
+ * Construct callback wrapper for member function.
+ *
+ * @param fptr pointer-to-member-function
+ * @param "this" for object associated with callback
+ */
+ SimpleCallback(cfptr_t fptr, const C* obj) :
+ fptr_(fptr), obj_(const_cast<C*>(obj))
+ {}
+
+ /**
+ * Make a call to the member function we've wrapped.
+ *
+ * @param i argument for the wrapped member function
+ * @return value from that function
+ */
+ R operator()(A i) const {
+ (obj_->*fptr_)(i);
+ }
+
+ operator bool() const {
+ return obj_ != NULL && fptr_ != NULL;
+ }
+
+ ~SimpleCallback() {}
+};
+
+/**
+ * Specialization of SimpleCallback for empty argument list.
+ */
+template<typename C, ///< class whose method we wish to wrap
+ typename R> ///< type of return value
+class SimpleCallback<C, void, R> {
+ typedef R (C::*cfptr_t)(); ///< pointer-to-member-function type
+ cfptr_t fptr_; ///< the embedded function pointer
+ C* obj_; ///< object whose function we're wrapping
+ public:
+ /**
+ * Constructor for empty callback object.
+ */
+ SimpleCallback() :
+ fptr_(NULL), obj_(NULL) {}
+
+ /**
+ * Construct callback wrapper for member function.
+ *
+ * @param fptr pointer-to-member-function
+ * @param obj "this" for object associated with callback
+ */
+ SimpleCallback(cfptr_t fptr, const C* obj) :
+ fptr_(fptr), obj_(const_cast<C*>(obj))
+ {}
+
+ /**
+ * Make a call to the member function we've wrapped.
+ *
+ * @return value from that function
+ */
+ R operator()() const {
+ (obj_->*fptr_)();
+ }
+
+ operator bool() const {
+ return obj_ != NULL && fptr_ != NULL;
+ }
+
+ ~SimpleCallback() {}
+};
+
+}} // apache::thrift
+
+#endif /* !_THRIFT_ASYNC_SIMPLECALLBACK_H_ */
diff --git a/lib/cpp/src/async/TAsyncBufferProcessor.h b/lib/cpp/src/async/TAsyncBufferProcessor.h
new file mode 100644
index 0000000..06a503e
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncBufferProcessor.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_
+#define _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ 1
+
+#include <tr1/functional>
+#include <boost/shared_ptr.hpp>
+
+#include "transport/TBufferTransports.h"
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncBufferProcessor {
+ public:
+ // Process data in "in", putting the result in "out".
+ // Call _return(true) when done, or _return(false) to
+ // forcefully close the connection (if applicable).
+ // "in" and "out" should be TMemoryBuffer or similar,
+ // not a wrapper around a socket.
+ virtual void process(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf,
+ boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf) = 0;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_
diff --git a/lib/cpp/src/async/TAsyncChannel.cpp b/lib/cpp/src/async/TAsyncChannel.cpp
new file mode 100644
index 0000000..2bf02fe
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncChannel.cpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <async/TAsyncChannel.h>
+#include <tr1/functional>
+
+namespace apache { namespace thrift { namespace async {
+
+bool TAsyncChannel::sendAndRecvMessage(const VoidCallback& cob,
+ TMemoryBuffer* sendBuf,
+ TMemoryBuffer* recvBuf) {
+ std::tr1::function<void()> send_done =
+ std::tr1::bind(&TAsyncChannel::recvMessage, this, cob, recvBuf);
+
+ return sendMessage(send_done, sendBuf);
+}
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TAsyncChannel.h b/lib/cpp/src/async/TAsyncChannel.h
new file mode 100644
index 0000000..d5cd419
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncChannel.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_
+#define _THRIFT_ASYNC_TASYNCCHANNEL_H_ 1
+
+#include <tr1/functional>
+#include <Thrift.h>
+#include <transport/TTransportUtils.h>
+
+namespace apache { namespace thrift { namespace transport {
+class TMemoryBuffer;
+}}}
+
+namespace apache { namespace thrift { namespace async {
+using apache::thrift::transport::TMemoryBuffer;
+
+class TAsyncTransport;
+
+class TAsyncChannel {
+ public:
+ typedef std::tr1::function<void()> VoidCallback;
+
+ virtual ~TAsyncChannel() {}
+
+ // is the channel in a good state?
+ virtual bool good() const = 0;
+ virtual bool error() const = 0;
+ virtual bool timedOut() const = 0;
+
+ /**
+ * Send a message over the channel.
+ *
+ * @return true iff the cob has been or will be called, else false
+ */
+ virtual bool sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0;
+
+ /**
+ * Receive a message from the channel.
+ *
+ * @return true iff the cob has been or will be called, else false
+ */
+ virtual bool recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0;
+
+ /**
+ * Send a message over the channel and receive a response.
+ *
+ * @return true iff the cob has been or will be called, else false
+ */
+ virtual bool sendAndRecvMessage(const VoidCallback& cob,
+ apache::thrift::transport::TMemoryBuffer* sendBuf,
+ apache::thrift::transport::TMemoryBuffer* recvBuf);
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_
diff --git a/lib/cpp/src/async/TAsyncProcessor.h b/lib/cpp/src/async/TAsyncProcessor.h
new file mode 100644
index 0000000..abf5816
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProcessor.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TASYNCPROCESSOR_H_
+#define _THRIFT_TASYNCPROCESSOR_H_ 1
+
+#include <tr1/functional>
+#include <boost/shared_ptr.hpp>
+#include <protocol/TProtocol.h>
+#include <TProcessor.h>
+
+namespace apache { namespace thrift { namespace async {
+
+/**
+ * Async version of a TProcessor. It is not expected to complete by the time
+ * the call to process returns. Instead, it calls a cob to signal completion.
+ */
+class TAsyncProcessor {
+ public:
+ virtual ~TAsyncProcessor() {}
+
+ virtual void process(std::tr1::function<void(bool success)> _return,
+ boost::shared_ptr<protocol::TProtocol> in,
+ boost::shared_ptr<protocol::TProtocol> out) = 0;
+
+ void process(std::tr1::function<void(bool success)> _return,
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> io) {
+ return process(_return, io, io);
+ }
+
+ protected:
+ TAsyncProcessor() {}
+};
+
+}}} // apache::thrift::async
+
+// XXX I'm lazy for now
+namespace apache { namespace thrift {
+using apache::thrift::async::TAsyncProcessor;
+}}
+
+#endif // #ifndef _THRIFT_TASYNCPROCESSOR_H_
diff --git a/lib/cpp/src/async/TAsyncProtocolProcessor.cpp b/lib/cpp/src/async/TAsyncProtocolProcessor.cpp
new file mode 100644
index 0000000..05d504b
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProtocolProcessor.cpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TAsyncProtocolProcessor.h"
+
+using apache::thrift::transport::TBufferBase;
+using apache::thrift::protocol::TProtocol;
+
+namespace apache { namespace thrift { namespace async {
+
+void TAsyncProtocolProcessor::process(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<TBufferBase> ibuf,
+ boost::shared_ptr<TBufferBase> obuf) {
+ boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));
+ boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));
+ return underlying_->process(
+ std::tr1::bind(
+ &TAsyncProtocolProcessor::finish,
+ _return,
+ oprot,
+ std::tr1::placeholders::_1),
+ iprot, oprot);
+}
+
+/* static */ void TAsyncProtocolProcessor::finish(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<TProtocol> oprot,
+ bool healthy) {
+ // This is a stub function to hold a reference to oprot.
+ return _return(healthy);
+}
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TAsyncProtocolProcessor.h b/lib/cpp/src/async/TAsyncProtocolProcessor.h
new file mode 100644
index 0000000..7ec718b
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProtocolProcessor.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TNAME_ME_H_
+#define _THRIFT_TNAME_ME_H_ 1
+
+#include "TAsyncProcessor.h"
+#include "TAsyncBufferProcessor.h"
+#include "protocol/TProtocol.h"
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncProtocolProcessor : public TAsyncBufferProcessor {
+ public:
+ TAsyncProtocolProcessor(
+ boost::shared_ptr<TAsyncProcessor> underlying,
+ boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact)
+ : underlying_(underlying)
+ , pfact_(pfact)
+ {}
+
+ virtual void process(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf,
+ boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf);
+
+ private:
+ static void finish(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> oprot,
+ bool healthy);
+
+ boost::shared_ptr<TAsyncProcessor> underlying_;
+ boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact_;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TNAME_ME_H_
diff --git a/lib/cpp/src/async/TEvhttpClientChannel.cpp b/lib/cpp/src/async/TEvhttpClientChannel.cpp
new file mode 100644
index 0000000..54676a1
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpClientChannel.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TEvhttpClientChannel.h"
+#include <evhttp.h>
+
+namespace apache { namespace thrift { namespace async {
+
+
+TEvhttpClientChannel::TEvhttpClientChannel(
+ const std::string& host,
+ const std::string& path,
+ const char* address,
+ int port,
+ struct event_base* eb)
+ : host_(host)
+ , path_(path)
+ , recvBuf_(NULL)
+ , conn_(NULL)
+{
+ conn_ = evhttp_connection_new(address, port);
+ if (conn_ == NULL) {
+ abort(); // XXX
+ }
+ evhttp_connection_set_base(conn_, eb);
+}
+
+
+TEvhttpClientChannel::~TEvhttpClientChannel() {
+ if (conn_ != NULL) {
+ evhttp_connection_free(conn_);
+ }
+}
+
+
+bool TEvhttpClientChannel::sendAndRecvMessage(
+ const VoidCallback& cob,
+ apache::thrift::transport::TMemoryBuffer* sendBuf,
+ apache::thrift::transport::TMemoryBuffer* recvBuf) {
+ cob_ = cob;
+ recvBuf_ = recvBuf;
+
+ struct evhttp_request* req = evhttp_request_new(response, this);
+ if (req == NULL) {
+ abort(); // XXX
+ }
+
+ int rv;
+
+ rv = evhttp_add_header(req->output_headers, "Host", host_.c_str());
+ if (rv != 0) {
+ abort(); // XXX
+ }
+
+ rv = evhttp_add_header(req->output_headers, "Content-Type", "application/x-thrift");
+ if (rv != 0) {
+ abort(); // XXX
+ }
+
+ uint8_t* obuf;
+ uint32_t sz;
+ sendBuf->getBuffer(&obuf, &sz);
+ rv = evbuffer_add(req->output_buffer, obuf, sz);
+ if (rv != 0) {
+ abort(); // XXX
+ }
+
+ rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());
+ if (rv != 0) {
+ abort(); // XXX
+ }
+
+ return true;
+}
+
+
+bool TEvhttpClientChannel::sendMessage(
+ const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) {
+ abort(); // XXX
+}
+
+
+bool TEvhttpClientChannel::recvMessage(
+ const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) {
+ abort(); // XXX
+}
+
+
+void TEvhttpClientChannel::finish(struct evhttp_request* req) {
+ if (req == NULL) {
+ return cob_();
+ } else if (req->response_code != 200) {
+ return cob_();
+ }
+ recvBuf_->resetBuffer(
+ EVBUFFER_DATA(req->input_buffer),
+ EVBUFFER_LENGTH(req->input_buffer));
+ return cob_();
+}
+
+
+/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {
+ TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;
+ self->finish(req);
+}
+
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TEvhttpClientChannel.h b/lib/cpp/src/async/TEvhttpClientChannel.h
new file mode 100644
index 0000000..d2bc4b3
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpClientChannel.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_
+#define _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ 1
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "TAsyncChannel.h"
+
+struct event_base;
+struct evhttp_connection;
+struct evhttp_request;
+
+namespace apache { namespace thrift { namespace transport {
+class TMemoryBuffer;
+}}}
+
+namespace apache { namespace thrift { namespace async {
+
+class TEvhttpClientChannel : public TAsyncChannel {
+ public:
+ using TAsyncChannel::VoidCallback;
+
+ TEvhttpClientChannel(
+ const std::string& host,
+ const std::string& path,
+ const char* address,
+ int port,
+ struct event_base* eb);
+ ~TEvhttpClientChannel();
+
+ virtual bool sendAndRecvMessage(const VoidCallback& cob,
+ apache::thrift::transport::TMemoryBuffer* sendBuf,
+ apache::thrift::transport::TMemoryBuffer* recvBuf);
+
+ virtual bool sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message);
+ virtual bool recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message);
+
+ void finish(struct evhttp_request* req);
+
+ //XXX
+ virtual bool good() const { return true; }
+ virtual bool error() const { return false; }
+ virtual bool timedOut() const { return false; }
+
+ private:
+ static void response(struct evhttp_request* req, void* arg);
+
+ std::string host_;
+ std::string path_;
+ VoidCallback cob_;
+ apache::thrift::transport::TMemoryBuffer* recvBuf_;
+ struct evhttp_connection* conn_;
+
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_
diff --git a/lib/cpp/src/async/TEvhttpServer.cpp b/lib/cpp/src/async/TEvhttpServer.cpp
new file mode 100644
index 0000000..2997597
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpServer.cpp
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TEvhttpServer.h"
+#include "TAsyncBufferProcessor.h"
+#include "transport/TBufferTransports.h"
+#include <evhttp.h>
+
+using apache::thrift::transport::TMemoryBuffer;
+
+namespace apache { namespace thrift { namespace async {
+
+
+struct TEvhttpServer::RequestContext {
+ struct evhttp_request* req;
+ boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> ibuf;
+ boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> obuf;
+
+ RequestContext(struct evhttp_request* req);
+};
+
+
+TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor)
+ : processor_(processor)
+ , eb_(NULL)
+ , eh_(NULL)
+{}
+
+
+TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port)
+ : processor_(processor)
+ , eb_(NULL)
+ , eh_(NULL)
+{
+ // Create event_base and evhttp.
+ eb_ = event_base_new();
+ if (eb_ == NULL) {
+ abort(); // XXX
+ }
+ eh_ = evhttp_new(eb_);
+ if (eh_ == NULL) {
+ event_base_free(eb_);
+ abort(); // XXX
+ }
+
+ // Bind to port.
+ int ret = evhttp_bind_socket(eh_, NULL, port);
+ if (ret < 0) {
+ evhttp_free(eh_);
+ event_base_free(eb_);
+ }
+
+ // Register a handler. If you use the other constructor,
+ // you will want to do this yourself.
+ // Don't forget to unregister before destorying this TEvhttpServer.
+ evhttp_set_cb(eh_, "/", request, (void*)this);
+}
+
+
+TEvhttpServer::~TEvhttpServer() {
+ if (eh_ != NULL) {
+ evhttp_free(eh_);
+ }
+ if (eb_ != NULL) {
+ event_base_free(eb_);
+ }
+}
+
+
+int TEvhttpServer::serve() {
+ if (eb_ == NULL) {
+ abort(); // XXX
+ }
+ return event_base_dispatch(eb_);
+}
+
+
+TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req) : req(req)
+ , ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer), EVBUFFER_LENGTH(req->input_buffer)))
+ , obuf(new TMemoryBuffer())
+{}
+
+
+void TEvhttpServer::request(struct evhttp_request* req, void* self) {
+ static_cast<TEvhttpServer*>(self)->process(req);
+}
+
+
+void TEvhttpServer::process(struct evhttp_request* req) {
+ RequestContext* ctx = new RequestContext(req);
+ return processor_->process(
+ std::tr1::bind(
+ &TEvhttpServer::complete,
+ this,
+ ctx,
+ std::tr1::placeholders::_1),
+ ctx->ibuf,
+ ctx->obuf);
+}
+
+
+void TEvhttpServer::complete(RequestContext* ctx, bool success) {
+ std::auto_ptr<RequestContext> ptr(ctx);
+
+ int code = 200;
+ const char* reason = "OK";
+
+ int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");
+ if (rv != 0) {
+ // TODO: Log an error.
+ }
+
+ struct evbuffer* buf = evbuffer_new();
+ if (buf == NULL) {
+ // TODO: Log an error.
+ } else {
+ uint8_t* obuf;
+ uint32_t sz;
+ ctx->obuf->getBuffer(&obuf, &sz);
+ int ret = evbuffer_add(buf, obuf, sz);
+ if (ret != 0) {
+ // TODO: Log an error.
+ }
+ }
+
+ evhttp_send_reply(ctx->req, code, reason, buf);
+ if (buf != NULL) {
+ evbuffer_free(buf);
+ }
+}
+
+
+struct event_base* TEvhttpServer::getEventBase() {
+ return eb_;
+}
+
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TEvhttpServer.h b/lib/cpp/src/async/TEvhttpServer.h
new file mode 100644
index 0000000..edc6ffb
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpServer.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TEVHTTP_SERVER_H_
+#define _THRIFT_TEVHTTP_SERVER_H_ 1
+
+#include <boost/shared_ptr.hpp>
+
+struct event_base;
+struct evhttp;
+struct evhttp_request;
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncBufferProcessor;
+
+class TEvhttpServer {
+ public:
+ /**
+ * Create a TEvhttpServer for use with an external evhttp instance.
+ * Must be manually installed with evhttp_set_cb, using
+ * TEvhttpServer::request as the callback and the
+ * address of the server as the extra arg.
+ * Do not call "serve" on this server.
+ */
+ TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor);
+
+ /**
+ * Create a TEvhttpServer with an embedded event_base and evhttp,
+ * listening on port and responding on the endpoint "/".
+ * Call "serve" on this server to serve forever.
+ */
+ TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port);
+
+ ~TEvhttpServer();
+
+ static void request(struct evhttp_request* req, void* self);
+ int serve();
+
+ struct event_base* getEventBase();
+
+ private:
+ struct RequestContext;
+
+ void process(struct evhttp_request* req);
+ void complete(RequestContext* ctx, bool success);
+
+ boost::shared_ptr<TAsyncBufferProcessor> processor_;
+ struct event_base* eb_;
+ struct evhttp* eh_;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TEVHTTP_SERVER_H_