THRIFT-923. cpp: Implement a fully nonblocking server and client

There are three major parts of this:
1/ New callback-style interfaces for for a few key Thrift components:
   TAsyncProcessor for servers and TAsyncChannel for clients.
2/ Concrete implementations of TAsyncChannel and a server for
   TAsyncProcessor based on evhttp.
3/ Async-style code generation for C++

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005127 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc
index ef0dd36..0ba4540 100644
--- a/compiler/cpp/src/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/generate/t_cpp_generator.cc
@@ -59,6 +59,9 @@
     iter = parsed_options.find("include_prefix");
     use_include_prefix_ = (iter != parsed_options.end());
 
+    iter = parsed_options.find("cob_style");
+    gen_cob_style_ = (iter != parsed_options.end());
+
     out_dir_base_ = "gen-cpp";
   }
 
@@ -100,15 +103,16 @@
    * Service-level generation functions
    */
 
-  void generate_service_interface (t_service* tservice);
-  void generate_service_null      (t_service* tservice);
+  void generate_service_interface (t_service* tservice, string style);
+  void generate_service_null      (t_service* tservice, string style);
   void generate_service_multiface (t_service* tservice);
   void generate_service_helpers   (t_service* tservice);
-  void generate_service_client    (t_service* tservice);
-  void generate_service_processor (t_service* tservice);
+  void generate_service_client    (t_service* tservice, string style);
+  void generate_service_processor (t_service* tservice, string style);
   void generate_service_skeleton  (t_service* tservice);
-  void generate_process_function  (t_service* tservice, t_function* tfunction);
+  void generate_process_function  (t_service* tservice, t_function* tfunction, string style);
   void generate_function_helpers  (t_service* tservice, t_function* tfunction);
+  void generate_service_async_skeleton (t_service* tservice);
 
   /**
    * Serialization constructs
@@ -166,7 +170,12 @@
                                           t_list*     tlist,
                                           std::string iter);
 
-  /**
+  void generate_function_call            (ostream& out,
+                                          t_function* tfunction,
+                                          string target,
+                                          string iface,
+                                          string arg_prefix);
+  /*
    * Helper rendering functions
    */
 
@@ -176,8 +185,9 @@
   std::string type_name(t_type* ttype, bool in_typedef=false, bool arg=false);
   std::string base_type_name(t_base_type::t_base tbase);
   std::string declare_field(t_field* tfield, bool init=false, bool pointer=false, bool constant=false, bool reference=false);
-  std::string function_signature(t_function* tfunction, std::string prefix="", bool name_params=true);
-  std::string argument_list(t_struct* tstruct, bool name_params=true);
+  std::string function_signature(t_function* tfunction, std::string style, std::string prefix="", bool name_params=true);
+  std::string cob_function_signature(t_function* tfunction, std::string prefix="", bool name_params=true);
+  std::string argument_list(t_struct* tstruct, bool name_params=true, bool start_comma=false);
   std::string type_to_enum(t_type* ttype);
   std::string local_reflection_name(const char*, t_type* ttype, bool external=false);
 
@@ -223,6 +233,11 @@
   bool use_include_prefix_;
 
   /**
+   * True iff we should generate "Continuation OBject"-style classes as well.
+   */
+  bool gen_cob_style_;
+
+  /**
    * Strings for namespace, computed once up front then used directly
    */
 
@@ -1172,7 +1187,7 @@
       type_to_enum((*f_iter)->get_type()) << ", " <<
       (*f_iter)->get_key() << ");" << endl;
     // Write field contents
