blob: a55c348445d8c4d1b2c9452b215882206a792f13 [file] [log] [blame]
David Reiss5ddabb82010-10-06 17:09:37 +00001#include <tr1/functional>
2#include "protocol/TBinaryProtocol.h"
3#include "async/TAsyncProtocolProcessor.h"
4#include "async/TEvhttpServer.h"
5#include "async/TEvhttpClientChannel.h"
6#include "Aggr.h"
7
8using std::tr1::bind;
9using std::tr1::placeholders::_1;
10
11using apache::thrift::TException;
12using apache::thrift::protocol::TBinaryProtocolFactory;
13using apache::thrift::protocol::TProtocolFactory;
14using apache::thrift::async::TEvhttpServer;
15using apache::thrift::async::TAsyncProcessor;
16using apache::thrift::async::TAsyncBufferProcessor;
17using apache::thrift::async::TAsyncProtocolProcessor;
18using apache::thrift::async::TAsyncChannel;
19using apache::thrift::async::TEvhttpClientChannel;
20
21class AggrAsyncHandler : public AggrCobSvIf {
22 protected:
23 struct RequestContext {
24 std::tr1::function<void(std::vector<int32_t> const& _return)> cob;
25 std::vector<int32_t> ret;
26 int pending_calls;
27 };
28
29 public:
30 AggrAsyncHandler()
31 : eb_(NULL)
32 , pfact_(new TBinaryProtocolFactory())
33 {
34 leaf_ports_.push_back(8081);
35 leaf_ports_.push_back(8082);
36 }
37
38 void addValue(std::tr1::function<void()> cob, const int32_t value) {
39 // Silently drop writes to the aggrgator.
40 return cob();
41 }
42
43 void getValues(std::tr1::function<void(
44 std::vector<int32_t> const& _return)> cob,
45 std::tr1::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
46 RequestContext* ctx = new RequestContext();
47 ctx->cob = cob;
48 ctx->pending_calls = leaf_ports_.size();
49 for (std::vector<int>::iterator it = leaf_ports_.begin();
50 it != leaf_ports_.end(); ++it) {
51 boost::shared_ptr<TAsyncChannel> channel(
52 new TEvhttpClientChannel(
53 "localhost", "/", "127.0.0.1", *it, eb_));
54 AggrCobClient* client = new AggrCobClient(channel, pfact_.get());
55 client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1));
56 }
57 }
58
59 void setEventBase(struct event_base* eb) {
60 eb_ = eb;
61 }
62
63 void clientReturn(RequestContext* ctx, AggrCobClient* client) {
64 ctx->pending_calls -= 1;
65
66 try {
67 std::vector<int32_t> subret;
68 client->recv_getValues(subret);
69 ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end());
70 } catch (TException& exn) {
71 // TODO: Log error
72 }
73
74 delete client;
75
76 if (ctx->pending_calls == 0) {
77 ctx->cob(ctx->ret);
78 delete ctx;
79 }
80 }
81
82 protected:
83 struct event_base* eb_;
84 std::vector<int> leaf_ports_;
85 boost::shared_ptr<TProtocolFactory> pfact_;
86};
87
88
89int main() {
90 boost::shared_ptr<AggrAsyncHandler> handler(new AggrAsyncHandler());
91 boost::shared_ptr<TAsyncProcessor> proc(new AggrAsyncProcessor(handler));
92 boost::shared_ptr<TProtocolFactory> pfact(new TBinaryProtocolFactory());
93 boost::shared_ptr<TAsyncBufferProcessor> bufproc(new TAsyncProtocolProcessor(proc, pfact));
94 boost::shared_ptr<TEvhttpServer> server(new TEvhttpServer(bufproc, 8080));
95 handler->setEventBase(server->getEventBase());
96 server->serve();
97}