THRIFT-923. cpp: Implement a fully nonblocking server and client

There are three major parts of this:
1/ New callback-style interfaces for a few key Thrift components:
   TAsyncProcessor for servers and TAsyncChannel for clients.
2/ Concrete implementations of TAsyncChannel and a server for
   TAsyncProcessor based on evhttp.
3/ Async-style code generation for C++

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005127 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/contrib/async-test/Makefile b/contrib/async-test/Makefile
new file mode 100644
index 0000000..33e7f8a
--- /dev/null
+++ b/contrib/async-test/Makefile
@@ -0,0 +1,41 @@
+# Builds the C++ test-server and the Python aggr package from aggr.thrift.
+THRIFT = thrift
+# -levent is a linker input, so it is listed in LDLIBS only.
+CXXFLAGS = `pkg-config --cflags thrift thrift-nb`
+LDLIBS = `pkg-config --libs thrift thrift-nb` -levent
+CXXFLAGS += -g -O0
+# The built-in "%: %.o" rule links through $(CC); use the C++ driver instead
+# so the C++ runtime library is linked in.
+LINK.o = $(LINK.cc)
+
+GENNAMES = Aggr aggr_types
+GENHEADERS = $(addsuffix .h, $(GENNAMES))
+GENSRCS = $(addsuffix .cpp, $(GENNAMES))
+GENOBJS = $(addsuffix .o, $(GENNAMES))
+
+PYLIBS = aggr/__init__.py
+
+PROGS =  test-server
+
+all: $(PYLIBS) $(PROGS)
+
+test-server: test-server.o $(GENOBJS)
+
+# test-server.cpp includes the generated Aggr.h, so run the generator first.
+test-server.o: $(GENSRCS)
+
+# Regenerate the Python package imported by test-leaf.py.
+aggr/__init__.py: aggr.thrift
+	$(RM) -r $(dir $@)
+	$(THRIFT) --gen py:newstyle $<
+	mv gen-py/$(dir $@) .
+
+# Generate the callback-style C++ sources and headers and move them here.
+$(GENSRCS): aggr.thrift
+	$(THRIFT) --gen cpp:cob_style $<
+	mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) .
+
+clean:
+	$(RM) -r *.o $(PROGS) $(GENSRCS) $(GENHEADERS) gen-* aggr
+
+.PHONY: all clean
diff --git a/contrib/async-test/aggr.thrift b/contrib/async-test/aggr.thrift
new file mode 100644
index 0000000..c016a65
--- /dev/null
+++ b/contrib/async-test/aggr.thrift
@@ -0,0 +1,8 @@
+exception Error {  // declared in the throws clause of Aggr.getValues
+  1: string desc;  // presumably a human-readable description of the failure
+}
+
+service Aggr {  // two calls: submit one i32, fetch a list of i32
+  void addValue(1: i32 value);  // takes a single value and returns nothing
+  list<i32> getValues() throws (1: Error err);  // returns a list of values; may raise Error
+}
diff --git a/contrib/async-test/test-leaf.py b/contrib/async-test/test-leaf.py
new file mode 100755
index 0000000..8b7c3e3
--- /dev/null
+++ b/contrib/async-test/test-leaf.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+import sys
+import time
+from thrift.transport import TTransport
+from thrift.transport import TSocket
+from thrift.protocol import TBinaryProtocol
+from thrift.server import THttpServer
+from aggr import Aggr
+
+class AggrHandler(Aggr.Iface):
+  def __init__(self):
+    self.values = []
+
+  def addValue(self, value):
+    self.values.append(value)
+
+  def getValues(self, ):
+    time.sleep(1)
+    return self.values
+
+processor = Aggr.Processor(AggrHandler())  # Aggr processor backed by a fresh AggrHandler
+pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+THttpServer.THttpServer(processor, ('', int(sys.argv[1])), pfactory).serve()  # port is the first command-line argument
diff --git a/contrib/async-test/test-server.cpp b/contrib/async-test/test-server.cpp
new file mode 100644
index 0000000..a55c348
--- /dev/null
+++ b/contrib/async-test/test-server.cpp
@@ -0,0 +1,132 @@
+#include <tr1/functional>
+#include "protocol/TBinaryProtocol.h"
+#include "async/TAsyncProtocolProcessor.h"
+#include "async/TEvhttpServer.h"
+#include "async/TEvhttpClientChannel.h"
+#include "Aggr.h"
+
+using std::tr1::bind;
+using std::tr1::placeholders::_1;
+
+using apache::thrift::TException;
+using apache::thrift::protocol::TBinaryProtocolFactory;
+using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::async::TEvhttpServer;
+using apache::thrift::async::TAsyncProcessor;
+using apache::thrift::async::TAsyncBufferProcessor;
+using apache::thrift::async::TAsyncProtocolProcessor;
+using apache::thrift::async::TAsyncChannel;
+using apache::thrift::async::TEvhttpClientChannel;
+
+/**
+ * Callback-style Aggr handler that answers getValues() by asking every leaf
+ * server listed in leaf_ports_ and concatenating the replies.
+ *
+ * eb_ stays NULL until setEventBase() is called; presumably that has to
+ * happen before the first getValues() request -- confirm against
+ * TEvhttpClientChannel.
+ */
+class AggrAsyncHandler : public AggrCobSvIf {
+ protected:
+  // Bookkeeping shared by all leaf calls issued for one getValues() request.
+  struct RequestContext {
+    // Completion callback of the incoming request.
+    std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
+    // Values gathered from the leaves that have answered so far.
+    std::vector<int32_t> ret;
+    // Number of leaf calls that have not reported back yet.
+    int pending_calls;
+  };
+
+ public:
+  // Starts without an event base and with leaves on ports 8081 and 8082.
+  AggrAsyncHandler()
+    : eb_(NULL)
+    , pfact_(new TBinaryProtocolFactory())
+  {
+    leaf_ports_.push_back(8081);
+    leaf_ports_.push_back(8082);
+  }
+
+  // Invokes cob immediately; value is not stored anywhere.
+  void addValue(std::tr1::function<void()> cob, const int32_t value) {
+    // Silently drop writes to the aggregator.
+    cob();
+  }
+
+  /**
+   * Issues getValues() to every leaf and calls cob with the concatenated
+   * values once all leaf calls have reported back (see clientReturn()).
+   *
+   * exn_cob is never invoked: a leaf whose reply raises TException just
+   * contributes no values.
+   */
+  void getValues(std::tr1::function<void(
+        std::vector<int32_t> const& _return)> cob,
+      std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
+    if (leaf_ports_.empty()) {
+      // Without any leaf call clientReturn() never runs, so the request
+      // would stay unanswered and its context would leak.
+      cob(std::vector<int32_t>());
+      return;
+    }
+
+    RequestContext* ctx = new RequestContext();
+    ctx->cob = cob;
+    ctx->pending_calls = static_cast<int>(leaf_ports_.size());
+    for (std::vector<int>::iterator it = leaf_ports_.begin();
+        it != leaf_ports_.end(); ++it) {
+      boost::shared_ptr<TAsyncChannel> channel(
+          new TEvhttpClientChannel(
+            "localhost", "/", "127.0.0.1", *it, eb_));
+      // Not freed here: clientReturn() deletes the client it is called with.
+      AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
+      client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
+    }
+  }
+
+  // Stores the event base handed to the outgoing leaf channels.
+  void setEventBase(struct event_base* eb) {
+    eb_ = eb;
+  }
+
+  /**
+   * Completion callback for one leaf call: appends the leaf's values to the
+   * request context, deletes the client and, after the last outstanding
+   * call, delivers the combined result and frees the context.
+   */
+  void clientReturn(RequestContext* ctx, AggrCobClient* client) {
+    ctx->pending_calls -= 1;
+
+    try {
+      std::vector<int32_t> subret;
+      client->recv_getValues(subret);
+      ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
+    } catch (const TException&) {
+      // TODO: Log error
+    }
+
+    delete client;
+
+    if (ctx->pending_calls == 0) {
+      ctx->cob(ctx->ret);
+      delete ctx;
+    }
+  }
+
+ protected:
+  struct event_base* eb_;            // passed to each TEvhttpClientChannel
+  std::vector<int> leaf_ports_;      // ports of the leaf servers on 127.0.0.1
+  boost::shared_ptr<TProtocolFactory> pfact_;  // handed to every AggrCobClient
+};
+
+
+int main() {  // Wires the aggregator handler into a TEvhttpServer on port 8080.
+  boost::shared_ptr<AggrAsyncHandler> aggr_handler(new AggrAsyncHandler());
+  boost::shared_ptr<TAsyncProcessor> async_proc(new AggrAsyncProcessor(aggr_handler));
+  boost::shared_ptr<TProtocolFactory> proto_factory(new TBinaryProtocolFactory());
+  boost::shared_ptr<TAsyncBufferProcessor> buffer_proc(new TAsyncProtocolProcessor(async_proc, proto_factory));
+  boost::shared_ptr<TEvhttpServer> http_server(new TEvhttpServer(buffer_proc, 8080));
+  aggr_handler->setEventBase(http_server->getEventBase());  // the handler's outgoing channels use the server's event base
+  http_server->serve();
+}