David Reiss | 5ddabb8 | 2010-10-06 17:09:37 +0000 | [diff] [blame] | 1 | #include <tr1/functional> |
| 2 | #include "protocol/TBinaryProtocol.h" |
| 3 | #include "async/TAsyncProtocolProcessor.h" |
| 4 | #include "async/TEvhttpServer.h" |
| 5 | #include "async/TEvhttpClientChannel.h" |
| 6 | #include "Aggr.h" |
| 7 | |
| 8 | using std::tr1::bind; |
| 9 | using std::tr1::placeholders::_1; |
| 10 | |
| 11 | using apache::thrift::TException; |
| 12 | using apache::thrift::protocol::TBinaryProtocolFactory; |
| 13 | using apache::thrift::protocol::TProtocolFactory; |
| 14 | using apache::thrift::async::TEvhttpServer; |
| 15 | using apache::thrift::async::TAsyncProcessor; |
| 16 | using apache::thrift::async::TAsyncBufferProcessor; |
| 17 | using apache::thrift::async::TAsyncProtocolProcessor; |
| 18 | using apache::thrift::async::TAsyncChannel; |
| 19 | using apache::thrift::async::TEvhttpClientChannel; |
| 20 | |
| 21 | class AggrAsyncHandler : public AggrCobSvIf { |
| 22 | protected: |
| 23 | struct RequestContext { |
| 24 | std::tr1::function<void(std::vector<int32_t> const& _return)> cob; |
| 25 | std::vector<int32_t> ret; |
| 26 | int pending_calls; |
| 27 | }; |
| 28 | |
| 29 | public: |
| 30 | AggrAsyncHandler() |
| 31 | : eb_(NULL) |
| 32 | , pfact_(new TBinaryProtocolFactory()) |
| 33 | { |
| 34 | leaf_ports_.push_back(8081); |
| 35 | leaf_ports_.push_back(8082); |
| 36 | } |
| 37 | |
| 38 | void addValue(std::tr1::function<void()> cob, const int32_t value) { |
| 39 | // Silently drop writes to the aggrgator. |
| 40 | return cob(); |
| 41 | } |
| 42 | |
| 43 | void getValues(std::tr1::function<void( |
| 44 | std::vector<int32_t> const& _return)> cob, |
| 45 | std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) { |
| 46 | RequestContext* ctx = new RequestContext(); |
| 47 | ctx->cob = cob; |
| 48 | ctx->pending_calls = leaf_ports_.size(); |
| 49 | for (std::vector<int>::iterator it = leaf_ports_.begin(); |
| 50 | it != leaf_ports_.end(); ++it) { |
| 51 | boost::shared_ptr<TAsyncChannel> channel( |
| 52 | new TEvhttpClientChannel( |
| 53 | "localhost", "/", "127.0.0.1", *it, eb_)); |
| 54 | AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); |
| 55 | client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | void setEventBase(struct event_base* eb) { |
| 60 | eb_ = eb; |
| 61 | } |
| 62 | |
| 63 | void clientReturn(RequestContext* ctx, AggrCobClient* client) { |
| 64 | ctx->pending_calls -= 1; |
| 65 | |
| 66 | try { |
| 67 | std::vector<int32_t> subret; |
| 68 | client->recv_getValues(subret); |
| 69 | ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); |
| 70 | } catch (TException& exn) { |
| 71 | // TODO: Log error |
| 72 | } |
| 73 | |
| 74 | delete client; |
| 75 | |
| 76 | if (ctx->pending_calls == 0) { |
| 77 | ctx->cob(ctx->ret); |
| 78 | delete ctx; |
| 79 | } |
| 80 | } |
| 81 | |
| 82 | protected: |
| 83 | struct event_base* eb_; |
| 84 | std::vector<int> leaf_ports_; |
| 85 | boost::shared_ptr<TProtocolFactory> pfact_; |
| 86 | }; |
| 87 | |
| 88 | |
| 89 | int main() { |
| 90 | boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler()); |
| 91 | boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler)); |
| 92 | boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory()); |
| 93 | boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact)); |
| 94 | boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080)); |
| 95 | handler->setEventBase(server->getEventBase()); |
| 96 | server->serve(); |
| 97 | } |