-    if (pointers) {
+    if (pointers && !(*f_iter)->get_type()->is_xception()) {
       generate_serialize_field(out, *f_iter, "(*(this->", "))");
     } else {
       generate_serialize_field(out, *f_iter, "this->");
@@ -1294,8 +1309,23 @@
   f_header_ <<
     "#ifndef " << svcname << "_H" << endl <<
     "#define " << svcname << "_H" << endl <<
-    endl <<
-    "#include <TProcessor.h>" << endl <<
+    endl;
+  if (gen_cob_style_) {
+    f_header_ <<
+      "#include <tr1/functional>" << endl <<
+      // TODO(dreiss): Libify the base client so we don't have to include this.
+      "#include <transport/TTransportUtils.h>" << endl <<
+      "namespace apache { namespace thrift { namespace async {" << endl <<
+      "class TAsyncChannel;" << endl <<
+      "}}}" << endl;
+  }
+  f_header_ <<
+    "#include <TProcessor.h>" << endl;
+  if (gen_cob_style_) {
+    f_header_ <<
+      "#include <async/TAsyncProcessor.h>" << endl;
+  }
+  f_header_ <<
     "#include \"" << get_include_prefix(*get_program()) << program_name_ <<
     "_types.h\"" << endl;
 
@@ -1317,21 +1347,34 @@
   f_service_ <<
     autogen_comment();
   f_service_ <<
-    "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" <<
-    endl <<
-    endl <<
+    "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << endl;
+  if (gen_cob_style_) {
+    f_service_ <<
+      "#include \"async/TAsyncChannel.h\"" << endl;
+  }
+  f_service_ << endl <<
     ns_open_ << endl <<
     endl;
 
   // Generate all the components
-  generate_service_interface(tservice);
-  generate_service_null(tservice);
+  generate_service_interface(tservice, "");
+  generate_service_null(tservice, "");
   generate_service_helpers(tservice);
-  generate_service_client(tservice);
-  generate_service_processor(tservice);
+  generate_service_client(tservice, "");
+  generate_service_processor(tservice, "");
   generate_service_multiface(tservice);
   generate_service_skeleton(tservice);
 
+  // Generate all the cob components
+  if (gen_cob_style_) {
+    generate_service_interface(tservice, "CobCl");
+    generate_service_interface(tservice, "CobSv");
+    generate_service_null(tservice, "CobSv");
+    generate_service_client(tservice, "Cob");
+    generate_service_processor(tservice, "Cob");
+    generate_service_async_skeleton(tservice);
+  }
+
   // Close the namespace
   f_service_ <<
     ns_close_ << endl <<
@@ -1360,6 +1403,7 @@
     t_struct* ts = (*f_iter)->get_arglist();
     string name_orig = ts->get_name();
 
+    // TODO(dreiss): Why is this stuff not in generate_function_helpers?
     ts->set_name(tservice->get_name() + "_" + (*f_iter)->get_name() + "_args");
     generate_struct_definition(f_header_, ts, false);
     generate_struct_reader(f_service_, ts);
@@ -1378,23 +1422,29 @@
  *
  * @param tservice The service to generate a header definition for
  */
-void t_cpp_generator::generate_service_interface(t_service* tservice) {
+void t_cpp_generator::generate_service_interface(t_service* tservice, string style) {
+
+  if (style == "CobCl") {
+    // Forward declare the client.
+    indent(f_header_) << "class " << service_name_ << "CobClient;" << endl << endl;
+  }
+
   string extends = "";
   if (tservice->get_extends() != NULL) {
-    extends = " : virtual public " + type_name(tservice->get_extends()) + "If";
+    extends = " : virtual public " + type_name(tservice->get_extends()) + style + "If";
   }
   f_header_ <<
-    "class " << service_name_ << "If" << extends << " {" << endl <<
+    "class " << service_name_ << style << "If" << extends << " {" << endl <<
     " public:" << endl;
   indent_up();
   f_header_ <<
-    indent() << "virtual ~" << service_name_ << "If() {}" << endl;
+    indent() << "virtual ~" << service_name_ << style << "If" << "() {}" << endl;
 
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::iterator f_iter;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     f_header_ <<
-      indent() << "virtual " << function_signature(*f_iter) << " = 0;" << endl;
+      indent() << "virtual " << function_signature(*f_iter, style) << " = 0;" << endl;
   }
   indent_down();
   f_header_ <<
@@ -1406,36 +1456,49 @@
  *
  * @param tservice The service to generate a header definition for
  */
-void t_cpp_generator::generate_service_null(t_service* tservice) {
+void t_cpp_generator::generate_service_null(t_service* tservice, string style) {
   string extends = "";
   if (tservice->get_extends() != NULL) {
-    extends = " , virtual public " + type_name(tservice->get_extends()) + "Null";
+    extends = " , virtual public " + type_name(tservice->get_extends()) + style + "Null";
   }
   f_header_ <<
-    "class " << service_name_ << "Null : virtual public " << service_name_ << "If" << extends << " {" << endl <<
+    "class " << service_name_ << style << "Null : virtual public " << service_name_ << style << "If" << extends << " {" << endl <<
     " public:" << endl;
   indent_up();
   f_header_ <<
-    indent() << "virtual ~" << service_name_ << "Null() {}" << endl;
+    indent() << "virtual ~" << service_name_ << style << "Null() {}" << endl;
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::iterator f_iter;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     f_header_ <<
-      indent() << function_signature(*f_iter, "", false) << " {" << endl;
+      indent() << function_signature(*f_iter, style, "", false) << " {" << endl;
     indent_up();
+
     t_type* returntype = (*f_iter)->get_returntype();
-    if (returntype->is_void()) {
-      f_header_ <<
-        indent() << "return;" << endl;
-    } else if (is_complex_type(returntype)) {
-      f_header_ <<
-        indent() << "return;" << endl;
+    t_field returnfield(returntype, "_return");
+
+    if (style == "") {
+      if (returntype->is_void() || is_complex_type(returntype)) {
+        f_header_ << indent() << "return;" << endl;
+      } else {
+        f_header_ <<
+          indent() << declare_field(&returnfield, true) << endl <<
+          indent() << "return _return;" << endl;
+      }
+    } else if (style == "CobSv") {
+      if (returntype->is_void()) {
+        f_header_ << indent() << "return cob();" << endl;
     } else {
       t_field returnfield(returntype, "_return");
       f_header_ <<
         indent() << declare_field(&returnfield, true) << endl <<
-        indent() << "return _return;" << endl;
+        indent() << "return cob(_return);" << endl;
     }
+
+    } else {
+      throw "UNKNOWN STYLE";
+    }
+
     indent_down();
     f_header_ <<
       indent() << "}" << endl;
@@ -1445,6 +1508,109 @@
     "};" << endl << endl;
 }
 
+void t_cpp_generator::generate_function_call(ostream& out, t_function* tfunction, string target, string iface, string arg_prefix) {
+  bool first = true;
+  t_type* ret_type = get_true_type(tfunction->get_returntype());
+  out << indent();
+  if (!tfunction->is_oneway() && !ret_type->is_void()) {
+    if (is_complex_type(ret_type)) {
+      first = false;
+      out << iface << "->" << tfunction->get_name() << "(" << target;
+    } else {
+      out << target << " = " << iface << "->" << tfunction->get_name() << "(";
+    }
+  } else {
+    out << iface << "->" << tfunction->get_name() << "(";
+  }
+  const std::vector<t_field*>& fields = tfunction->get_arglist()->get_members();
+  vector<t_field*>::const_iterator f_iter;
+  for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+    if (first) {
+      first = false;
+    } else {
+      out << ", ";
+    }
+    out << arg_prefix << (*f_iter)->get_name();
+  }
+  out << ");" << endl;
+}
+
+void t_cpp_generator::generate_service_async_skeleton(t_service* tservice) {
+  string svcname = tservice->get_name();
+
+  // Service implementation file includes
+  string f_skeleton_name = get_out_dir()+svcname+"_async_server.skeleton.cpp";
+
+  string ns = namespace_prefix(tservice->get_program()->get_namespace("cpp"));
+
+  ofstream f_skeleton;
+  f_skeleton.open(f_skeleton_name.c_str());
+  f_skeleton <<
+    "// This autogenerated skeleton file illustrates one way to adapt a synchronous" << endl <<
+    "// interface into an asynchronous interface. You should copy it to another" << endl <<
+    "// filename to avoid overwriting it and rewrite as asynchronous any functions" << endl <<
+    "// that would otherwise introduce unwanted latency." << endl <<
+    endl <<
+    "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << endl <<
+    "#include <protocol/TBinaryProtocol.h>" << endl <<
+    "#include <async/TEventServer.h>" << endl <<
+    endl <<
+    "using namespace ::apache::thrift;" << endl <<
+    "using namespace ::apache::thrift::protocol;" << endl <<
+    "using namespace ::apache::thrift::transport;" << endl <<
+    "using namespace ::apache::thrift::async;" << endl <<
+    endl <<
+    "using boost::shared_ptr;" << endl <<
+    endl;
+
+  if (!ns.empty()) {
+    f_skeleton <<
+      "using namespace " << string(ns, 0, ns.size()-2) << ";" << endl <<
+      endl;
+  }
+
+  f_skeleton <<
+    "class " << svcname << "AsyncHandler : " <<
+    "public " << svcname << "CobSvIf {" << endl <<
+    " public:" << endl;
+  indent_up();
+  f_skeleton <<
+    indent() << svcname << "AsyncHandler() {" << endl <<
+    indent() << "  syncHandler_ = std::auto_ptr<" << svcname <<
+                "Handler>(new " << svcname << "Handler);" << endl <<
+    indent() << "  // Your initialization goes here" << endl <<
+    indent() << "}" << endl;
+  f_skeleton <<
+    indent() << "virtual ~" << service_name_ << "AsyncHandler();" << endl;
+
+  vector<t_function*> functions = tservice->get_functions();
+  vector<t_function*>::iterator f_iter;
+  for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+    f_skeleton <<
+      endl <<
+      indent() << function_signature(*f_iter, "CobSv", "", true) << " {" << endl;
+    indent_up();
+
+    t_type* returntype = (*f_iter)->get_returntype();
+    t_field returnfield(returntype, "_return");
+
+    string target = returntype->is_void() ? "" : "_return";
+    if (!returntype->is_void()) {
+      f_skeleton <<
+        indent() << declare_field(&returnfield, true) << endl;
+    }
+    generate_function_call(f_skeleton, *f_iter, target, "syncHandler_", "");
+    f_skeleton << indent() << "return cob(" << target << ");" << endl;
+
+    scope_down(f_skeleton);
+  }
+  f_skeleton << endl <<
+    " protected:" << endl <<
+    indent() << "std::auto_ptr<" << svcname << "Handler> syncHandler_;" << endl;
+  indent_down();
+  f_skeleton <<
+    "};" << endl << endl;
+}
 
 /**
  * Generates a multiface, which is a single server that just takes a set
@@ -1531,7 +1697,7 @@
     call += ")";
 
     f_header_ <<
-      indent() << function_signature(*f_iter) << " {" << endl;
+      indent() << function_signature(*f_iter, "") << " {" << endl;
     indent_up();
     f_header_ <<
       indent() << "uint32_t sz = ifaces_.size();" << endl <<
@@ -1576,75 +1742,115 @@
  *
  * @param tservice The service to generate a server for.
  */
-void t_cpp_generator::generate_service_client(t_service* tservice) {
+void t_cpp_generator::generate_service_client(t_service* tservice, string style) {
+  string ifstyle;
+  if (style == "Cob") {
+    ifstyle = "CobCl";
+  }
+
   string extends = "";
   string extends_client = "";
   if (tservice->get_extends() != NULL) {
     extends = type_name(tservice->get_extends());
-    extends_client = ", public " + extends + "Client";
+    extends_client = ", public " + extends + style + "Client";
   }
 
   // Generate the header portion
   f_header_ <<
-    "class " << service_name_ << "Client : " <<
-    "virtual public " << service_name_ << "If" <<
+    "class " << service_name_ << style << "Client : " <<
+    "virtual public " << service_name_ << ifstyle << "If" <<
     extends_client << " {" << endl <<
     " public:" << endl;
 
   indent_up();
-  f_header_ <<
-    indent() << service_name_ << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :" << endl;
-  if (extends.empty()) {
+  if (style != "Cob") {
     f_header_ <<
-      indent() << "  piprot_(prot)," << endl <<
-      indent() << "  poprot_(prot) {" << endl <<
-      indent() << "  iprot_ = prot.get();" << endl <<
-      indent() << "  oprot_ = prot.get();" << endl <<
-      indent() << "}" << endl;
-  } else {
-    f_header_ <<
-      indent() << "  " << extends << "Client(prot, prot) {}" << endl;
-  }
+      indent() << service_name_ << style << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :" << endl;
+    if (extends.empty()) {
+      f_header_ <<
+        indent() << "  piprot_(prot)," << endl <<
+        indent() << "  poprot_(prot) {" << endl <<
+        indent() << "  iprot_ = prot.get();" << endl <<
+        indent() << "  oprot_ = prot.get();" << endl <<
+        indent() << "}" << endl;
+    } else {
+      f_header_ <<
+        indent() << "  " << extends << style << "Client(prot, prot) {}" << endl;
+    }
 
-  f_header_ <<
-    indent() << service_name_ << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) :" << endl;
-  if (extends.empty()) {
     f_header_ <<
-      indent() << "  piprot_(iprot)," << endl <<
-      indent() << "  poprot_(oprot) {" << endl <<
-      indent() << "  iprot_ = iprot.get();" << endl <<
-      indent() << "  oprot_ = oprot.get();" << endl <<
+      indent() << service_name_ << style << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) :" << endl;
+    if (extends.empty()) {
+      if (style == "Cob") {
+        f_header_ <<
+          indent() << "  rpc_ctx_(ctx)," << endl;
+      }
+      f_header_ <<
+        indent() << "  piprot_(iprot)," << endl <<
+        indent() << "  poprot_(oprot) {" << endl <<
+        indent() << "  iprot_ = iprot.get();" << endl <<
+        indent() << "  oprot_ = oprot.get();" << endl <<
+        indent() << "}" << endl;
+    } else {
+      f_header_ <<
+        indent() << "  " << extends << style << "Client(iprot, oprot) {}" << endl;
+    }
+    
+    // Generate getters for the protocols.
+    f_header_ <<
+      indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {" << endl <<
+      indent() << "  return piprot_;" << endl <<
       indent() << "}" << endl;
-  } else {
+      
     f_header_ <<
-      indent() << "  " << extends << "Client(iprot, oprot) {}" << endl;
-  }
+      indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {" << endl <<
+      indent() << "  return poprot_;" << endl <<
+      indent() << "}" << endl;
 
+  } else /* if (style == "Cob") */ {
   // Generate getters for the protocols.
-  f_header_ <<
-    indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {" << endl <<
-    indent() << "  return piprot_;" << endl <<
-    indent() << "}" << endl;
+    f_header_ <<
+      indent() << service_name_ << style << "Client("
+      << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, "
+      << "::apache::thrift::protocol::TProtocolFactory* protocolFactory) :" << endl;
+    if (extends.empty()) {
+      f_header_ <<
+        indent() << "  channel_(channel)," << endl <<
+        indent() << "  itrans_(new ::apache::thrift::transport::TMemoryBuffer())," << endl <<
+        indent() << "  otrans_(new ::apache::thrift::transport::TMemoryBuffer())," << endl <<
+        indent() << "  piprot_(protocolFactory->getProtocol(itrans_))," << endl <<
+        indent() << "  poprot_(protocolFactory->getProtocol(otrans_)) {" << endl <<
+        indent() << "  iprot_ = piprot_.get();" << endl <<
+        indent() << "  oprot_ = poprot_.get();" << endl <<
+        indent() << "}" << endl;
+    } else {
+      f_header_ <<
+        indent() << "  " << extends << style << "Client(channel, protocolFactory) {}" << endl;
+    }
+  }
 
-  f_header_ <<
-    indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {" << endl <<
-    indent() << "  return poprot_;" << endl <<
-    indent() << "}" << endl;
+  if (style == "Cob") {
+    f_header_ <<
+      indent() << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {" << endl <<
+      indent() << "  return channel_;" << endl <<
+      indent() << "}" << endl;
+  }
 
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::const_iterator f_iter;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+    indent(f_header_) << function_signature(*f_iter, ifstyle) << ";" << endl;
+    // TODO(dreiss): Use private inheritance to avoid generating thise in cob-style.
     t_function send_function(g_type_void,
-                             string("send_") + (*f_iter)->get_name(),
-                             (*f_iter)->get_arglist());
-    indent(f_header_) << function_signature(*f_iter) << ";" << endl;
-    indent(f_header_) << function_signature(&send_function) << ";" << endl;
+        string("send_") + (*f_iter)->get_name(),
+        (*f_iter)->get_arglist());
+    indent(f_header_) << function_signature(&send_function, "") << ";" << endl;
     if (!(*f_iter)->is_oneway()) {
       t_struct noargs(program_);
       t_function recv_function((*f_iter)->get_returntype(),
-                               string("recv_") + (*f_iter)->get_name(),
-                               &noargs);
-      indent(f_header_) << function_signature(&recv_function) << ";" << endl;
+          string("recv_") + (*f_iter)->get_name(),
+          &noargs);
+      indent(f_header_) << function_signature(&recv_function, "") << ";" << endl;
     }
   }
   indent_down();
@@ -1653,11 +1859,19 @@
     f_header_ <<
       " protected:" << endl;
     indent_up();
+
+    if (style == "Cob") {
+      f_header_ <<
+        indent() << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;"  << endl <<
+        indent() << "boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;"  << endl <<
+        indent() << "boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;"  << endl;
+    }
     f_header_ <<
       indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;"  << endl <<
       indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;"  << endl <<
       indent() << "::apache::thrift::protocol::TProtocol* iprot_;"  << endl <<
       indent() << "::apache::thrift::protocol::TProtocol* oprot_;"  << endl;
+
     indent_down();
   }
 
@@ -1665,7 +1879,7 @@
     "};" << endl <<
     endl;
 
-  string scope = service_name_ + "Client::";
+  string scope = service_name_ + style + "Client::";
 
   // Generate client method implementations
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
@@ -1673,7 +1887,7 @@
 
     // Open function
     indent(f_service_) <<
-      function_signature(*f_iter, scope) << endl;
+      function_signature(*f_iter, ifstyle, scope) << endl;
     scope_up(f_service_);
     indent(f_service_) <<
       "send_" << funname << "(";
@@ -1695,155 +1909,171 @@
     }
     f_service_ << ");" << endl;
 
-    if (!(*f_iter)->is_oneway()) {
-      f_service_ << indent();
-      if (!(*f_iter)->get_returntype()->is_void()) {
-        if (is_complex_type((*f_iter)->get_returntype())) {
-          f_service_ << "recv_" << funname << "(_return);" << endl;
+    if (style != "Cob") {
+      if (!(*f_iter)->is_oneway()) {
+        f_service_ << indent();
+        if (!(*f_iter)->get_returntype()->is_void()) {
+          if (is_complex_type((*f_iter)->get_returntype())) {
+            f_service_ << "recv_" << funname << "(_return);" << endl;
+          } else {
+            f_service_ << "return recv_" << funname << "();" << endl;
+          }
         } else {
-          f_service_ << "return recv_" << funname << "();" << endl;
+          f_service_ <<
+            "recv_" << funname << "();" << endl;
         }
+      }
+    } else {
+      if (!(*f_iter)->is_oneway()) {
+        f_service_ <<
+          indent() << _this << "channel_->sendAndRecvMessage(" <<
+          "std::tr1::bind(cob, this), " << _this << "otrans_.get(), " <<
+          _this << "itrans_.get());" << endl;
       } else {
         f_service_ <<
-          "recv_" << funname << "();" << endl;
+        indent() << _this << "channel_->sendMessage(" <<
+          "std::tr1::bind(cob, this), " << _this << "otrans_.get());" << endl;
       }
     }
     scope_down(f_service_);
     f_service_ << endl;
 
-    // Function for sending
-    t_function send_function(g_type_void,
-                             string("send_") + (*f_iter)->get_name(),
-                             (*f_iter)->get_arglist());
+    //if (style != "Cob") // TODO(dreiss): Libify the client and don't generate this for cob-style
+    if (true) {
+      // Function for sending
+      t_function send_function(g_type_void,
+                               string("send_") + (*f_iter)->get_name(),
+                               (*f_iter)->get_arglist());
 
-    // Open the send function
-    indent(f_service_) <<
-      function_signature(&send_function, scope) << endl;
-    scope_up(f_service_);
-
-    // Function arguments and results
-    string argsname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_pargs";
-    string resultname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_presult";
-
-    // Serialize the request
-    f_service_ <<
-      indent() << "int32_t cseqid = 0;" << endl <<
-      indent() << "oprot_->writeMessageBegin(\"" << (*f_iter)->get_name() << "\", ::apache::thrift::protocol::T_CALL, cseqid);" << endl <<
-      endl <<
-      indent() << argsname << " args;" << endl;
-
-    for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) {
-      f_service_ <<
-        indent() << "args." << (*fld_iter)->get_name() << " = &" << (*fld_iter)->get_name() << ";" << endl;
-    }
-
-    f_service_ <<
-      indent() << "args.write(oprot_);" << endl <<
-      endl <<
-      indent() << "oprot_->writeMessageEnd();" << endl <<
-      indent() << "oprot_->getTransport()->flush();" << endl <<
-      indent() << "oprot_->getTransport()->writeEnd();" << endl;
-
-    scope_down(f_service_);
-    f_service_ << endl;
-
-    // Generate recv function only if not an oneway function
-    if (!(*f_iter)->is_oneway()) {
-      t_struct noargs(program_);
-      t_function recv_function((*f_iter)->get_returntype(),
-                               string("recv_") + (*f_iter)->get_name(),
-                               &noargs);
-      // Open function
+      // Open the send function
       indent(f_service_) <<
-        function_signature(&recv_function, scope) << endl;
+        function_signature(&send_function, "", scope) << endl;
       scope_up(f_service_);
 
+      // Function arguments and results
+      string argsname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_pargs";
+      string resultname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_presult";
+
+      // Serialize the request
       f_service_ <<
+        indent() << "int32_t cseqid = 0;" << endl <<
+        indent() << "oprot_->writeMessageBegin(\"" << (*f_iter)->get_name() << "\", ::apache::thrift::protocol::T_CALL, cseqid);" << endl <<
         endl <<
-        indent() << "int32_t rseqid = 0;" << endl <<
-        indent() << "std::string fname;" << endl <<
-        indent() << "::apache::thrift::protocol::TMessageType mtype;" << endl <<
+        indent() << argsname << " args;" << endl;
+
+      for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) {
+        f_service_ <<
+          indent() << "args." << (*fld_iter)->get_name() << " = &" << (*fld_iter)->get_name() << ";" << endl;
+      }
+
+      f_service_ <<
+        indent() << "args.write(oprot_);" << endl <<
         endl <<
-        indent() << "iprot_->readMessageBegin(fname, mtype, rseqid);" << endl <<
-        indent() << "if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {" << endl <<
-        indent() << "  ::apache::thrift::TApplicationException x;" << endl <<
-        indent() << "  x.read(iprot_);" << endl <<
-        indent() << "  iprot_->readMessageEnd();" << endl <<
-        indent() << "  iprot_->getTransport()->readEnd();" << endl <<
-        indent() << "  throw x;" << endl <<
-        indent() << "}" << endl <<
-        indent() << "if (mtype != ::apache::thrift::protocol::T_REPLY) {" << endl <<
-        indent() << "  iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
-        indent() << "  iprot_->readMessageEnd();" << endl <<
-        indent() << "  iprot_->getTransport()->readEnd();" << endl <<
-        indent() << "  throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);" << endl <<
-        indent() << "}" << endl <<
-        indent() << "if (fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl <<
-        indent() << "  iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
-        indent() << "  iprot_->readMessageEnd();" << endl <<
-        indent() << "  iprot_->getTransport()->readEnd();" << endl <<
-        indent() << "  throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);" << endl <<
-        indent() << "}" << endl;
+        indent() << "oprot_->writeMessageEnd();" << endl <<
+        indent() << "oprot_->getTransport()->flush();" << endl <<
+        indent() << "oprot_->getTransport()->writeEnd();" << endl;
 
-      if (!(*f_iter)->get_returntype()->is_void() &&
-          !is_complex_type((*f_iter)->get_returntype())) {
-        t_field returnfield((*f_iter)->get_returntype(), "_return");
-        f_service_ <<
-          indent() << declare_field(&returnfield) << endl;
-      }
-
-      f_service_ <<
-        indent() << resultname << " result;" << endl;
-
-      if (!(*f_iter)->get_returntype()->is_void()) {
-        f_service_ <<
-          indent() << "result.success = &_return;" << endl;
-      }
-
-      f_service_ <<
-        indent() << "result.read(iprot_);" << endl <<
-        indent() << "iprot_->readMessageEnd();" << endl <<
-        indent() << "iprot_->getTransport()->readEnd();" << endl <<
-        endl;
-
-      // Careful, only look for _result if not a void function
-      if (!(*f_iter)->get_returntype()->is_void()) {
-        if (is_complex_type((*f_iter)->get_returntype())) {
-          f_service_ <<
-            indent() << "if (result.__isset.success) {" << endl <<
-            indent() << "  // _return pointer has now been filled" << endl <<
-            indent() << "  return;" << endl <<
-            indent() << "}" << endl;
-        } else {
-          f_service_ <<
-            indent() << "if (result.__isset.success) {" << endl <<
-            indent() << "  return _return;" << endl <<
-            indent() << "}" << endl;
-        }
-      }
-
-      t_struct* xs = (*f_iter)->get_xceptions();
-      const std::vector<t_field*>& xceptions = xs->get_members();
-      vector<t_field*>::const_iterator x_iter;
-      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
-        f_service_ <<
-          indent() << "if (result.__isset." << (*x_iter)->get_name() << ") {" << endl <<
-          indent() << "  throw result." << (*x_iter)->get_name() << ";" << endl <<
-          indent() << "}" << endl;
-      }
-
-      // We only get here if we are a void function
-      if ((*f_iter)->get_returntype()->is_void()) {
-        indent(f_service_) <<
-          "return;" << endl;
-      } else {
-        f_service_ <<
-          indent() << "throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
-      }
-
-      // Close function
       scope_down(f_service_);
       f_service_ << endl;
+
+      // Generate recv function only if not an oneway function
+      if (!(*f_iter)->is_oneway()) {
+        t_struct noargs(program_);
+        t_function recv_function((*f_iter)->get_returntype(),
+                                 string("recv_") + (*f_iter)->get_name(),
+                                 &noargs);
+        // Open function
+        indent(f_service_) <<
+          function_signature(&recv_function, "", scope) << endl;
+        scope_up(f_service_);
+
+        f_service_ <<
+          endl <<
+          indent() << "int32_t rseqid = 0;" << endl <<
+          indent() << "std::string fname;" << endl <<
+          indent() << "::apache::thrift::protocol::TMessageType mtype;" << endl <<
+          endl <<
+          indent() << "iprot_->readMessageBegin(fname, mtype, rseqid);" << endl <<
+          indent() << "if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {" << endl <<
+          indent() << "  ::apache::thrift::TApplicationException x;" << endl <<
+          indent() << "  x.read(iprot_);" << endl <<
+          indent() << "  iprot_->readMessageEnd();" << endl <<
+          indent() << "  iprot_->getTransport()->readEnd();" << endl <<
+          indent() << "  throw x;" << endl <<
+          indent() << "}" << endl <<
+          indent() << "if (mtype != ::apache::thrift::protocol::T_REPLY) {" << endl <<
+          indent() << "  iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
+          indent() << "  iprot_->readMessageEnd();" << endl <<
+          indent() << "  iprot_->getTransport()->readEnd();" << endl <<
+          indent() << "  throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);" << endl <<
+          indent() << "}" << endl <<
+          indent() << "if (fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl <<
+          indent() << "  iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl <<
+          indent() << "  iprot_->readMessageEnd();" << endl <<
+          indent() << "  iprot_->getTransport()->readEnd();" << endl <<
+          indent() << "  throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);" << endl <<
+          indent() << "}" << endl;
+
+        if (!(*f_iter)->get_returntype()->is_void() &&
+            !is_complex_type((*f_iter)->get_returntype())) {
+          t_field returnfield((*f_iter)->get_returntype(), "_return");
+          f_service_ <<
+            indent() << declare_field(&returnfield) << endl;
+        }
+
+        f_service_ <<
+          indent() << resultname << " result;" << endl;
+
+        if (!(*f_iter)->get_returntype()->is_void()) {
+          f_service_ <<
+            indent() << "result.success = &_return;" << endl;
+        }
+
+        f_service_ <<
+          indent() << "result.read(iprot_);" << endl <<
+          indent() << "iprot_->readMessageEnd();" << endl <<
+          indent() << "iprot_->getTransport()->readEnd();" << endl <<
+          endl;
+
+        // Careful, only look for _result if not a void function
+        if (!(*f_iter)->get_returntype()->is_void()) {
+          if (is_complex_type((*f_iter)->get_returntype())) {
+            f_service_ <<
+              indent() << "if (result.__isset.success) {" << endl <<
+              indent() << "  // _return pointer has now been filled" << endl <<
+              indent() << "  return;" << endl <<
+              indent() << "}" << endl;
+          } else {
+            f_service_ <<
+              indent() << "if (result.__isset.success) {" << endl <<
+              indent() << "  return _return;" << endl <<
+              indent() << "}" << endl;
+          }
+        }
+
+        t_struct* xs = (*f_iter)->get_xceptions();
+        const std::vector<t_field*>& xceptions = xs->get_members();
+        vector<t_field*>::const_iterator x_iter;
+        for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+          f_service_ <<
+            indent() << "if (result.__isset." << (*x_iter)->get_name() << ") {" << endl <<
+            indent() << "  throw result." << (*x_iter)->get_name() << ";" << endl <<
+            indent() << "}" << endl;
+        }
+
+        // We only get here if we are a void function
+        if ((*f_iter)->get_returntype()->is_void()) {
+          indent(f_service_) <<
+            "return;" << endl;
+        } else {
+          f_service_ <<
+            indent() << "throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
+        }
+
+        // Close function
+        scope_down(f_service_);
+        f_service_ << endl;
+      }
     }
   }
 }
@@ -1853,7 +2083,22 @@
  *
  * @param tservice The service to generate a server for.
  */
-void t_cpp_generator::generate_service_processor(t_service* tservice) {
+void t_cpp_generator::generate_service_processor(t_service* tservice, string style) {
+  string ifstyle;
+  string pstyle;
+  string finish_cob;
+  string finish_cob_decl;
+  string cob_arg;
+  string ret_type = "bool ";
+  if (style == "Cob") {
+    ifstyle = "CobSv";
+    pstyle = "Async";
+    finish_cob = "std::tr1::function<void(bool ok)> cob, ";
+    finish_cob_decl = "std::tr1::function<void(bool ok)>, ";
+    cob_arg = "cob, ";
+    ret_type = "void ";
+  }
+
   // Generate the dispatch methods
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::iterator f_iter;
@@ -1862,13 +2107,13 @@
   string extends_processor = "";
   if (tservice->get_extends() != NULL) {
     extends = type_name(tservice->get_extends());
-    extends_processor = ", public " + extends + "Processor";
+    extends_processor = ", public " + extends + pstyle + "Processor";
   }
 
   // Generate the header portion
   f_header_ <<
-    "class " << service_name_ << "Processor : " <<
-    "virtual public ::apache::thrift::TProcessor" <<
+    "class " << service_name_ << pstyle << "Processor : " <<
+    "virtual public ::apache::thrift::T" << pstyle << "Processor" <<
     extends_processor << " {" << endl;
 
   // Protected data members
@@ -1876,9 +2121,9 @@
     " protected:" << endl;
   indent_up();
   f_header_ <<
-    indent() << "boost::shared_ptr<" << service_name_ << "If> iface_;" << endl;
+    indent() << "boost::shared_ptr<" << service_name_ << ifstyle << "If> iface_;" << endl;
   f_header_ <<
-    indent() << "virtual bool process_fn(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl;
+    indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl;
   indent_down();
 
   // Process function declarations
@@ -1886,10 +2131,20 @@
     " private:" << endl;
   indent_up();
   f_header_ <<
-    indent() << "std::map<std::string, void (" << service_name_ << "Processor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)> processMap_;" << endl;
+    indent() << "std::map<std::string, void (" << service_name_ << pstyle << "Processor::*)(" << finish_cob_decl << "int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)> processMap_;" << endl;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     indent(f_header_) <<
-      "void process_" << (*f_iter)->get_name() << "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl;
+      "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl;
+    if (style == "Cob") {
+      // XXX Factor this out, even if it is a pain.
+      string ret_arg = ((*f_iter)->get_returntype()->is_void()
+                        ? ""
+                        : ", const " + type_name((*f_iter)->get_returntype()) + "& _return");
+      // XXX Don't declare throw if it doesn't exist
+      f_header_ <<
+        "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ");" << endl <<
+        "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw);" << endl;
+    }
   }
   indent_down();
 
@@ -1903,6 +2158,7 @@
     declare_map += (*f_iter)->get_name();
     declare_map += "\"] = &";
     declare_map += service_name_;
+    declare_map += pstyle;
     declare_map += "Processor::process_";
     declare_map += (*f_iter)->get_name();
     declare_map += ";\n";
@@ -1911,28 +2167,28 @@
 
   f_header_ <<
     " public:" << endl <<
-    indent() << service_name_ << "Processor(boost::shared_ptr<" << service_name_ << "If> iface) :" << endl;
+    indent() << service_name_ << pstyle << "Processor(boost::shared_ptr<" << service_name_ << ifstyle << "If> iface) :" << endl;
   if (extends.empty()) {
     f_header_ <<
       indent() << "  iface_(iface) {" << endl;
   } else {
     f_header_ <<
-      indent() << "  " << extends << "Processor(iface)," << endl <<
+      indent() << "  " << extends << pstyle << "Processor(iface)," << endl <<
       indent() << "  iface_(iface) {" << endl;
   }
   f_header_ <<
     declare_map <<
     indent() << "}" << endl <<
     endl <<
-    indent() << "virtual bool process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl <<
-    indent() << "virtual ~" << service_name_ << "Processor() {}" << endl;
+    indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl <<
+    indent() << "virtual ~" << service_name_ << pstyle << "Processor() {}" << endl;
   indent_down();
   f_header_ <<
     "};" << endl << endl;
 
   // Generate the server implementation
   f_service_ <<
-    "bool " << service_name_ << "Processor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << endl;
+    ret_type << service_name_ << pstyle << "Processor::process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << endl;
   indent_up();
 
   f_service_ <<
@@ -1955,10 +2211,11 @@
     indent() << "  oprot->writeMessageEnd();" << endl <<
     indent() << "  oprot->getTransport()->flush();" << endl <<
     indent() << "  oprot->getTransport()->writeEnd();" << endl <<
-    indent() << "  return true;" << endl <<
+    indent() << (style == "Cob" ? "  return cob(true);" : "  return true;") << endl <<
     indent() << "}" << endl <<
     endl <<
-    indent() << "return process_fn(iprot, oprot, fname, seqid);" <<
+    indent() << "return process_fn(" << (style == "Cob" ? "cob, " : "")
+             << "iprot, oprot, fname, seqid);" <<
     endl;
 
   indent_down();
@@ -1967,12 +2224,12 @@
     endl;
 
   f_service_ <<
-    "bool " << service_name_ << "Processor::process_fn(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid) {" << endl;
+    ret_type << service_name_ << pstyle << "Processor::process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid) {" << endl;
   indent_up();
 
   // HOT: member function pointer map
   f_service_ <<
-    indent() << "std::map<std::string, void (" << service_name_ << "Processor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)>::iterator pfn;" << endl <<
+    indent() << "std::map<std::string, void (" << service_name_ << pstyle << "Processor::*)(" << finish_cob_decl << "int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*)>::iterator pfn;" << endl <<
     indent() << "pfn = processMap_.find(fname);" << endl <<
     indent() << "if (pfn == processMap_.end()) {" << endl;
   if (extends.empty()) {
@@ -1986,15 +2243,26 @@
       indent() << "  oprot->writeMessageEnd();" << endl <<
       indent() << "  oprot->getTransport()->flush();" << endl <<
       indent() << "  oprot->getTransport()->writeEnd();" << endl <<
-      indent() << "  return true;" << endl;
+      indent() << (style == "Cob" ? "  return cob(true);" : "  return true;") << endl;
   } else {
     f_service_ <<
-      indent() << "  return " << extends << "Processor::process_fn(iprot, oprot, fname, seqid);" << endl;
+      indent() << "  return "
+               << extends << pstyle << "Processor::process_fn("
+               << (style == "Cob" ? "cob, " : "")
+               << "iprot, oprot, fname, seqid);" << endl;
   }
   f_service_ <<
     indent() << "}" << endl <<
-    indent() << "(this->*(pfn->second))(seqid, iprot, oprot);" << endl <<
-    indent() << "return true;" << endl;
+    indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot);" << endl;
+
+  // TODO(dreiss): return pfn ret?
+  if (style == "Cob") {
+    f_service_ <<
+      indent() << "return;" << endl;
+  } else {
+    f_service_ <<
+      indent() << "return true;" << endl;
+  }
 
   indent_down();
   f_service_ <<
