THRIFT-923. cpp: Implement a fully nonblocking server and client
There are three major parts of this:
1/ New callback-style interfaces for for a few key Thrift components:
TAsyncProcessor for servers and TAsyncChannel for clients.
2/ Concrete implementations of TAsyncChannel and a server for
TAsyncProcessor based on evhttp.
3/ Async-style code generation for C++
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005127 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/async/SimpleCallback.h b/lib/cpp/src/async/SimpleCallback.h
new file mode 100644
index 0000000..4218328
--- /dev/null
+++ b/lib/cpp/src/async/SimpleCallback.h
@@ -0,0 +1,98 @@
+#ifndef _THRIFT_ASYNC_SIMPLECALLBACK_H_
+#define _THRIFT_ASYNC_SIMPLECALLBACK_H_ 1
+
+#include <Thrift.h>
+namespace apache { namespace thrift {
+
+/**
+ * A template class for forming simple method callbacks with either an empty
+ * argument list or one argument of known type.
+ *
+ * For more efficiency where tr1::function is overkill.
+ */
+
+template<typename C, ///< class whose method we wish to wrap
+ typename A = void, ///< type of argument
+ typename R = void> ///< type of return value
+class SimpleCallback {
+ typedef R (C::*cfptr_t)(A); ///< pointer-to-member-function type
+ cfptr_t fptr_; ///< the embedded function pointer
+ C* obj_; ///< object whose function we're wrapping
+ public:
+ /**
+ * Constructor for empty callback object.
+ */
+ SimpleCallback() :
+ fptr_(NULL), obj_(NULL) {}
+ /**
+ * Construct callback wrapper for member function.
+ *
+ * @param fptr pointer-to-member-function
+ * @param "this" for object associated with callback
+ */
+ SimpleCallback(cfptr_t fptr, const C* obj) :
+ fptr_(fptr), obj_(const_cast<C*>(obj))
+ {}
+
+ /**
+ * Make a call to the member function we've wrapped.
+ *
+ * @param i argument for the wrapped member function
+ * @return value from that function
+ */
+ R operator()(A i) const {
+ (obj_->*fptr_)(i);
+ }
+
+ operator bool() const {
+ return obj_ != NULL && fptr_ != NULL;
+ }
+
+ ~SimpleCallback() {}
+};
+
+/**
+ * Specialization of SimpleCallback for empty argument list.
+ */
+template<typename C, ///< class whose method we wish to wrap
+ typename R> ///< type of return value
+class SimpleCallback<C, void, R> {
+ typedef R (C::*cfptr_t)(); ///< pointer-to-member-function type
+ cfptr_t fptr_; ///< the embedded function pointer
+ C* obj_; ///< object whose function we're wrapping
+ public:
+ /**
+ * Constructor for empty callback object.
+ */
+ SimpleCallback() :
+ fptr_(NULL), obj_(NULL) {}
+
+ /**
+ * Construct callback wrapper for member function.
+ *
+ * @param fptr pointer-to-member-function
+ * @param obj "this" for object associated with callback
+ */
+ SimpleCallback(cfptr_t fptr, const C* obj) :
+ fptr_(fptr), obj_(const_cast<C*>(obj))
+ {}
+
+ /**
+ * Make a call to the member function we've wrapped.
+ *
+ * @return value from that function
+ */
+ R operator()() const {
+ (obj_->*fptr_)();
+ }
+
+ operator bool() const {
+ return obj_ != NULL && fptr_ != NULL;
+ }
+
+ ~SimpleCallback() {}
+};
+
+}} // apache::thrift
+
+#endif /* !_THRIFT_ASYNC_SIMPLECALLBACK_H_ */
diff --git a/lib/cpp/src/async/TAsyncBufferProcessor.h b/lib/cpp/src/async/TAsyncBufferProcessor.h
new file mode 100644
index 0000000..06a503e
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncBufferProcessor.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_
+#define _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ 1
+
+#include <tr1/functional>
+#include <boost/shared_ptr.hpp>
+
+#include "transport/TBufferTransports.h"
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncBufferProcessor {
+ public:
+ // Process data in "in", putting the result in "out".
+ // Call _return(true) when done, or _return(false) to
+ // forcefully close the connection (if applicable).
+ // "in" and "out" should be TMemoryBuffer or similar,
+ // not a wrapper around a socket.
+ virtual void process(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf,
+ boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf) = 0;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_
diff --git a/lib/cpp/src/async/TAsyncChannel.cpp b/lib/cpp/src/async/TAsyncChannel.cpp
new file mode 100644
index 0000000..2bf02fe
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncChannel.cpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <async/TAsyncChannel.h>
+#include <tr1/functional>
+
+namespace apache { namespace thrift { namespace async {
+
+bool TAsyncChannel::sendAndRecvMessage(const VoidCallback& cob,
+ TMemoryBuffer* sendBuf,
+ TMemoryBuffer* recvBuf) {
+ std::tr1::function<void()> send_done =
+ std::tr1::bind(&TAsyncChannel::recvMessage, this, cob, recvBuf);
+
+ return sendMessage(send_done, sendBuf);
+}
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TAsyncChannel.h b/lib/cpp/src/async/TAsyncChannel.h
new file mode 100644
index 0000000..d5cd419
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncChannel.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_
+#define _THRIFT_ASYNC_TASYNCCHANNEL_H_ 1
+
+#include <tr1/functional>
+#include <Thrift.h>
+#include <transport/TTransportUtils.h>
+
+namespace apache { namespace thrift { namespace transport {
+class TMemoryBuffer;
+}}}
+
+namespace apache { namespace thrift { namespace async {
+using apache::thrift::transport::TMemoryBuffer;
+
+class TAsyncTransport;
+
+class TAsyncChannel {
+ public:
+ typedef std::tr1::function<void()> VoidCallback;
+
+ virtual ~TAsyncChannel() {}
+
+ // is the channel in a good state?
+ virtual bool good() const = 0;
+ virtual bool error() const = 0;
+ virtual bool timedOut() const = 0;
+
+ /**
+ * Send a message over the channel.
+ *
+ * @return true iff the cob has been or will be called, else false
+ */
+ virtual bool sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0;
+
+ /**
+ * Receive a message from the channel.
+ *
+ * @return true iff the cob has been or will be called, else false
+ */
+ virtual bool recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0;
+
+ /**
+ * Send a message over the channel and receive a response.
+ *
+ * @return true iff the cob has been or will be called, else false
+ */
+ virtual bool sendAndRecvMessage(const VoidCallback& cob,
+ apache::thrift::transport::TMemoryBuffer* sendBuf,
+ apache::thrift::transport::TMemoryBuffer* recvBuf);
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_
diff --git a/lib/cpp/src/async/TAsyncProcessor.h b/lib/cpp/src/async/TAsyncProcessor.h
new file mode 100644
index 0000000..abf5816
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProcessor.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TASYNCPROCESSOR_H_
+#define _THRIFT_TASYNCPROCESSOR_H_ 1
+
+#include <tr1/functional>
+#include <boost/shared_ptr.hpp>
+#include <protocol/TProtocol.h>
+#include <TProcessor.h>
+
+namespace apache { namespace thrift { namespace async {
+
+/**
+ * Async version of a TProcessor. It is not expected to complete by the time
+ * the call to process returns. Instead, it calls a cob to signal completion.
+ */
+class TAsyncProcessor {
+ public:
+ virtual ~TAsyncProcessor() {}
+
+ virtual void process(std::tr1::function<void(bool success)> _return,
+ boost::shared_ptr<protocol::TProtocol> in,
+ boost::shared_ptr<protocol::TProtocol> out) = 0;
+
+ void process(std::tr1::function<void(bool success)> _return,
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> io) {
+ return process(_return, io, io);
+ }
+
+ protected:
+ TAsyncProcessor() {}
+};
+
+}}} // apache::thrift::async
+
+// XXX I'm lazy for now
+namespace apache { namespace thrift {
+using apache::thrift::async::TAsyncProcessor;
+}}
+
+#endif // #ifndef _THRIFT_TASYNCPROCESSOR_H_
diff --git a/lib/cpp/src/async/TAsyncProtocolProcessor.cpp b/lib/cpp/src/async/TAsyncProtocolProcessor.cpp
new file mode 100644
index 0000000..05d504b
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProtocolProcessor.cpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TAsyncProtocolProcessor.h"
+
+using apache::thrift::transport::TBufferBase;
+using apache::thrift::protocol::TProtocol;
+
+namespace apache { namespace thrift { namespace async {
+
+void TAsyncProtocolProcessor::process(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<TBufferBase> ibuf,
+ boost::shared_ptr<TBufferBase> obuf) {
+ boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));
+ boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));
+ return underlying_->process(
+ std::tr1::bind(
+ &TAsyncProtocolProcessor::finish,
+ _return,
+ oprot,
+ std::tr1::placeholders::_1),
+ iprot, oprot);
+}
+
+/* static */ void TAsyncProtocolProcessor::finish(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<TProtocol> oprot,
+ bool healthy) {
+ // This is a stub function to hold a reference to oprot.
+ return _return(healthy);
+}
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TAsyncProtocolProcessor.h b/lib/cpp/src/async/TAsyncProtocolProcessor.h
new file mode 100644
index 0000000..7ec718b
--- /dev/null
+++ b/lib/cpp/src/async/TAsyncProtocolProcessor.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TNAME_ME_H_
+#define _THRIFT_TNAME_ME_H_ 1
+
+#include "TAsyncProcessor.h"
+#include "TAsyncBufferProcessor.h"
+#include "protocol/TProtocol.h"
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncProtocolProcessor : public TAsyncBufferProcessor {
+ public:
+ TAsyncProtocolProcessor(
+ boost::shared_ptr<TAsyncProcessor> underlying,
+ boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact)
+ : underlying_(underlying)
+ , pfact_(pfact)
+ {}
+
+ virtual void process(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf,
+ boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf);
+
+ private:
+ static void finish(
+ std::tr1::function<void(bool healthy)> _return,
+ boost::shared_ptr<apache::thrift::protocol::TProtocol> oprot,
+ bool healthy);
+
+ boost::shared_ptr<TAsyncProcessor> underlying_;
+ boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact_;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TNAME_ME_H_
diff --git a/lib/cpp/src/async/TEvhttpClientChannel.cpp b/lib/cpp/src/async/TEvhttpClientChannel.cpp
new file mode 100644
index 0000000..54676a1
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpClientChannel.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TEvhttpClientChannel.h"
+#include <evhttp.h>
+
+namespace apache { namespace thrift { namespace async {
+
+
+TEvhttpClientChannel::TEvhttpClientChannel(
+ const std::string& host,
+ const std::string& path,
+ const char* address,
+ int port,
+ struct event_base* eb)
+ : host_(host)
+ , path_(path)
+ , recvBuf_(NULL)
+ , conn_(NULL)
+{
+ conn_ = evhttp_connection_new(address, port);
+ if (conn_ == NULL) {
+ abort(); // XXX
+ }
+ evhttp_connection_set_base(conn_, eb);
+}
+
+
+TEvhttpClientChannel::~TEvhttpClientChannel() {
+ if (conn_ != NULL) {
+ evhttp_connection_free(conn_);
+ }
+}
+
+
+bool TEvhttpClientChannel::sendAndRecvMessage(
+ const VoidCallback& cob,
+ apache::thrift::transport::TMemoryBuffer* sendBuf,
+ apache::thrift::transport::TMemoryBuffer* recvBuf) {
+ cob_ = cob;
+ recvBuf_ = recvBuf;
+
+ struct evhttp_request* req = evhttp_request_new(response, this);
+ if (req == NULL) {
+ abort(); // XXX
+ }
+
+ int rv;
+
+ rv = evhttp_add_header(req->output_headers, "Host", host_.c_str());
+ if (rv != 0) {
+ abort(); // XXX
+ }
+
+ rv = evhttp_add_header(req->output_headers, "Content-Type", "application/x-thrift");
+ if (rv != 0) {
+ abort(); // XXX
+ }
+
+ uint8_t* obuf;
+ uint32_t sz;
+ sendBuf->getBuffer(&obuf, &sz);
+ rv = evbuffer_add(req->output_buffer, obuf, sz);
+ if (rv != 0) {
+ abort(); // XXX
+ }
+
+ rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());
+ if (rv != 0) {
+ abort(); // XXX
+ }
+
+ return true;
+}
+
+
+bool TEvhttpClientChannel::sendMessage(
+ const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) {
+ abort(); // XXX
+}
+
+
+bool TEvhttpClientChannel::recvMessage(
+ const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) {
+ abort(); // XXX
+}
+
+
+void TEvhttpClientChannel::finish(struct evhttp_request* req) {
+ if (req == NULL) {
+ return cob_();
+ } else if (req->response_code != 200) {
+ return cob_();
+ }
+ recvBuf_->resetBuffer(
+ EVBUFFER_DATA(req->input_buffer),
+ EVBUFFER_LENGTH(req->input_buffer));
+ return cob_();
+}
+
+
+/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {
+ TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;
+ self->finish(req);
+}
+
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TEvhttpClientChannel.h b/lib/cpp/src/async/TEvhttpClientChannel.h
new file mode 100644
index 0000000..d2bc4b3
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpClientChannel.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_
+#define _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ 1
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "TAsyncChannel.h"
+
+struct event_base;
+struct evhttp_connection;
+struct evhttp_request;
+
+namespace apache { namespace thrift { namespace transport {
+class TMemoryBuffer;
+}}}
+
+namespace apache { namespace thrift { namespace async {
+
+class TEvhttpClientChannel : public TAsyncChannel {
+ public:
+ using TAsyncChannel::VoidCallback;
+
+ TEvhttpClientChannel(
+ const std::string& host,
+ const std::string& path,
+ const char* address,
+ int port,
+ struct event_base* eb);
+ ~TEvhttpClientChannel();
+
+ virtual bool sendAndRecvMessage(const VoidCallback& cob,
+ apache::thrift::transport::TMemoryBuffer* sendBuf,
+ apache::thrift::transport::TMemoryBuffer* recvBuf);
+
+ virtual bool sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message);
+ virtual bool recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message);
+
+ void finish(struct evhttp_request* req);
+
+ //XXX
+ virtual bool good() const { return true; }
+ virtual bool error() const { return false; }
+ virtual bool timedOut() const { return false; }
+
+ private:
+ static void response(struct evhttp_request* req, void* arg);
+
+ std::string host_;
+ std::string path_;
+ VoidCallback cob_;
+ apache::thrift::transport::TMemoryBuffer* recvBuf_;
+ struct evhttp_connection* conn_;
+
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_
diff --git a/lib/cpp/src/async/TEvhttpServer.cpp b/lib/cpp/src/async/TEvhttpServer.cpp
new file mode 100644
index 0000000..2997597
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpServer.cpp
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TEvhttpServer.h"
+#include "TAsyncBufferProcessor.h"
+#include "transport/TBufferTransports.h"
+#include <evhttp.h>
+
+using apache::thrift::transport::TMemoryBuffer;
+
+namespace apache { namespace thrift { namespace async {
+
+
+struct TEvhttpServer::RequestContext {
+ struct evhttp_request* req;
+ boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> ibuf;
+ boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> obuf;
+
+ RequestContext(struct evhttp_request* req);
+};
+
+
+TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor)
+ : processor_(processor)
+ , eb_(NULL)
+ , eh_(NULL)
+{}
+
+
+TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port)
+ : processor_(processor)
+ , eb_(NULL)
+ , eh_(NULL)
+{
+ // Create event_base and evhttp.
+ eb_ = event_base_new();
+ if (eb_ == NULL) {
+ abort(); // XXX
+ }
+ eh_ = evhttp_new(eb_);
+ if (eh_ == NULL) {
+ event_base_free(eb_);
+ abort(); // XXX
+ }
+
+ // Bind to port.
+ int ret = evhttp_bind_socket(eh_, NULL, port);
+ if (ret < 0) {
+ evhttp_free(eh_);
+ event_base_free(eb_);
+ }
+
+ // Register a handler. If you use the other constructor,
+ // you will want to do this yourself.
+ // Don't forget to unregister before destorying this TEvhttpServer.
+ evhttp_set_cb(eh_, "/", request, (void*)this);
+}
+
+
+TEvhttpServer::~TEvhttpServer() {
+ if (eh_ != NULL) {
+ evhttp_free(eh_);
+ }
+ if (eb_ != NULL) {
+ event_base_free(eb_);
+ }
+}
+
+
+int TEvhttpServer::serve() {
+ if (eb_ == NULL) {
+ abort(); // XXX
+ }
+ return event_base_dispatch(eb_);
+}
+
+
+TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req) : req(req)
+ , ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer), EVBUFFER_LENGTH(req->input_buffer)))
+ , obuf(new TMemoryBuffer())
+{}
+
+
+void TEvhttpServer::request(struct evhttp_request* req, void* self) {
+ static_cast<TEvhttpServer*>(self)->process(req);
+}
+
+
+void TEvhttpServer::process(struct evhttp_request* req) {
+ RequestContext* ctx = new RequestContext(req);
+ return processor_->process(
+ std::tr1::bind(
+ &TEvhttpServer::complete,
+ this,
+ ctx,
+ std::tr1::placeholders::_1),
+ ctx->ibuf,
+ ctx->obuf);
+}
+
+
+void TEvhttpServer::complete(RequestContext* ctx, bool success) {
+ std::auto_ptr<RequestContext> ptr(ctx);
+
+ int code = 200;
+ const char* reason = "OK";
+
+ int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");
+ if (rv != 0) {
+ // TODO: Log an error.
+ }
+
+ struct evbuffer* buf = evbuffer_new();
+ if (buf == NULL) {
+ // TODO: Log an error.
+ } else {
+ uint8_t* obuf;
+ uint32_t sz;
+ ctx->obuf->getBuffer(&obuf, &sz);
+ int ret = evbuffer_add(buf, obuf, sz);
+ if (ret != 0) {
+ // TODO: Log an error.
+ }
+ }
+
+ evhttp_send_reply(ctx->req, code, reason, buf);
+ if (buf != NULL) {
+ evbuffer_free(buf);
+ }
+}
+
+
+struct event_base* TEvhttpServer::getEventBase() {
+ return eb_;
+}
+
+
+}}} // apache::thrift::async
diff --git a/lib/cpp/src/async/TEvhttpServer.h b/lib/cpp/src/async/TEvhttpServer.h
new file mode 100644
index 0000000..edc6ffb
--- /dev/null
+++ b/lib/cpp/src/async/TEvhttpServer.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_TEVHTTP_SERVER_H_
+#define _THRIFT_TEVHTTP_SERVER_H_ 1
+
+#include <boost/shared_ptr.hpp>
+
+struct event_base;
+struct evhttp;
+struct evhttp_request;
+
+namespace apache { namespace thrift { namespace async {
+
+class TAsyncBufferProcessor;
+
+class TEvhttpServer {
+ public:
+ /**
+ * Create a TEvhttpServer for use with an external evhttp instance.
+ * Must be manually installed with evhttp_set_cb, using
+ * TEvhttpServer::request as the callback and the
+ * address of the server as the extra arg.
+ * Do not call "serve" on this server.
+ */
+ TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor);
+
+ /**
+ * Create a TEvhttpServer with an embedded event_base and evhttp,
+ * listening on port and responding on the endpoint "/".
+ * Call "serve" on this server to serve forever.
+ */
+ TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port);
+
+ ~TEvhttpServer();
+
+ static void request(struct evhttp_request* req, void* self);
+ int serve();
+
+ struct event_base* getEventBase();
+
+ private:
+ struct RequestContext;
+
+ void process(struct evhttp_request* req);
+ void complete(RequestContext* ctx, bool success);
+
+ boost::shared_ptr<TAsyncBufferProcessor> processor_;
+ struct event_base* eb_;
+ struct evhttp* eh_;
+};
+
+}}} // apache::thrift::async
+
+#endif // #ifndef _THRIFT_TEVHTTP_SERVER_H_