THRIFT-928. cpp: Make clients call writeEnd on their transports before flush
Changing the order of these calls makes more sense from the perspective
of logical operations. It also simplifies the upcoming stats collection
code. No clients should be affected.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005128 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc
index 0ba4540..c2f9e2e 100644
--- a/compiler/cpp/src/generate/t_cpp_generator.cc
+++ b/compiler/cpp/src/generate/t_cpp_generator.cc
@@ -1970,8 +1970,8 @@
indent() << "args.write(oprot_);" << endl <<
endl <<
indent() << "oprot_->writeMessageEnd();" << endl <<
- indent() << "oprot_->getTransport()->flush();" << endl <<
- indent() << "oprot_->getTransport()->writeEnd();" << endl;
+ indent() << "oprot_->getTransport()->writeEnd();" << endl <<
+ indent() << "oprot_->getTransport()->flush();" << endl;
scope_down(f_service_);
f_service_ << endl;
@@ -2142,8 +2142,8 @@
: ", const " + type_name((*f_iter)->get_returntype()) + "& _return");
// XXX Don't declare throw if it doesn't exist
f_header_ <<
- "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ");" << endl <<
- "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw);" << endl;
+ "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx" << ret_arg << ");" << endl <<
+ "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, ::apache::thrift::TDelayedException* _throw);" << endl;
}
}
indent_down();
@@ -2209,8 +2209,8 @@
indent() << " oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
indent() << " x.write(oprot);" << endl <<
indent() << " oprot->writeMessageEnd();" << endl <<
- indent() << " oprot->getTransport()->flush();" << endl <<
indent() << " oprot->getTransport()->writeEnd();" << endl <<
+ indent() << " oprot->getTransport()->flush();" << endl <<
indent() << (style == "Cob" ? " return cob(true);" : " return true;") << endl <<
indent() << "}" << endl <<
endl <<
@@ -2241,8 +2241,8 @@
indent() << " oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
indent() << " x.write(oprot);" << endl <<
indent() << " oprot->writeMessageEnd();" << endl <<
- indent() << " oprot->getTransport()->flush();" << endl <<
indent() << " oprot->getTransport()->writeEnd();" << endl <<
+ indent() << " oprot->getTransport()->flush();" << endl <<
indent() << (style == "Cob" ? " return cob(true);" : " return true;") << endl;
} else {
f_service_ <<
@@ -2344,17 +2344,7 @@
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
indent() << "}" << endl <<
- indent() << "// A glorified finally block since ctx is a void*" << endl <<
- indent() << "class ContextFreer {" << endl <<
- indent() << " public:" << endl <<
- indent() << " ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
- indent() << " handler_(handler), context_(context) {}" << endl <<
- indent() << " ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
- indent() << " private:" << endl <<
- indent() << " ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
- indent() << " void* context_;" << endl <<
- indent() << "};" << endl <<
- indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
+ indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
indent() << "}" << endl << endl <<
@@ -2442,8 +2432,8 @@
indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
indent() << "x.write(oprot);" << endl <<
indent() << "oprot->writeMessageEnd();" << endl <<
- indent() << "oprot->getTransport()->flush();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl;
+ indent() << "oprot->getTransport()->writeEnd();" << endl <<
+ indent() << "oprot->getTransport()->flush();" << endl;
}
f_service_ << indent() << "return;" << endl;
indent_down();
@@ -2470,8 +2460,8 @@
indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
indent() << "result.write(oprot);" << endl <<
indent() << "oprot->writeMessageEnd();" << endl <<
- indent() << "oprot->getTransport()->flush();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
+ indent() << "oprot->getTransport()->writeEnd();" << endl <<
+ indent() << "oprot->getTransport()->flush();" << endl << endl <<
indent() << "if (eventHandler_.get() != NULL) {" << endl <<
indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
indent() << "}" << endl;
@@ -2491,22 +2481,43 @@
f_service_ <<
indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl <<
+ indent() << "void* ctx = NULL;" << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl <<
+ indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
indent() << "try {" << endl;
indent_up();
f_service_ <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl <<
indent() << "args.read(iprot);" << endl <<
indent() << "iprot->readMessageEnd();" << endl <<
- indent() << "iprot->getTransport()->readEnd();" << endl;
+ indent() << "uint32_t bytes = iprot->getTransport()->readEnd();" << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+ indent() << "}" << endl;
scope_down(f_service_);
// TODO(dreiss): Handle TExceptions? Expose to server?
f_service_ <<
indent() << "catch (const std::exception& exn) {" << endl <<
+ indent() << " if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << " }" << endl <<
indent() << " return cob(false);" << endl <<
indent() << "}" << endl;
+ if (tfunction->is_oneway()) {
+ f_service_ <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->onewayComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl;
+ }
// TODO(dreiss): Figure out a strategy for exceptions in async handlers.
f_service_ <<
+ indent() << "freer.unregister();" << endl <<
indent() << "iface_->" << tfunction->get_name() << "(";
indent_up(); indent_up();
if (tfunction->is_oneway()) {
@@ -2524,13 +2535,13 @@
f_service_ <<
indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
<< "return_" << tfunction->get_name()
- << ", this, cob, seqid, oprot" << ret_placeholder << ")";
+ << ", this, cob, seqid, oprot, ctx" << ret_placeholder << ")";
if (!xceptions.empty()) {
f_service_
- << ',' << endl <<
+ << ',' << endl <<
indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
- << "throw_" << tfunction->get_name()
- << ", this, cob, seqid, oprot, std::tr1::placeholders::_1)";
+ << "throw_" << tfunction->get_name()
+ << ", this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1)";
}
}
@@ -2552,7 +2563,7 @@
: ", const " + type_name(tfunction->get_returntype()) + "& _return");
f_service_ <<
"void " << tservice->get_name() << "AsyncProcessor::" <<
- "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ')' << endl;
+ "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx" << ret_arg << ')' << endl;
scope_up(f_service_);
f_service_ <<
indent() << tservice->get_name() << "_" << tfunction->get_name() << "_presult result;" << endl;
@@ -2566,11 +2577,21 @@
// Serialize the result into a struct
f_service_ <<
endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl <<
+ indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl << endl <<
indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
indent() << "result.write(oprot);" << endl <<
indent() << "oprot->writeMessageEnd();" << endl <<
+ indent() << "uint32_t bytes = oprot->getTransport()->writeEnd();" << endl <<
indent() << "oprot->getTransport()->flush();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+ indent() << "}" << endl <<
indent() << "return cob(true);" << endl;
scope_down(f_service_);
f_service_ << endl;
@@ -2580,7 +2601,7 @@
if (!tfunction->is_oneway() && !xceptions.empty()) {
f_service_ <<
"void " << tservice->get_name() << "AsyncProcessor::" <<
- "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw)" << endl;
+ "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, ::apache::thrift::TDelayedException* _throw)" << endl;
scope_up(f_service_);
f_service_ <<
indent() << tservice->get_name() << "_" << tfunction->get_name() << "_result result;" << endl << endl <<
@@ -2600,17 +2621,27 @@
indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
scope_down(f_service_);
}
+ // TODO(dreiss): Handle the case where an undeclared exception is thrown?
// Serialize the result into a struct
f_service_ <<
endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl <<
+ indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+ indent() << "}" << endl << endl <<
indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
indent() << "result.write(oprot);" << endl <<
indent() << "oprot->writeMessageEnd();" << endl <<
+ indent() << "uint32_t bytes = oprot->getTransport()->writeEnd();" << endl <<
indent() << "oprot->getTransport()->flush();" << endl <<
- indent() << "oprot->getTransport()->writeEnd();" << endl <<
+ indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+ indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+ indent() << "}" << endl <<
indent() << "return cob(true);" << endl;
-
scope_down(f_service_);
f_service_ << endl;
} // for each function
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index f71a50b..896f5ae 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -86,6 +86,21 @@
};
/**
+ * A helper class used by the generated code to free each context.
+ */
+class TProcessorContextFreer {
+ public:
+ TProcessorContextFreer(TProcessorEventHandler* handler, void* context, const char* method) :
+ handler_(handler), context_(context), method_(method) {}
+ ~TProcessorContextFreer() { if (handler_ != NULL) handler_->freeContext(context_, method_); }
+ void unregister() { handler_ = NULL; }
+ private:
+ apache::thrift::TProcessorEventHandler* handler_;
+ void* context_;
+ const char* method_;
+};
+
+/**
* A processor is a generic object that acts upon two streams of data, one
* an input and the other an output. The definition of this object is loose,
* though the typical case is for some sort of server that either generates
diff --git a/lib/cpp/src/async/TAsyncProcessor.h b/lib/cpp/src/async/TAsyncProcessor.h
index abf5816..a0b5428 100644
--- a/lib/cpp/src/async/TAsyncProcessor.h
+++ b/lib/cpp/src/async/TAsyncProcessor.h
@@ -31,6 +31,9 @@
* Async version of a TProcessor. It is not expected to complete by the time
* the call to process returns. Instead, it calls a cob to signal completion.
*/
+
+class TEventServer; // forward declaration
+
class TAsyncProcessor {
public:
virtual ~TAsyncProcessor() {}
@@ -44,8 +47,27 @@
return process(_return, io, io);
}
+ boost::shared_ptr<TProcessorEventHandler> getEventHandler() {
+ return eventHandler_;
+ }
+
+ void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) {
+ eventHandler_ = eventHandler;
+ }
+
+ const TEventServer* getAsyncServer() {
+ return asyncServer_;
+ }
protected:
TAsyncProcessor() {}
+
+ boost::shared_ptr<TProcessorEventHandler> eventHandler_;
+ const TEventServer* asyncServer_;
+ private:
+ friend class TEventServer;
+ void setAsyncServer(const TEventServer* server) {
+ asyncServer_ = server;
+ }
};
}}} // apache::thrift::async