@@ -2003,7 +2271,7 @@
 
   // Generate the process subfunctions
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
-    generate_process_function(tservice, *f_iter);
+    generate_process_function(tservice, *f_iter, style);
   }
 }
 
@@ -2036,8 +2304,11 @@
   generate_struct_result_writer(f_service_, &result);
 
   result.set_name(tservice->get_name() + "_" + tfunction->get_name() + "_presult");
-  generate_struct_definition(f_header_, &result, false, true, true, false);
+  generate_struct_definition(f_header_, &result, false, true, true, gen_cob_style_);
   generate_struct_reader(f_service_, &result, true);
+  if (gen_cob_style_) {
+    generate_struct_writer(f_service_, &result, true);
+  }
 
 }
 
@@ -2047,165 +2318,303 @@
  * @param tfunction The function to write a dispatcher for
  */
 void t_cpp_generator::generate_process_function(t_service* tservice,
-                                                t_function* tfunction) {
-  // Open function
-  f_service_ <<
-    "void " << tservice->get_name() << "Processor::" <<
-    "process_" << tfunction->get_name() <<
-    "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
-  scope_up(f_service_);
-
-  string argsname = tservice->get_name() + "_" + tfunction->get_name() + "_args";
-  string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result";
-
-  f_service_ <<
-    indent() << "void* ctx = NULL;" << endl <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl <<
-    indent() << "// A glorified finally block since ctx is a void*" << endl <<
-    indent() << "class ContextFreer {" << endl <<
-    indent() << "  public:" << endl <<
-    indent() << "    ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
-    indent() << "      handler_(handler), context_(context) {}" << endl <<
-    indent() << "    ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
-    indent() << "  private:" << endl <<
-    indent() << "    ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
-    indent() << "    void* context_;" << endl <<
-    indent() << "};" << endl <<
-    indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl << endl <<
-    indent() << argsname << " args;" << endl <<
-    indent() << "args.read(iprot);" << endl <<
-    indent() << "iprot->readMessageEnd();" << endl <<
-    indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl <<
-    endl;
+                                                t_function* tfunction,
+                                                string style) {
+  t_struct* arg_struct = tfunction->get_arglist();
+  const std::vector<t_field*>& fields = arg_struct->get_members();
+  vector<t_field*>::const_iterator f_iter;
 
   t_struct* xs = tfunction->get_xceptions();
   const std::vector<t_field*>& xceptions = xs->get_members();
   vector<t_field*>::const_iterator x_iter;
 
-  // Declare result
-  if (!tfunction->is_oneway()) {
+  // I tried to do this as one function.  I really did.  But it was too hard.
+  if (style != "Cob") {
+    // Open function
     f_service_ <<
-      indent() << resultname << " result;" << endl;
-  }
+      "void " << tservice->get_name() << "Processor::" <<
+      "process_" << tfunction->get_name() << "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
+    scope_up(f_service_);
 
-  // Try block for functions with exceptions
-  f_service_ <<
-    indent() << "try {" << endl;
-  indent_up();
+    string argsname = tservice->get_name() + "_" + tfunction->get_name() + "_args";
+    string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result";
 
-  // Generate the function call
-  t_struct* arg_struct = tfunction->get_arglist();
-  const std::vector<t_field*>& fields = arg_struct->get_members();
-  vector<t_field*>::const_iterator f_iter;
+    f_service_ <<
+      indent() << "void* ctx = NULL;" << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl <<
+      indent() << "// A glorified finally block since ctx is a void*" << endl <<
+      indent() << "class ContextFreer {" << endl <<
+      indent() << "  public:" << endl <<
+      indent() << "    ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
+      indent() << "      handler_(handler), context_(context) {}" << endl <<
+      indent() << "    ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
+      indent() << "  private:" << endl <<
+      indent() << "    ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
+      indent() << "    void* context_;" << endl <<
+      indent() << "};" << endl <<
+      indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl << endl <<
+      indent() << argsname << " args;" << endl <<
+      indent() << "args.read(iprot);" << endl <<
+      indent() << "iprot->readMessageEnd();" << endl <<
+      indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl <<
+      endl;
 
-  bool first = true;
-  f_service_ << indent();
-  if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
-    if (is_complex_type(tfunction->get_returntype())) {
-      first = false;
-      f_service_ << "iface_->" << tfunction->get_name() << "(result.success";
-    } else {
-      f_service_ << "result.success = iface_->" << tfunction->get_name() << "(";
+    // Declare result
+    if (!tfunction->is_oneway()) {
+      f_service_ <<
+        indent() << resultname << " result;" << endl;
     }
-  } else {
+
+    // Try block for functions with exceptions
     f_service_ <<
-      "iface_->" << tfunction->get_name() << "(";
-  }
-  for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
-    if (first) {
-      first = false;
-    } else {
-      f_service_ << ", ";
-    }
-    f_service_ << "args." << (*f_iter)->get_name();
-  }
-  f_service_ << ");" << endl;
+      indent() << "try {" << endl;
+    indent_up();
 
-  // Set isset on success field
-  if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
-    f_service_ <<
-      indent() << "result.__isset.success = true;" << endl;
-  }
-
-  indent_down();
-  f_service_ << indent() << "}";
-
-  if (!tfunction->is_oneway()) {
-    for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
-      f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
-      if (!tfunction->is_oneway()) {
-        indent_up();
-        f_service_ <<
-          indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
-          indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
-        indent_down();
-        f_service_ << indent() << "}";
+    // Generate the function call
+    bool first = true;
+    f_service_ << indent();
+    if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
+      if (is_complex_type(tfunction->get_returntype())) {
+        first = false;
+        f_service_ << "iface_->" << tfunction->get_name() << "(result.success";
       } else {
-        f_service_ << "}";
+        f_service_ << "result.success = iface_->" << tfunction->get_name() << "(";
+      }
+    } else {
+      f_service_ <<
+        "iface_->" << tfunction->get_name() << "(";
+    }
+    for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+      if (first) {
+        first = false;
+      } else {
+        f_service_ << ", ";
+      }
+      f_service_ << "args." << (*f_iter)->get_name();
+    }
+    f_service_ << ");" << endl;
+
+    // Set isset on success field
+    if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
+      f_service_ <<
+        indent() << "result.__isset.success = true;" << endl;
+    }
+
+    indent_down();
+    f_service_ << indent() << "}";
+
+    if (!tfunction->is_oneway()) {
+      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+        f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
+        if (!tfunction->is_oneway()) {
+          indent_up();
+          f_service_ <<
+            indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
+            indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
+          indent_down();
+          f_service_ << indent() << "}";
+        } else {
+          f_service_ << "}";
+        }
       }
     }
