THRIFT-928. cpp: Processor-level event callbacks
- Add a TProcessorEventHandler callback interface.
- Add methods to TProcessor to hold an instance of the interface.
- Add code to the compiler to make the processor call callbacks at key points.
- Add an optional processor event handler to the test server.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005126 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc
index 59749b7..ef0dd36 100644
--- a/compiler/cpp/src/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/generate/t_cpp_generator.cc
@@ -2059,10 +2059,31 @@
string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result";
f_service_ <<
+ indent() << "void* ctx = NULL;" << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl <<
+ indent() << "// A glorified finally block since ctx is a void*" << endl <<
+ indent() << "class ContextFreer {" << endl <<
+ indent() << " public:" << endl <<
+ indent() << " ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
+ indent() << " handler_(handler), context_(context) {}" << endl <<
+ indent() << " ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
+ indent() << " private:" << endl <<
+ indent() << " ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
+ indent() << " void* context_;" << endl <<
+ indent() << "};" << endl <<
+ indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl << endl <<
indent() << argsname << " args;" << endl <<
indent() << "args.read(iprot);" << endl <<
indent() << "iprot->readMessageEnd();" << endl <<
- indent() << "iprot->getTransport()->readEnd();" << endl <<
+ indent() << "iprot->getTransport()->readEnd();" << endl << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl <<
endl;
t_struct* xs = tfunction->get_xceptions();
@@ -2135,38 +2156,52 @@
f_service_ << " catch (const std::exception& e) {" << endl;
+ indent_up();
+ f_service_ <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl;
+
if (!tfunction->is_oneway()) {
- indent_up();
f_service_ <<
+ endl <<
indent() << "::apache::thrift::TApplicationException x(e.what());" << endl <<
indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
indent() << "x.write(oprot);" << endl <<
indent() << "oprot->writeMessageEnd();" << endl <<
indent() << "oprot->getTransport()->flush();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl <<
- indent() << "return;" << endl;
- indent_down();
+ indent() << "oprot->getTransport()->writeEnd();" << endl;
}
- f_service_ << indent() << "}" << endl;
+ f_service_ << indent() << "return;" << endl;
+ indent_down();
+ f_service_ << indent() << "}" << endl << endl;
// Shortcut out here for oneway functions
if (tfunction->is_oneway()) {
f_service_ <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl << endl <<
indent() << "return;" << endl;
- indent_down();
- f_service_ << "}" << endl <<
- endl;
+ // Close function
+ scope_down(f_service_);
+ f_service_ << endl;
return;
}
// Serialize the result into a struct
f_service_ <<
- endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl << endl <<
indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
indent() << "result.write(oprot);" << endl <<
indent() << "oprot->writeMessageEnd();" << endl <<
indent() << "oprot->getTransport()->flush();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl;
+ indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl;
// Close function
scope_down(f_service_);
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index f2d5279..22c10f1 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -27,6 +27,65 @@
namespace apache { namespace thrift {
/**
+ * Virtual interface class that can handle events from the processor. To
+ * use this you should subclass it and implement the methods that you care
+ * about. Your subclass can also store local data that you may care about,
+ * such as additional "arguments" to these methods (stored in the object
+ * instance's state).
+ */
+class TProcessorEventHandler {
+ public:
+
+ virtual ~TProcessorEventHandler() {}
+
+ /**
+ * Called before calling other callback methods.
+ * Expected to return some sort of context object.
+ * The return value is passed to all other callbacks
+ * for that function invocation.
+ */
+ virtual void* getContext(const char* fn_name) { return NULL; }
+
+ /**
+ * Expected to free resources associated with a context.
+ */
+ virtual void freeContext(void* ctx, const char* fn_name) { }
+
+ /**
+ * Called before reading arguments.
+ */
+ virtual void preRead(void* ctx, const char* fn_name) {}
+
+ /**
+ * Called between reading arguments and calling the handler.
+ */
+ virtual void postRead(void* ctx, const char* fn_name) {}
+
+ /**
+ * Called between calling the handler and writing the response.
+ */
+ virtual void preWrite(void* ctx, const char* fn_name) {}
+
+ /**
+ * Called after writing the response.
+ */
+ virtual void postWrite(void* ctx, const char* fn_name) {}
+
+ /**
+ * Called when an async function call completes successfully.
+ */
+ virtual void asyncComplete(void* ctx, const char* fn_name) {}
+
+ /**
+ * Called if the handler throws an undeclared exception.
+ */
+ virtual void handlerError(void* ctx, const char* fn_name) {}
+
+ protected:
+ TProcessorEventHandler() {}
+};
+
+/**
* A processor is a generic object that acts upon two streams of data, one
* an input and the other an output. The definition of this object is loose,
* though the typical case is for some sort of server that either generates
@@ -44,8 +103,18 @@
return process(io, io);
}
+ boost::shared_ptr<TProcessorEventHandler> getEventHandler() {
+ return eventHandler_;
+ }
+
+ void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) {
+ eventHandler_ = eventHandler;
+ }
+
protected:
TProcessor() {}
+
+ boost::shared_ptr<TProcessorEventHandler> eventHandler_;
};
}} // apache::thrift
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index d6063ac..18bdc54 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -287,6 +287,39 @@
}
};
+
+class TestProcessorEventHandler : public TProcessorEventHandler {
+ virtual void* getContext(const char* fn_name) {
+ return new std::string(fn_name);
+ }
+ virtual void freeContext(void* ctx, const char* fn_name) {
+ delete static_cast<std::string*>(ctx);
+ }
+ virtual void preRead(void* ctx, const char* fn_name) {
+ communicate("preRead", ctx, fn_name);
+ }
+ virtual void postRead(void* ctx, const char* fn_name) {
+ communicate("postRead", ctx, fn_name);
+ }
+ virtual void preWrite(void* ctx, const char* fn_name) {
+ communicate("preWrite", ctx, fn_name);
+ }
+ virtual void postWrite(void* ctx, const char* fn_name) {
+ communicate("postWrite", ctx, fn_name);
+ }
+ virtual void asyncComplete(void* ctx, const char* fn_name) {
+ communicate("asyncComplete", ctx, fn_name);
+ }
+ virtual void handlerError(void* ctx, const char* fn_name) {
+ communicate("handlerError", ctx, fn_name);
+ }
+
+ void communicate(const char* event, void* ctx, const char* fn_name) {
+ std::cout << event << ": " << *static_cast<std::string*>(ctx) << " = " << fn_name << std::endl;
+ }
+};
+
+
int main(int argc, char **argv) {
int port = 9090;
@@ -297,7 +330,7 @@
ostringstream usage;
usage <<
- argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>]" << endl <<
+ argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--processor-events]" << endl <<
"\t\tserver-type\t\ttype of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\". Default is " << serverType << endl <<
@@ -365,6 +398,12 @@
shared_ptr<ThriftTestProcessor> testProcessor(new ThriftTestProcessor(testHandler));
+
+ if (!args["processor-events"].empty()) {
+ testProcessor->setEventHandler(shared_ptr<TProcessorEventHandler>(
+ new TestProcessorEventHandler()));
+ }
+
// Transport
shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));