py.twisted: Avoid synchronous exceptions

This is an update of the patch posted to:
https://issues.apache.org/jira/browse/THRIFT-585
and a re-write of 8345772

Patch: Mattias de Zalenski, James Broadhead

Jira: THRIFT-585
diff --git a/compiler/cpp/src/generate/t_py_generator.cc b/compiler/cpp/src/generate/t_py_generator.cc
index eb558ad..2b5725f 100644
--- a/compiler/cpp/src/generate/t_py_generator.cc
+++ b/compiler/cpp/src/generate/t_py_generator.cc
@@ -786,16 +786,16 @@
   }
 
   out << indent() << "def __hash__(self):" << endl;
-  indent_up(); 
+  indent_up();
   indent(out) << "value = 17" << endl;  // PYTHONHASHSEED would be better, but requires Python 3.2.3
-  for (m_iter = members.begin(); m_iter != members.end(); ++m_iter) { 
-    indent(out) << "value = (value * 31) ^ hash(self." << (*m_iter)->get_name() + ")" << endl; 
-  } 
-  indent(out) << "return value" << endl; 
-  indent_down(); 
-  out << endl; 
-  
-  
+  for (m_iter = members.begin(); m_iter != members.end(); ++m_iter) {
+    indent(out) << "value = (value * 31) ^ hash(self." << (*m_iter)->get_name() + ")" << endl;
+  }
+  indent(out) << "return value" << endl;
+  indent_down();
+  out << endl;
+
+
   if (!gen_slots_) {
     // Printing utilities so that on the command line thrift
     // structs look pretty like dictionaries
@@ -1288,23 +1288,29 @@
     indent_up();
     generate_python_docstring(f_service_, (*f_iter));
     if (gen_twisted_) {
-      indent(f_service_) << "self._seqid += 1" << endl;
-      if (!(*f_iter)->is_oneway()) {
-        indent(f_service_) <<
-          "d = self._reqs[self._seqid] = defer.Deferred()" << endl;
-      }
+      indent(f_service_) << "seqid = self._seqid = self._seqid + 1" << endl;
+      indent(f_service_) << "self._reqs[seqid] = defer.Deferred()" << endl << endl;
+      indent(f_service_) << "d = defer.maybeDeferred(self.send_" << funname;
+
     } else if (gen_tornado_) {
       indent(f_service_) << "self._seqid += 1" << endl;
       if (!(*f_iter)->is_oneway()) {
         indent(f_service_) <<
           "future = self._reqs[self._seqid] = concurrent.Future()" << endl;
       }
+      indent(f_service_) <<
+        "self.send_" << funname << "(";
+
+    } else {
+      indent(f_service_) <<
+        "self.send_" << funname << "(";
     }
 
-    indent(f_service_) <<
-      "self.send_" << funname << "(";
-
     bool first = true;
+    if (gen_twisted_) {
+      // we need a leading comma if there are args, since it's called as maybeDeferred(funcname, arg)
+      first = false;
+    }
     for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) {
       if (first) {
         first = false;
@@ -1317,30 +1323,69 @@
     f_service_ << ")" << endl;
 
     if (!(*f_iter)->is_oneway()) {
-      f_service_ << indent();
       if (gen_twisted_) {
-        f_service_ << "return d" << endl;
+        // nothing. See the next block.
       } else if (gen_tornado_) {
-        f_service_ << "return future" << endl;
+        indent(f_service_) << "return future" << endl;
       } else {
+        f_service_ << indent();
         if (!(*f_iter)->get_returntype()->is_void()) {
           f_service_ << "return ";
         }
-        f_service_ <<
-          "self.recv_" << funname << "()" << endl;
-      }
-    } else {
-      if (gen_twisted_) {
-        f_service_ <<
-          indent() << "return defer.succeed(None)" << endl;
+        f_service_ << "self.recv_" << funname << "()" << endl;
       }
     }
     indent_down();
-    f_service_ << endl;
 
+    if (gen_twisted_) {
+      // This block injects the body of the send_<> method for twisted (and a cb/eb pair)
+      indent_up();
+      indent(f_service_) <<
+        "d.addCallbacks(" << endl;
+
+      indent_up();
+      f_service_ <<
+        indent() << "callback=self.cb_send_" << funname << "," << endl <<
+        indent() << "callbackArgs=(seqid,)," << endl <<
+        indent() << "errback=self.eb_send_" << funname << "," << endl <<
+        indent() << "errbackArgs=(seqid,))" << endl;
+      indent_down();
+
+      indent(f_service_) <<
+        "return d" << endl;
+      indent_down();
+      f_service_ << endl;
+
+      indent(f_service_) <<
+        "def cb_send_" << funname << "(self, _, seqid):" << endl;
+      indent_up();
+      if ((*f_iter)->is_oneway()) {
+        // if one-way, fire the deferred & remove it from _reqs
+        f_service_ << indent() <<
+          "d = self._reqs.pop(seqid)" << endl << indent() <<
+          "d.callback(None)" << endl << indent() <<
+          "return d" << endl;
+      } else {
+        f_service_ << indent() <<
+          "return self._reqs[seqid]" << endl;
+      }
+      indent_down();
+      f_service_ << endl;
+
+      // add an errback to fail the request if the call to send_<> raised an exception
+      indent(f_service_) <<
+        "def eb_send_" << funname << "(self, f, seqid):" << endl;
+      indent_up();
+      f_service_ <<
+        indent() << "d = self._reqs.pop(seqid)" << endl <<
+        indent() << "d.errback(f)" << endl <<
+        indent() << "return d" << endl;
+      indent_down();
+    }
+
+    f_service_ << endl;
     indent(f_service_) <<
       "def send_" << function_signature(*f_iter, false) << ":" << endl;
-
     indent_up();
 
     std::string argsname = (*f_iter)->get_name() + "_args";
diff --git a/lib/py/src/transport/TTwisted.py b/lib/py/src/transport/TTwisted.py
index 2b77414..29bbd4c 100644
--- a/lib/py/src/transport/TTwisted.py
+++ b/lib/py/src/transport/TTwisted.py
@@ -42,7 +42,7 @@
     def flush(self):
         msg = self.__wbuf.getvalue()
         self.__wbuf = StringIO()
-        self.sendMessage(msg)
+        return self.sendMessage(msg)
 
     def sendMessage(self, message):
         raise NotImplementedError
@@ -55,7 +55,7 @@
         self.func = func
 
     def sendMessage(self, message):
-        self.func(message)
+        return self.func(message)
 
 
 class ThriftClientProtocol(basic.Int32StringReceiver):