-  }
 
-  f_service_ << " catch (const std::exception& e) {" << endl;
+    f_service_ << " catch (const std::exception& e) {" << endl;
 
-  indent_up();
-  f_service_ <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl;
+    indent_up();
+    f_service_ <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl;
 
   if (!tfunction->is_oneway()) {
     f_service_ <<
       endl <<
-      indent() << "::apache::thrift::TApplicationException x(e.what());" << endl <<
-      indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
-      indent() << "x.write(oprot);" << endl <<
-      indent() << "oprot->writeMessageEnd();" << endl <<
-      indent() << "oprot->getTransport()->flush();" << endl <<
-      indent() << "oprot->getTransport()->writeEnd();" << endl;
+        indent() << "::apache::thrift::TApplicationException x(e.what());" << endl <<
+        indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
+        indent() << "x.write(oprot);" << endl <<
+        indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "oprot->getTransport()->flush();" << endl <<
+        indent() << "oprot->getTransport()->writeEnd();" << endl;
   }
   f_service_ << indent() << "return;" << endl;
   indent_down();
   f_service_ << indent() << "}" << endl << endl;
 
-  // Shortcut out here for oneway functions
-  if (tfunction->is_oneway()) {
-    f_service_ <<
-      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-      indent() << "  eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-      indent() << "}" << endl << endl <<
-      indent() << "return;" << endl;
-    // Close function
-    scope_down(f_service_);
-    f_service_ << endl;
-    return;
-  }
+    // Shortcut out here for oneway functions
+    if (tfunction->is_oneway()) {
+      f_service_ <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl << endl <<
+        indent() << "return;" << endl;
+      indent_down();
+      f_service_ << "}" << endl <<
+        endl;
+      return;
+    }
 
-  // Serialize the result into a struct
-  f_service_ <<
+    // Serialize the result into a struct
+    f_service_ <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
       indent() << "  eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
       indent() << "}" << endl << endl <<
-    indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
-    indent() << "result.write(oprot);" << endl <<
-    indent() << "oprot->writeMessageEnd();" << endl <<
-    indent() << "oprot->getTransport()->flush();" << endl <<
-    indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
-    indent() << "if (eventHandler_.get() != NULL) {" << endl <<
-    indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
-    indent() << "}" << endl;
+      indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+      indent() << "result.write(oprot);" << endl <<
+      indent() << "oprot->writeMessageEnd();" << endl <<
+      indent() << "oprot->getTransport()->flush();" << endl <<
+      indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl;
 
-  // Close function
-  scope_down(f_service_);
-  f_service_ << endl;
+    // Close function
+    scope_down(f_service_);
+    f_service_ << endl;
+  }
+
+  // Cob style.
+  else {
+    // Processor entry point.
+    f_service_ <<
+      "void " << tservice->get_name() << "AsyncProcessor::" <<
+      "process_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl;
+    scope_up(f_service_);
+
+    f_service_ <<
+      indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl <<
+      indent() << "try {" << endl;
+    indent_up();
+    f_service_ <<
+      indent() << "args.read(iprot);" << endl <<
+      indent() << "iprot->readMessageEnd();" << endl <<
+      indent() << "iprot->getTransport()->readEnd();" << endl;
+    scope_down(f_service_);
+
+    // TODO(dreiss): Handle TExceptions?  Expose to server?
+    f_service_ <<
+      indent() << "catch (const std::exception& exn) {" << endl <<
+      indent() << "  return cob(false);" << endl <<
+      indent() << "}" << endl;
+
+    // TODO(dreiss): Figure out a strategy for exceptions in async handlers.
+    f_service_ <<
+      indent() << "iface_->" << tfunction->get_name() << "(";
+    indent_up(); indent_up();
+    if (tfunction->is_oneway()) {
+      // No return.  Just hand off our cob.
+      // TODO(dreiss): Call the cob immediately?
+      f_service_ <<
+        "std::tr1::bind(cob, true)" << endl;
+    } else {
+      f_service_ << endl;
+      string ret_placeholder = ", std::tr1::placeholders::_1";
+      string comma = "";
+      if (tfunction->get_returntype()->is_void()) {
+        ret_placeholder = "";
+      }
+      f_service_ <<
+        indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
+                 << "return_" << tfunction->get_name()
+                 << ", this, cob, seqid, oprot" << ret_placeholder << ")";
+      if (!xceptions.empty()) {
+        f_service_
+                   << ',' << endl <<
+          indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
+                   << "throw_" << tfunction->get_name()
+                   << ", this, cob, seqid, oprot, std::tr1::placeholders::_1)";
+      }
+    }
+
+    // XXX Whitespace cleanup.
+    for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+      f_service_
+                 << ',' << endl <<
+        indent() << "args." << (*f_iter)->get_name();
+    }
+    f_service_ << ");" << endl;
+    indent_down(); indent_down();
+    scope_down(f_service_);
+    f_service_ << endl;
+
+    // Normal return.
+    if (!tfunction->is_oneway()) {
+      string ret_arg = (tfunction->get_returntype()->is_void()
+                        ? ""
+                        : ", const " + type_name(tfunction->get_returntype()) + "& _return");
+      f_service_ <<
+        "void " << tservice->get_name() << "AsyncProcessor::" <<
+        "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ')' << endl;
+      scope_up(f_service_);
+      f_service_ <<
+        indent() << tservice->get_name() << "_" << tfunction->get_name() << "_presult result;" << endl;
+      if (!tfunction->get_returntype()->is_void()) {
+        // The const_cast here is unfortunate, but it would be a pain to avoid,
+        // and we only do a write with this struct, which is const-safe.
+        f_service_ <<
+          indent() << "result.success = const_cast<" << type_name(tfunction->get_returntype()) << "*>(&_return);" << endl <<
+          indent() << "result.__isset.success = true;" << endl;
+      }
+      // Serialize the result into a struct
+      f_service_ <<
+        endl <<
+        indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+        indent() << "result.write(oprot);" << endl <<
+        indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "oprot->getTransport()->flush();" << endl <<
+        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "return cob(true);" << endl;
+      scope_down(f_service_);
+      f_service_ << endl;
+    }
+
+    // Exception return.
+    if (!tfunction->is_oneway() && !xceptions.empty()) {
+      f_service_ <<
+        "void " << tservice->get_name() << "AsyncProcessor::" <<
+        "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw)" << endl;
+      scope_up(f_service_);
+      f_service_ <<
+        indent() << tservice->get_name() << "_" << tfunction->get_name() << "_result result;" << endl << endl <<
+        indent() << "try {" << endl;
+      indent_up();
+      f_service_ <<
+        indent() << "_throw->throw_it();" << endl <<
+        indent() << "return cob(false);" << endl;  // Is this possible?  TBD.
+      indent_down();
+      f_service_ <<
+        indent() << '}';
+      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+        f_service_ << "  catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl;
+        indent_up();
+        f_service_ <<
+          indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl <<
+          indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
+        scope_down(f_service_);
+      }
+
+      // Serialize the result into a struct
+      f_service_ <<
+        endl <<
+        indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
+        indent() << "result.write(oprot);" << endl <<
+        indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "oprot->getTransport()->flush();" << endl <<
+        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "return cob(true);" << endl;
+
+      scope_down(f_service_);
+      f_service_ << endl;
+    } // for each function
+  } // cob style
 }
 
 /**
@@ -2261,7 +2670,7 @@
   vector<t_function*>::iterator f_iter;
   for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
     f_skeleton <<
-      indent() << function_signature(*f_iter) << " {" << endl <<
+      indent() << function_signature(*f_iter, "") << " {" << endl <<
       indent() << "  // Your implementation goes here" << endl <<
       indent() << "  printf(\"" << (*f_iter)->get_name() << "\\n\");" << endl <<
       indent() << "}" << endl <<
@@ -2924,21 +3333,46 @@
  * @return String of rendered function definition
  */
 string t_cpp_generator::function_signature(t_function* tfunction,
+                                           string style,
                                            string prefix,
                                            bool name_params) {
   t_type* ttype = tfunction->get_returntype();
   t_struct* arglist = tfunction->get_arglist();
+  bool has_xceptions = !tfunction->get_xceptions()->get_members().empty();
 
-  if (is_complex_type(ttype)) {
-    bool empty = arglist->get_members().size() == 0;
+  if (style == "") {
+    if (is_complex_type(ttype)) {
+      return
+        "void " + prefix + tfunction->get_name() +
+        "(" + type_name(ttype) + (name_params ? "& _return" : "& /* _return */") +
+        argument_list(arglist, name_params, true) + ")";
+    } else {
+      return
+        type_name(ttype) + " " + prefix + tfunction->get_name() +
+        "(" + argument_list(arglist, name_params) + ")";
+    }
+  } else if (style.substr(0,3) == "Cob") {
+    string cob_type;
+    string exn_cob;
+    if (style == "CobCl") {
+      cob_type = "(" + service_name_ + "CobClient* client)";
+    } else if (style =="CobSv") {
+      cob_type = (ttype->is_void()
+                  ? "()"
+                  : ("(" + type_name(ttype) + " const& _return)"));
+      if (has_xceptions) {
+        exn_cob = ", std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob";
+      }
+    } else {
+      throw "UNKNOWN STYLE";
+    }
+
     return
       "void " + prefix + tfunction->get_name() +
-      "(" + type_name(ttype) + (name_params ? "& _return" : "& /* _return */") +
-      (empty ? "" : (", " + argument_list(arglist, name_params))) + ")";
+      "(std::tr1::function<void" + cob_type + "> cob" + exn_cob +
+      argument_list(arglist, name_params, true) + ")";
   } else {
-    return
-      type_name(ttype) + " " + prefix + tfunction->get_name() +
-      "(" + argument_list(arglist, name_params) + ")";
+    throw "UNKNOWN STYLE";
   }
 }
 
@@ -2948,12 +3382,12 @@
  * @param tstruct The struct definition
  * @return Comma sepearated list of all field names in that struct
  */
-string t_cpp_generator::argument_list(t_struct* tstruct, bool name_params) {
+string t_cpp_generator::argument_list(t_struct* tstruct, bool name_params, bool start_comma) {
   string result = "";
 
   const vector<t_field*>& fields = tstruct->get_members();
   vector<t_field*>::const_iterator f_iter;
-  bool first = true;
+  bool first = !start_comma;
   for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
     if (first) {
       first = false;
diff --git a/configure.ac b/configure.ac
index bf8ef6d..320c250 100644
--- a/configure.ac
+++ b/configure.ac
@@ -311,6 +311,14 @@
   AX_SIGNED_RIGHT_SHIFT
 fi
 
+dnl autoscan thinks we need this macro because we have a member function
+dnl called "error".  Invoke the macro but don't run the check so autoscan
+dnl thinks we are in the clear.  It's highly unlikely that we will ever
+dnl actually use the function that this checks for.
+if false ; then
+  AC_FUNC_ERROR_AT_LINE
+fi
+
 AX_THRIFT_GEN(cpp, [C++], yes)
 AM_CONDITIONAL([THRIFT_GEN_cpp], [test "$ax_thrift_gen_cpp" = "yes"])
 AX_THRIFT_GEN(java, [Java], yes)
diff --git a/contrib/async-test/Makefile b/contrib/async-test/Makefile
new file mode 100644
index 0000000..33e7f8a
--- /dev/null
+++ b/contrib/async-test/Makefile
@@ -0,0 +1,33 @@
+THRIFT = thrift
+CXXFLAGS = `pkg-config --cflags thrift thrift-nb` -levent
+LDLIBS = `pkg-config --libs thrift thrift-nb` -levent
+CXXFLAGS += -g -O0
+
+GENNAMES = Aggr aggr_types
+GENHEADERS = $(addsuffix .h, $(GENNAMES))
+GENSRCS = $(addsuffix .cpp, $(GENNAMES))
+GENOBJS = $(addsuffix .o, $(GENNAMES))
+
+PYLIBS = aggr/__init__.py
+
+PROGS =  test-server
+
+all: $(PYLIBS) $(PROGS)
+
+test-server: test-server.o $(GENOBJS)
+
+test-server.o: $(GENSRCS)
+
+aggr/__init__.py: aggr.thrift
+	$(RM) $(dir $@)
+	$(THRIFT) --gen py:newstyle $<
+	mv gen-py/$(dir $@) .
+
+$(GENSRCS): aggr.thrift
+	$(THRIFT) --gen cpp:cob_style $<
+	mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) .
+
+clean:
+	$(RM) -r *.o $(PROGS) $(GENSRCS) $(GENHEADERS) gen-* aggr
+
+.PHONY: clean
diff --git a/contrib/async-test/aggr.thrift b/contrib/async-test/aggr.thrift
new file mode 100644
index 0000000..c016a65
--- /dev/null
+++ b/contrib/async-test/aggr.thrift
@@ -0,0 +1,8 @@
+exception Error {
+  1: string desc;
+}
+
+service Aggr {
+  void addValue(1: i32 value);
+  list<i32> getValues() throws (1: Error err);
+}
diff --git a/contrib/async-test/test-leaf.py b/contrib/async-test/test-leaf.py
new file mode 100755
index 0000000..8b7c3e3
--- /dev/null
+++ b/contrib/async-test/test-leaf.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+import sys
+import time
+from thrift.transport import TTransport
+from thrift.transport import TSocket
+from thrift.protocol import TBinaryProtocol
+from thrift.server import THttpServer
+from aggr import Aggr
+
+class AggrHandler(Aggr.Iface):
+  def __init__(self):
+    self.values = []
+
+  def addValue(self, value):
+    self.values.append(value)
+
+  def getValues(self, ):
+    time.sleep(1)
+    return self.values
+
+processor = Aggr.Processor(AggrHandler())
+pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+THttpServer.THttpServer(processor, ('', int(sys.argv[1])), pfactory).serve()
diff --git a/contrib/async-test/test-server.cpp b/contrib/async-test/test-server.cpp
new file mode 100644
index 0000000..a55c348
--- /dev/null
+++ b/contrib/async-test/test-server.cpp
@@ -0,0 +1,97 @@
+#include <tr1/functional>
+#include "protocol/TBinaryProtocol.h"
+#include "async/TAsyncProtocolProcessor.h"
+#include "async/TEvhttpServer.h"
+#include "async/TEvhttpClientChannel.h"
+#include "Aggr.h"
+
+using std::tr1::bind;
+using std::tr1::placeholders::_1;
+
+using apache::thrift::TException;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::async::TEvhttpServer;
+using apache::thrift::async::TAsyncProcessor;
+using apache::thrift::async::TAsyncBufferProcessor;
+using apache::thrift::async::TAsyncProtocolProcessor;
+using apache::thrift::async::TAsyncChannel;
+using apache::thrift::async::TEvhttpClientChannel;
+
+class AggrAsyncHandler : public AggrCobSvIf {
+ protected:
+  struct RequestContext {
+    std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
+    std::vector<int32_t> ret;
+    int pending_calls;
+  };
+
+ public:
+  AggrAsyncHandler()
+    : eb_(NULL)
+    , pfact_(new TBinaryProtocolFactory())
+  {
+    leaf_ports_.push_back(8081);
+    leaf_ports_.push_back(8082);
+  }
+
+  void addValue(std::tr1::function<void()> cob, const int32_t value) {
+    // Silently drop writes to the aggrgator.
+    return cob();
+  }
+
+  void getValues(std::tr1::function<void(
+        std::vector<int32_t> const& _return)> cob,
+      std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
+    RequestContext* ctx = new RequestContext();
+    ctx->cob = cob;
+    ctx->pending_calls = leaf_ports_.size();
+    for (std::vector<int>::iterator it = leaf_ports_.begin();
+        it != leaf_ports_.end(); ++it) {
+      boost::shared_ptr<TAsyncChannel> channel(
+          new TEvhttpClientChannel(
+            "localhost", "/", "127.0.0.1", *it, eb_));
+      AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
+      client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
+    }
+  }
+
+  void setEventBase(struct event_base* eb) {
+    eb_ = eb;
+  }
+
+  void clientReturn(RequestContext* ctx, AggrCobClient* client) {
+    ctx->pending_calls -= 1;
+
+    try {
+      std::vector<int32_t> subret;
+      client->recv_getValues(subret);
+      ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
+    } catch (TException& exn) {
+      // TODO: Log error
+    }
+
+    delete client;
+
+    if (ctx->pending_calls == 0) {
+      ctx->cob(ctx->ret);
+      delete ctx;
+    }
+  }
+
+ protected:
+  struct event_base* eb_;
+  std::vector<int> leaf_ports_;
+  boost::shared_ptr<TProtocolFactory> pfact_;
+};
+
+
+int main() {
+  boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler());
+  boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler));
+  boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory());
+  boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact));
+  boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080));
+  handler->setEventBase(server->getEventBase());
+  server->serve();
+}
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 664a58a..39382c8 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -70,9 +70,11 @@
                        src/server/TSimpleServer.cpp \
                        src/server/TThreadPoolServer.cpp \
                        src/server/TThreadedServer.cpp \
+											 src/async/TAsyncChannel.cpp \
                        src/processor/PeekProcessor.cpp
 
-libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp
+libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp \
+                         src/async/TAsyncProtocolProcessor.cpp
 
 libthriftz_la_SOURCES = src/transport/TZlibTransport.cpp
 
@@ -148,6 +150,17 @@
                          src/processor/PeekProcessor.h \
                          src/processor/StatsProcessor.h
 
+include_asyncdir = $(include_thriftdir)/async
+include_async_HEADERS = \
+                     src/async/TAsyncChannel.h \
+                     src/async/TAsyncProcessor.h \
+                     src/async/TAsyncBufferProcessor.h \
+                     src/async/TAsyncProtocolProcessor.h \
+                     src/async/TEvhttpClientChannel.h \
+                     src/async/TEvhttpServer.h \
+                     src/async/SimpleCallback.h
+
+
 noinst_PROGRAMS = concurrency_test
 
 concurrency_test_SOURCES = src/concurrency/test/Tests.cpp \
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index 22c10f1..f71a50b 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -119,4 +119,4 @@
 
 }} // apache::thrift
 
-#endif // #ifndef _THRIFT_PROCESSOR_H_
+#endif // #ifndef _THRIFT_TPROCESSOR_H_
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 1bce23f..cb7d55a 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -110,6 +110,29 @@
 struct TypeSpec;
 }}
 
+class TDelayedException {
+ public:
+  template <class E> static TDelayedException* delayException(const E& e);
+  virtual void throw_it() = 0;
+  virtual ~TDelayedException() {};
+};
+
+template <class E> class TExceptionWrapper : public TDelayedException {
+ public:
+  TExceptionWrapper(const E& e) : e_(e) {}
+  virtual void throw_it() {
+    E temp(e_);
+    delete this;
+    throw temp;
+  }
+ private:
+  E e_;
+};
+
+template <class E>
+TDelayedException* TDelayedException::delayException(const E& e) {
+  return new TExceptionWrapper<E>(e);
+}
 
 }} // apache::thrift
 
diff --git a/lib/cpp/src/async/SimpleCallback.h b/lib/cpp/src/async/SimpleCallback.h
new file mode 100644
index 0000000..4218328
--- /dev/null
+++ b/lib/cpp/src/async/SimpleCallback.h
@@ -0,0 +1,98 @@
+#ifndef _THRIFT_ASYNC_SIMPLECALLBACK_H_
+#define _THRIFT_ASYNC_SIMPLECALLBACK_H_ 1
+
+#include <Thrift.h>
+namespace apache { namespace thrift {
+
+/**
+ * A template class for forming simple method callbacks with either an empty
+ * argument list or one argument of known type.
+ *
+ * For more efficiency where tr1::function is overkill.
+ */
+
+template<typename C,              ///< class whose method we wish to wrap
+         typename A = void,       ///< type of argument
+         typename R = void>       ///< type of return value
+class SimpleCallback {
+  typedef R (C::*cfptr_t)(A);     ///< pointer-to-member-function type
+  cfptr_t fptr_;                  ///< the embedded function pointer
+  C* obj_;                        ///< object whose function we're wrapping
+ public:
+  /**
+   * Constructor for empty callback object.
+   */
+  SimpleCallback() :
+    fptr_(NULL), obj_(NULL) {}
+  /**
+   * Construct callback wrapper for member function.
+   *
+   * @param fptr pointer-to-member-function
+   * @param "this" for object associated with callback
+   */
+  SimpleCallback(cfptr_t fptr, const C* obj) :
+    fptr_(fptr), obj_(const_cast<C*>(obj))
+  {}
+
+  /**
+   * Make a call to the member function we've wrapped.
+   *
+   * @param i argument for the wrapped member function
+   * @return value from that function
+   */
+  R operator()(A i) const {
+    (obj_->*fptr_)(i);
+  }
+
+  operator bool() const {
+    return obj_ != NULL && fptr_ != NULL;
+  }
+
+  ~SimpleCallback() {}
+};
+
+/**
+ * Specialization of SimpleCallback for empty argument list.
+ */
+template<typename C,              ///< class whose method we wish to wrap
+         typename R>              ///< type of return value
+class SimpleCallback<C, void, R> {
+  typedef R (C::*cfptr_t)();      ///< pointer-to-member-function type
+  cfptr_t fptr_;                  ///< the embedded function pointer
+  C* obj_;                        ///< object whose function we're wrapping
+ public:
+  /**
+   * Constructor for empty callback object.
+   */
+  SimpleCallback() :
+    fptr_(NULL), obj_(NULL) {}
+
+  /**
+   * Construct callback wrapper for member function.
+   *
+   * @param fptr pointer-to-member-function
+   * @param obj "this" for object associated with callback
+   */
+  SimpleCallback(cfptr_t fptr, const C* obj) :
+    fptr_(fptr), obj_(const_cast<C*>(obj))
+  {}
+
+  /**
+   * Make a call to the member function we've wrapped.
+   *
+   * @return value from that function
+   */
+  R operator()() const {
+    (obj_->*fptr_)();
+  }
+
+  operator bool() const {
+    return obj_ != NULL && fptr_ != NULL;
+  }
+
+  ~SimpleCallback() {}
+};
+
+}} // apache::thrift
+
+#endif /* !_THRIFT_ASYNC_SIMPLECALLBACK_H_ */
diff --git a/lib/cpp/src/async/TAsyncBufferProcessor.h b/lib/cpp/src/async/TAsyncBufferProcessor.h
new file mode 100644
index 0000000..06a503e
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncBufferProcessor.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_
+#define _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ 1
+
+#include <tr1/functional>
+#include <boost/shared_ptr.hpp>
+
+#include "transport/TBufferTransports.h"
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncBufferProcessor {
+ public:
+  // Process data in "in", putting the result in "out".
+  // Call _return(true) when done, or _return(false) to
+  // forcefully close the connection (if applicable).
+  // "in" and "out" should be TMemoryBuffer or similar,
+  // not a wrapper around a socket.
+  virtual void process(
+      std::tr1::function<void(bool healthy)> _return,
+      boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf,
+      boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf) = 0;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_
diff --git a/lib/cpp/src/async/TAsyncChannel.cpp b/lib/cpp/src/async/TAsyncChannel.cpp
new file mode 100644
index 0000000..2bf02fe
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncChannel.cpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <async/TAsyncChannel.h>
+#include <tr1/functional>
+
+namespace apache { namespace thrift { namespace async {
+
+bool TAsyncChannel::sendAndRecvMessage(const VoidCallback& cob,
+                                       TMemoryBuffer* sendBuf,
+                                       TMemoryBuffer* recvBuf) {
+  std::tr1::function<void()> send_done =
+    std::tr1::bind(&TAsyncChannel::recvMessage, this, cob, recvBuf);
+
+  return sendMessage(send_done, sendBuf);
+}
+
+}}}  // apache::thrift::async
diff --git a/lib/cpp/src/async/TAsyncChannel.h b/lib/cpp/src/async/TAsyncChannel.h
new file mode 100644
index 0000000..d5cd419
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncChannel.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_
+#define _THRIFT_ASYNC_TASYNCCHANNEL_H_ 1
+
+#include <tr1/functional>
+#include <Thrift.h>
+#include <transport/TTransportUtils.h>
+
+namespace apache { namespace thrift { namespace transport {
+class TMemoryBuffer;
+}}}
+
+namespace apache { namespace thrift { namespace async {
+using apache::thrift::transport::TMemoryBuffer;
+
+class TAsyncTransport;
+
+class TAsyncChannel {
+ public:
+  typedef std::tr1::function<void()> VoidCallback;
+
+  virtual ~TAsyncChannel() {}
+
+  // is the channel in a good state?
+  virtual bool good() const = 0;
+  virtual bool error() const = 0;
+  virtual bool timedOut() const = 0;
+
+  /**
+   * Send a message over the channel.
+   *
+   * @return  true iff the cob has been or will be called, else false
+   */
+  virtual bool sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0;
+
+  /**
+   * Receive a message from the channel.
+   *
+   * @return  true iff the cob has been or will be called, else false
+   */
+  virtual bool recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0;
+
+  /**
+   * Send a message over the channel and receive a response.
+   *
+   * @return  true iff the cob has been or will be called, else false
+   */
+  virtual bool sendAndRecvMessage(const VoidCallback& cob,
+                                  apache::thrift::transport::TMemoryBuffer* sendBuf,
+                                  apache::thrift::transport::TMemoryBuffer* recvBuf);
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_
diff --git a/lib/cpp/src/async/TAsyncProcessor.h b/lib/cpp/src/async/TAsyncProcessor.h
new file mode 100644
index 0000000..abf5816
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProcessor.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TASYNCPROCESSOR_H_
+#define _THRIFT_TASYNCPROCESSOR_H_ 1
+
+#include <tr1/functional>
+#include <boost/shared_ptr.hpp>
+#include <protocol/TProtocol.h>
+#include <TProcessor.h>
+
+namespace apache { namespace thrift { namespace async {
+
+/**
+ * Async version of a TProcessor.  It is not expected to complete by the time
+ * the call to process returns.  Instead, it calls a cob to signal completion.
+ */
+class TAsyncProcessor {
+ public:
+  virtual ~TAsyncProcessor() {}
+
+  virtual void process(std::tr1::function<void(bool success)> _return,
+                       boost::shared_ptr<protocol::TProtocol> in,
+                       boost::shared_ptr<protocol::TProtocol> out) = 0;
+
+  void process(std::tr1::function<void(bool success)> _return,
+               boost::shared_ptr<apache::thrift::protocol::TProtocol> io) {
+    return process(_return, io, io);
+  }
+
+ protected:
+  TAsyncProcessor() {}
+};
+
+}}} // apache::thrift::async
+
+// XXX I'm lazy for now
+namespace apache { namespace thrift {
+using apache::thrift::async::TAsyncProcessor;
+}}
+
+#endif // #ifndef _THRIFT_TASYNCPROCESSOR_H_
diff --git a/lib/cpp/src/async/TAsyncProtocolProcessor.cpp b/lib/cpp/src/async/TAsyncProtocolProcessor.cpp
new file mode 100644
index 0000000..05d504b
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProtocolProcessor.cpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TAsyncProtocolProcessor.h"
+
+using apache::thrift::transport::TBufferBase;
+using apache::thrift::protocol::TProtocol;
+
+namespace apache { namespace thrift { namespace async {
+
+void TAsyncProtocolProcessor::process(
+    std::tr1::function<void(bool healthy)> _return,
+    boost::shared_ptr<TBufferBase> ibuf,
+    boost::shared_ptr<TBufferBase> obuf) {
+  boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));
+  boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));
+  return underlying_->process(
+      std::tr1::bind(
+        &TAsyncProtocolProcessor::finish,
+        _return,
+        oprot,
+        std::tr1::placeholders::_1),
+      iprot, oprot);
+}
+
+/* static */ void TAsyncProtocolProcessor::finish(
+    std::tr1::function<void(bool healthy)> _return,
+    boost::shared_ptr<TProtocol> oprot,
+    bool healthy) {
+  // This is a stub function to hold a reference to oprot.
+  return _return(healthy);
+}
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TAsyncProtocolProcessor.h b/lib/cpp/src/async/TAsyncProtocolProcessor.h
new file mode 100644
index 0000000..7ec718b
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProtocolProcessor.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TNAME_ME_H_
+#define _THRIFT_TNAME_ME_H_ 1
+
+#include "TAsyncProcessor.h"
+#include "TAsyncBufferProcessor.h"
+#include "protocol/TProtocol.h"
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncProtocolProcessor : public TAsyncBufferProcessor {
+ public:
+  TAsyncProtocolProcessor(
+      boost::shared_ptr<TAsyncProcessor> underlying,
+      boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact)
+    : underlying_(underlying)
+    , pfact_(pfact)
+  {}
+
+  virtual void process(
+      std::tr1::function<void(bool healthy)> _return,
+      boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf,
+      boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf);
+
+ private:
+  static void finish(
+      std::tr1::function<void(bool healthy)> _return,
+      boost::shared_ptr<apache::thrift::protocol::TProtocol> oprot,
+      bool healthy);
+
+  boost::shared_ptr<TAsyncProcessor> underlying_;
+  boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact_;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TNAME_ME_H_
diff --git a/lib/cpp/src/async/TEvhttpClientChannel.cpp b/lib/cpp/src/async/TEvhttpClientChannel.cpp
new file mode 100644
index 0000000..54676a1
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpClientChannel.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TEvhttpClientChannel.h"
+#include <evhttp.h>
+
+namespace apache { namespace thrift { namespace async {
+
+
+TEvhttpClientChannel::TEvhttpClientChannel(
+    const std::string& host,
+    const std::string& path,
+    const char* address,
+    int port,
+    struct event_base* eb)
+  : host_(host)
+  , path_(path)
+  , recvBuf_(NULL)
+  , conn_(NULL)
+{
+  conn_ = evhttp_connection_new(address, port);
+  if (conn_ == NULL) {
+    abort(); // XXX
+  }
+  evhttp_connection_set_base(conn_, eb);
+}
+
+
+TEvhttpClientChannel::~TEvhttpClientChannel() {
+  if (conn_ != NULL) {
+    evhttp_connection_free(conn_);
+  }
+}
+
+
+bool TEvhttpClientChannel::sendAndRecvMessage(
+    const VoidCallback& cob,
+    apache::thrift::transport::TMemoryBuffer* sendBuf,
+    apache::thrift::transport::TMemoryBuffer* recvBuf) {
+  cob_ = cob;
+  recvBuf_ = recvBuf;
+
+  struct evhttp_request* req = evhttp_request_new(response, this);
+  if (req == NULL) {
+    abort(); // XXX
+  }
+
+  int rv;
+
+  rv = evhttp_add_header(req->output_headers, "Host", host_.c_str());
+  if (rv != 0) {
+    abort(); // XXX
+  }
+
+  rv = evhttp_add_header(req->output_headers, "Content-Type", "application/x-thrift");
+  if (rv != 0) {
+    abort(); // XXX
+  }
+
+  uint8_t* obuf;
+  uint32_t sz;
+  sendBuf->getBuffer(&obuf, &sz);
+  rv = evbuffer_add(req->output_buffer, obuf, sz);
+  if (rv != 0) {
+    abort(); // XXX
+  }
+
+  rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());
+  if (rv != 0) {
+    abort(); // XXX
+  }
+
+  return true;
+}
+
+
+bool TEvhttpClientChannel::sendMessage(
+    const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) {
+  abort(); // XXX
+}
+
+
+bool TEvhttpClientChannel::recvMessage(
+    const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) {
+  abort(); // XXX
+}
+
+
+void TEvhttpClientChannel::finish(struct evhttp_request* req) {
+  if (req == NULL) {
+    return cob_();
+  } else if (req->response_code != 200) {
+    return cob_();
+  }
+  recvBuf_->resetBuffer(
+      EVBUFFER_DATA(req->input_buffer),
+      EVBUFFER_LENGTH(req->input_buffer));
+  return cob_();
+}
+
+
+/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {
+  TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;
+  self->finish(req);
+}
+
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TEvhttpClientChannel.h b/lib/cpp/src/async/TEvhttpClientChannel.h
new file mode 100644
index 0000000..d2bc4b3
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpClientChannel.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_
+#define _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ 1
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "TAsyncChannel.h"
+
+struct event_base;
+struct evhttp_connection;
+struct evhttp_request;
+
+namespace apache { namespace thrift { namespace transport {
+class TMemoryBuffer;
+}}}
+
+namespace apache { namespace thrift { namespace async {
+
+class TEvhttpClientChannel : public TAsyncChannel {
+ public:
+  using TAsyncChannel::VoidCallback;
+
+  TEvhttpClientChannel(
+      const std::string& host,
+      const std::string& path,
+      const char* address,
+      int port,
+      struct event_base* eb);
+  ~TEvhttpClientChannel();
+
+  virtual bool sendAndRecvMessage(const VoidCallback& cob,
+                                  apache::thrift::transport::TMemoryBuffer* sendBuf,
+                                  apache::thrift::transport::TMemoryBuffer* recvBuf);
+
+  virtual bool sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message);
+  virtual bool recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message);
+
+  void finish(struct evhttp_request* req);
+
+  //XXX
+  virtual bool good() const { return true; }
+  virtual bool error() const { return false; }
+  virtual bool timedOut() const { return false; }
+
+ private:
+  static void response(struct evhttp_request* req, void* arg);
+
+  std::string host_;
+  std::string path_;
+  VoidCallback cob_;
+  apache::thrift::transport::TMemoryBuffer* recvBuf_;
+  struct evhttp_connection* conn_;
+
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_
diff --git a/lib/cpp/src/async/TEvhttpServer.cpp b/lib/cpp/src/async/TEvhttpServer.cpp
new file mode 100644
index 0000000..2997597
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpServer.cpp
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TEvhttpServer.h"
+#include "TAsyncBufferProcessor.h"
+#include "transport/TBufferTransports.h"
+#include <evhttp.h>
+
+using apache::thrift::transport::TMemoryBuffer;
+
+namespace apache { namespace thrift { namespace async {
+
+
+struct TEvhttpServer::RequestContext {
+  struct evhttp_request* req;
+  boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> ibuf;
+  boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> obuf;
+
+  RequestContext(struct evhttp_request* req);
+};
+
+
+TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor)
+  : processor_(processor)
+  , eb_(NULL)
+  , eh_(NULL)
+{}
+
+
+TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port)
+  : processor_(processor)
+  , eb_(NULL)
+  , eh_(NULL)
+{
+  // Create event_base and evhttp.
+  eb_ = event_base_new();
+  if (eb_ == NULL) {
+    abort();  // XXX
+  }
+  eh_ = evhttp_new(eb_);
+  if (eh_ == NULL) {
+    event_base_free(eb_);
+    abort();  // XXX
+  }
+
+  // Bind to port.
+  int ret = evhttp_bind_socket(eh_, NULL, port);
+  if (ret < 0) {
+    evhttp_free(eh_);
+    event_base_free(eb_);
+  }
+
+  // Register a handler.  If you use the other constructor,
+  // you will want to do this yourself.
+  // Don't forget to unregister before destorying this TEvhttpServer.
+  evhttp_set_cb(eh_, "/", request, (void*)this);
+}
+
+
+TEvhttpServer::~TEvhttpServer() {
+  if (eh_ != NULL) {
+    evhttp_free(eh_);
+  }
+  if (eb_ != NULL) {
+    event_base_free(eb_);
+  }
+}
+
+
+int TEvhttpServer::serve() {
+  if (eb_ == NULL) {
+    abort();  // XXX
+  }
+  return event_base_dispatch(eb_);
+}
+
+
+TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req) : req(req)
+  , ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer), EVBUFFER_LENGTH(req->input_buffer)))
+  , obuf(new TMemoryBuffer())
+{}
+
+
+void TEvhttpServer::request(struct evhttp_request* req, void* self) {
+  static_cast<TEvhttpServer*>(self)->process(req);
+}
+
+
+void TEvhttpServer::process(struct evhttp_request* req) {
+  RequestContext* ctx = new RequestContext(req);
+  return processor_->process(
+      std::tr1::bind(
+        &TEvhttpServer::complete,
+        this,
+        ctx,
+        std::tr1::placeholders::_1),
+      ctx->ibuf,
+      ctx->obuf);
+}
+
+
+void TEvhttpServer::complete(RequestContext* ctx, bool success) {
+  std::auto_ptr<RequestContext> ptr(ctx);
+
+  int code = 200;
+  const char* reason = "OK";
+
+  int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");
+  if (rv != 0) {
+    // TODO: Log an error.
+  }
+
+  struct evbuffer* buf = evbuffer_new();
+  if (buf == NULL) {
+    // TODO: Log an error.
+  } else {
+    uint8_t* obuf;
+    uint32_t sz;
+    ctx->obuf->getBuffer(&obuf, &sz);
+    int ret = evbuffer_add(buf, obuf, sz);
+    if (ret != 0) {
+      // TODO: Log an error.
+    }
+  }
+
+  evhttp_send_reply(ctx->req, code, reason, buf);
+  if (buf != NULL) {
+    evbuffer_free(buf);
+  }
+}
+
+
+struct event_base* TEvhttpServer::getEventBase() {
+  return eb_;
+}
+
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TEvhttpServer.h b/lib/cpp/src/async/TEvhttpServer.h
new file mode 100644
index 0000000..edc6ffb
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpServer.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TEVHTTP_SERVER_H_
+#define _THRIFT_TEVHTTP_SERVER_H_ 1
+
+#include <boost/shared_ptr.hpp>
+
+struct event_base;
+struct evhttp;
+struct evhttp_request;
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncBufferProcessor;
+
+class TEvhttpServer {
+ public:
+  /**
+   * Create a TEvhttpServer for use with an external evhttp instance.
+   * Must be manually installed with evhttp_set_cb, using
+   * TEvhttpServer::request as the callback and the
+   * address of the server as the extra arg.
+   * Do not call "serve" on this server.
+   */
+  TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor);
+
+  /**
+   * Create a TEvhttpServer with an embedded event_base and evhttp,
+   * listening on port and responding on the endpoint "/".
+   * Call "serve" on this server to serve forever.
+   */
+  TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port);
+
+  ~TEvhttpServer();
+
+  static void request(struct evhttp_request* req, void* self);
+  int serve();
+
+  struct event_base* getEventBase();
+
+ private:
+  struct RequestContext;
+
+  void process(struct evhttp_request* req);
+  void complete(RequestContext* ctx, bool success);
+
+  boost::shared_ptr<TAsyncBufferProcessor> processor_;
+  struct event_base* eb_;
+  struct evhttp* eh_;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TEVHTTP_SERVER_H_