THRIFT-1704: Tornado support (Python)
diff --git a/.gitignore b/.gitignore
index eb2ae4d..000a95c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -241,6 +241,9 @@
 /test/py.twisted/Makefile.in
 /test/py.twisted/_trial_temp/
 /test/py.twisted/test_suite.pyc
+/test/py.tornado/Makefile
+/test/py.tornado/Makefile.in
+/test/py.tornado/*.pyc
 /test/rb/Makefile
 /test/rb/Makefile.in
 /tutorial/Makefile
@@ -257,6 +260,8 @@
 /tutorial/js/build/
 /tutorial/py.twisted/Makefile
 /tutorial/py.twisted/Makefile.in
+/tutorial/py.tornado/Makefile
+/tutorial/py.tornado/Makefile.in
 /tutorial/py/Makefile
 /tutorial/py/Makefile.in
 /ylwrap
diff --git a/compiler/cpp/src/generate/t_py_generator.cc b/compiler/cpp/src/generate/t_py_generator.cc
index 6085670..fd954c5 100644
--- a/compiler/cpp/src/generate/t_py_generator.cc
+++ b/compiler/cpp/src/generate/t_py_generator.cc
@@ -91,13 +91,22 @@
     iter = parsed_options.find("twisted");
     gen_twisted_ = (iter != parsed_options.end());
 
+    iter = parsed_options.find("tornado");
+    gen_tornado_ = (iter != parsed_options.end());
+
+    if (gen_twisted_ && gen_tornado_) {
+      throw "at most one of 'twisted' and 'tornado' are allowed";
+    }
+
     iter = parsed_options.find("utf8strings");
     gen_utf8strings_ = (iter != parsed_options.end());
 
     copy_options_ = option_string;
 
-    if (gen_twisted_){
+    if (gen_twisted_) {
       out_dir_base_ = "gen-py.twisted";
+    } else if (gen_tornado_) {
+      out_dir_base_ = "gen-py.tornado";
     } else {
       out_dir_base_ = "gen-py";
     }
@@ -214,6 +223,17 @@
                                           t_doc* tdoc);
 
   /**
+   * a type for specifying to function_signature what type of Tornado callback
+   * parameter to add
+   */
+
+  enum tornado_callback_t {
+    NONE = 0,
+    MANDATORY_FOR_ONEWAY_ELSE_NONE = 1,
+    OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY = 2,
+  };
+
+  /**
    * Helper rendering functions
    */
 
@@ -224,9 +244,12 @@
   std::string declare_argument(t_field* tfield);
   std::string render_field_default_value(t_field* tfield);
   std::string type_name(t_type* ttype);
-  std::string function_signature(t_function* tfunction, std::string prefix="");
-  std::string function_signature_if(t_function* tfunction, std::string prefix="");
-  std::string argument_list(t_struct* tstruct);
+  std::string function_signature(t_function* tfunction,
+                                 bool interface=false,
+                                 tornado_callback_t callback=NONE);
+  std::string argument_list(t_struct* tstruct,
+                            std::vector<std::string> *pre=NULL,
+                            std::vector<std::string> *post=NULL);
   std::string type_to_enum(t_type* ttype);
   std::string type_to_spec_args(t_type* ttype);
 
@@ -277,6 +300,11 @@
   bool gen_twisted_;
 
   /**
+   * True if we should generate code for use with Tornado
+   */
+  bool gen_tornado_;
+
+  /**
    * True if strings should be encoded using utf-8.
    */
   bool gen_utf8strings_;
@@ -1027,6 +1055,9 @@
       "from zope.interface import Interface, implements" << endl <<
       "from twisted.internet import defer" << endl <<
       "from thrift.transport import TTwisted" << endl;
+  } else if (gen_tornado_) {
+    f_service_ << "from tornado import gen" << endl;
+    f_service_ << "from tornado import stack_context" << endl;
   }
 
   f_service_ << endl;
@@ -1098,7 +1129,7 @@
   } else {
     if (gen_twisted_) {
       extends_if = "(Interface)";
-    } else if (gen_newstyle_ || gen_dynamic_) {
+    } else if (gen_newstyle_ || gen_dynamic_ || gen_tornado_) {
       extends_if = "(object)";
     }
   }
@@ -1115,7 +1146,7 @@
     vector<t_function*>::iterator f_iter;
     for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
       f_service_ <<
-        indent() << "def " << function_signature_if(*f_iter) << ":" << endl;
+        indent() << "def " << function_signature(*f_iter, true, OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) << ":" << endl;
       indent_up();
       generate_python_docstring(f_service_, (*f_iter));
       f_service_ <<
@@ -1165,6 +1196,9 @@
   if (gen_twisted_) {
     f_service_ <<
       indent() << "def __init__(self, transport, oprot_factory):" << endl;
+  } else if (gen_tornado_) {
+    f_service_ <<
+      indent() << "def __init__(self, transport, iprot_factory, oprot_factory=None):" << endl;
   } else {
     f_service_ <<
       indent() << "def __init__(self, iprot, oprot=None):" << endl;
@@ -1177,6 +1211,15 @@
         indent() << "  self._seqid = 0" << endl <<
         indent() << "  self._reqs = {}" << endl <<
         endl;
+    } else if (gen_tornado_) {
+      f_service_ <<
+        indent() << "  self._transport = transport" << endl <<
+        indent() << "  self._iprot_factory = iprot_factory" << endl <<
+        indent() << "  self._oprot_factory = (oprot_factory if oprot_factory is not None" << endl <<
+        indent() << "                         else iprot_factory)" << endl <<
+        indent() << "  self._seqid = 0" << endl <<
+        indent() << "  self._reqs = {}" << endl <<
+        endl;
     } else {
       f_service_ <<
         indent() << "  self._iprot = self._oprot = iprot" << endl <<
@@ -1190,6 +1233,10 @@
       f_service_ <<
         indent() << "  " << extends << ".Client.__init__(self, transport, oprot_factory)" << endl <<
         endl;
+    } else if (gen_tornado_) {
+      f_service_ <<
+        indent() << "  " << extends << ".Client.__init__(self, transport, iprot_factory, oprot_factory)" << endl <<
+        endl;
     } else {
       f_service_ <<
         indent() << "  " << extends << ".Client.__init__(self, iprot, oprot)" << endl <<
@@ -1197,6 +1244,24 @@
     }
   }
 
+  if (gen_tornado_ && extends.empty()) {
+    f_service_ <<
+      indent() << "@gen.engine" << endl <<
+      indent() << "def recv_dispatch(self):" << endl <<
+      indent() << "  \"\"\"read a response from the wire. schedule exactly one per send that" << endl <<
+      indent() << "  expects a response, but it doesn't matter which callee gets which" << endl <<
+      indent() << "  response; they're dispatched here properly\"\"\"" << endl <<
+      endl <<
+      indent() << "  # wait for a frame header" << endl <<
+      indent() << "  frame = yield gen.Task(self._transport.readFrame)" << endl <<
+      indent() << "  tr = TTransport.TMemoryBuffer(frame)" << endl <<
+      indent() << "  iprot = self._iprot_factory.getProtocol(tr)" << endl <<
+      indent() << "  (fname, mtype, rseqid) = iprot.readMessageBegin()" << endl <<
+      indent() << "  method = getattr(self, 'recv_' + fname)" << endl <<
+      indent() << "  method(iprot, mtype, rseqid)" << endl <<
+      endl;
+  }
+
   // Generate client method implementations
   vector<t_function*> functions = tservice->get_functions();
   vector<t_function*>::const_iterator f_iter;
@@ -1208,7 +1273,7 @@
 
     // Open function
     indent(f_service_) <<
-      "def " << function_signature(*f_iter) << ":" << endl;
+      "def " << function_signature(*f_iter, false, OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) << ":" << endl;
     indent_up();
     generate_python_docstring(f_service_, (*f_iter));
     if (gen_twisted_) {
@@ -1217,6 +1282,12 @@
         indent(f_service_) <<
           "d = self._reqs[self._seqid] = defer.Deferred()" << endl;
       }
+    } else if (gen_tornado_) {
+      indent(f_service_) << "self._seqid += 1" << endl;
+      if (!(*f_iter)->is_oneway()) {
+        indent(f_service_) <<
+          "self._reqs[self._seqid] = callback" << endl;
+      }
     }
 
     indent(f_service_) <<
@@ -1231,12 +1302,24 @@
       }
       f_service_ << (*fld_iter)->get_name();
     }
+
+    if (gen_tornado_ && (*f_iter)->is_oneway()) {
+      if (first) {
+        first = false;
+      } else {
+        f_service_ << ", ";
+      }
+      f_service_ << "callback";
+    }
+
     f_service_ << ")" << endl;
 
     if (!(*f_iter)->is_oneway()) {
       f_service_ << indent();
       if (gen_twisted_) {
         f_service_ << "return d" << endl;
+      } else if (gen_tornado_) {
+        f_service_ << "self.recv_dispatch()" << endl;
       } else {
         if (!(*f_iter)->get_returntype()->is_void()) {
           f_service_ << "return ";
@@ -1254,14 +1337,14 @@
     f_service_ << endl;
 
     indent(f_service_) <<
-      "def send_" << function_signature(*f_iter) << ":" << endl;
+      "def send_" << function_signature(*f_iter, false, MANDATORY_FOR_ONEWAY_ELSE_NONE) << ":" << endl;
 
     indent_up();
 
     std::string argsname = (*f_iter)->get_name() + "_args";
 
     // Serialize the request header
-    if (gen_twisted_) {
+    if (gen_twisted_ || gen_tornado_) {
       f_service_ <<
         indent() << "oprot = self._oprot_factory.getProtocol(self._transport)" << endl <<
         indent() <<
@@ -1286,6 +1369,19 @@
         indent() << "args.write(oprot)" << endl <<
         indent() << "oprot.writeMessageEnd()" << endl <<
         indent() << "oprot.trans.flush()" << endl;
+    } else if (gen_tornado_) {
+      f_service_ <<
+        indent() << "args.write(oprot)" << endl <<
+        indent() << "oprot.writeMessageEnd()" << endl;
+      if ((*f_iter)->is_oneway()) {
+        // send_* carry the callback so you can block on the write's flush
+        // (rather than on receipt of the response)
+        f_service_ <<
+          indent() << "oprot.trans.flush(callback=callback)" << endl;
+      } else {
+        f_service_ <<
+          indent() << "oprot.trans.flush()" << endl;
+      }
     } else {
       f_service_ <<
         indent() << "args.write(self._oprot)" << endl <<
@@ -1300,7 +1396,7 @@
       // Open function
       f_service_ <<
         endl;
-      if (gen_twisted_) {
+      if (gen_twisted_ || gen_tornado_) {
         f_service_ <<
           indent() << "def recv_" << (*f_iter)->get_name() <<
               "(self, iprot, mtype, rseqid):" << endl;
@@ -1319,6 +1415,9 @@
       if (gen_twisted_) {
         f_service_ <<
           indent() << "d = self._reqs.pop(rseqid)" << endl;
+      } else if (gen_tornado_) {
+        f_service_ <<
+          indent() << "callback = self._reqs.pop(rseqid)" << endl;
       } else {
         f_service_ <<
           indent() << "(fname, mtype, rseqid) = self._iprot.readMessageBegin()" << endl;
@@ -1336,6 +1435,15 @@
           indent() << "result = " << resultname << "()" << endl <<
           indent() << "result.read(iprot)" << endl <<
           indent() << "iprot.readMessageEnd()" << endl;
+      } else if (gen_tornado_) {
+        f_service_ <<
+          indent() << "  x.read(iprot)" << endl <<
+          indent() << "  iprot.readMessageEnd()" << endl <<
+          indent() << "  callback(x)" << endl <<
+          indent() << "  return" << endl <<
+          indent() << "result = " << resultname << "()" << endl <<
+          indent() << "result.read(iprot)" << endl <<
+          indent() << "iprot.readMessageEnd()" << endl;
       } else {
         f_service_ <<
           indent() << "  x.read(self._iprot)" << endl <<
@@ -1353,6 +1461,10 @@
           if (gen_twisted_) {
             f_service_ <<
               indent() << "  return d.callback(result.success)" << endl;
+          } else if (gen_tornado_) {
+            f_service_ <<
+              indent() << "  callback(result.success)" << endl <<
+              indent() << "  return" << endl;
           } else {
             f_service_ <<
               indent() << "  return result.success" << endl;
@@ -1369,6 +1481,10 @@
             f_service_ <<
               indent() << "  return d.errback(result." << (*x_iter)->get_name() << ")" << endl;
 
+          } else if (gen_tornado_) {
+            f_service_ <<
+              indent() << "  callback(result." << (*x_iter)->get_name() << ")" << endl <<
+              indent() << "  return" << endl;
           } else {
             f_service_ <<
               indent() << "  raise result." << (*x_iter)->get_name() << "" << endl;
@@ -1378,16 +1494,24 @@
       // Careful, only return _result if not a void function
       if ((*f_iter)->get_returntype()->is_void()) {
         if (gen_twisted_) {
-          indent(f_service_) <<
-            "return d.callback(None)" << endl;
+          f_service_ <<
+            indent() << "return d.callback(None)" << endl;
+        } else if (gen_tornado_) {
+          f_service_ <<
+            indent() << "callback(None)" << endl <<
+            indent() << "return" << endl;
         } else {
-          indent(f_service_) <<
-            "return" << endl;
+          f_service_ <<
+            indent() << "return" << endl;
         }
       } else {
         if (gen_twisted_) {
           f_service_ <<
             indent() << "return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\"))" << endl;
+        } else if (gen_tornado_) {
+          f_service_ <<
+            indent() << "callback(TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\"))" << endl <<
+            indent() << "return" << endl;
         } else {
           f_service_ <<
             indent() << "raise TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
@@ -1645,9 +1769,22 @@
   f_service_ << endl;
 
   // Generate the server implementation
-  indent(f_service_) <<
-    "def process(self, iprot, oprot):" << endl;
-  indent_up();
+  if (gen_tornado_) {
+    f_service_ <<
+      indent() << "@gen.engine" << endl <<
+      indent() << "def process(self, transport, iprot_factory, oprot, callback):" << endl;
+    indent_up();
+    f_service_ <<
+      indent() << "# wait for a frame header" << endl <<
+      indent() << "frame = yield gen.Task(transport.readFrame)" << endl <<
+      indent() << "tr = TTransport.TMemoryBuffer(frame)" << endl <<
+      indent() << "iprot = iprot_factory.getProtocol(tr)" << endl <<
+      endl;
+  } else {
+    f_service_ <<
+      indent() << "def process(self, iprot, oprot):" << endl;
+    indent_up();
+  }
 
   f_service_ <<
     indent() << "(name, type, seqid) = iprot.readMessageBegin()" << endl;
@@ -1668,6 +1805,8 @@
   if (gen_twisted_) {
     f_service_ <<
       indent() << "  return defer.succeed(None)" << endl;
+  } else if (gen_tornado_) {
+    // nothing
   } else {
     f_service_ <<
       indent() << "  return" << endl;
@@ -1679,6 +1818,10 @@
   if (gen_twisted_) {
     f_service_ <<
       indent() << "  return self._processMap[name](self, seqid, iprot, oprot)" << endl;
+  } else if (gen_tornado_) {
+    f_service_ <<
+      indent() << "  yield gen.Task(self._processMap[name], self, seqid, iprot, oprot)" << endl <<
+      indent() << "callback()" << endl;
   } else {
     f_service_ <<
       indent() << "  self._processMap[name](self, seqid, iprot, oprot)" << endl;
@@ -1709,9 +1852,17 @@
                                                t_function* tfunction) {
   (void) tservice;
   // Open function
-  indent(f_service_) <<
-    "def process_" << tfunction->get_name() <<
-    "(self, seqid, iprot, oprot):" << endl;
+  if (gen_tornado_) {
+    f_service_ <<
+      indent() << "@gen.engine" << endl <<
+      indent() << "def process_" << tfunction->get_name() <<
+                  "(self, seqid, iprot, oprot, callback):" << endl;
+  } else {
+    f_service_ <<
+      indent() << "def process_" << tfunction->get_name() <<
+                  "(self, seqid, iprot, oprot):" << endl;
+  }
+
   indent_up();
 
   string argsname = tfunction->get_name() + "_args";
@@ -1827,8 +1978,81 @@
       indent_down();
       f_service_ << endl;
     }
-  } else {
 
+  } else if (gen_tornado_) {
+    if (!tfunction->is_oneway() && xceptions.size() > 0) {
+      f_service_ <<
+        endl <<
+        indent() << "def handle_exception(xtype, value, traceback):" << endl;
+
+      for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+        f_service_ <<
+          indent() << "  if xtype == " << type_name((*x_iter)->get_type()) << ":" << endl;
+        if (!tfunction->is_oneway()) {
+          f_service_ <<
+            indent() << "    result." << (*x_iter)->get_name() << " = value" << endl;
+        }
+        f_service_ <<
+          indent() << "    return True" << endl;
+      }
+
+      f_service_ <<
+        endl <<
+        indent() << "with stack_context.ExceptionStackContext(handle_exception):" << endl;
+      indent_up();
+    }
+
+    // Generate the function call
+    t_struct* arg_struct = tfunction->get_arglist();
+    const std::vector<t_field*>& fields = arg_struct->get_members();
+    vector<t_field*>::const_iterator f_iter;
+
+    f_service_ << indent();
+    if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
+      f_service_ << "result.success = ";
+    }
+    f_service_ <<
+      "yield gen.Task(self._handler." << tfunction->get_name() << ", ";
+    bool first = true;
+    for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+      if (first) {
+        first = false;
+      } else {
+        f_service_ << ", ";
+      }
+      f_service_ << "args." << (*f_iter)->get_name();
+    }
+    f_service_ << ")" << endl;
+
+    if (xceptions.size() > 0) {
+      f_service_ << endl;
+    }
+
+    if (!tfunction->is_oneway() && xceptions.size() > 0) {
+      indent_down();
+    }
+
+    // Shortcut out here for oneway functions
+    if (tfunction->is_oneway()) {
+      f_service_ <<
+        indent() << "callback()" << endl;
+      indent_down();
+      f_service_ << endl;
+      return;
+    }
+
+    f_service_ <<
+      indent() << "oprot.writeMessageBegin(\"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid)" << endl <<
+      indent() << "result.write(oprot)" << endl <<
+      indent() << "oprot.writeMessageEnd()" << endl <<
+      indent() << "oprot.trans.flush()" << endl <<
+      indent() << "callback()" << endl;
+
+    // Close function
+    indent_down();
+    f_service_ << endl;
+
+  } else {  // py
     // Try block for a function with exceptions
     if (xceptions.size() > 0) {
       f_service_ <<
@@ -2001,7 +2225,7 @@
   if (ttype->is_map()) {
     out <<
       indent() << prefix << " = {}" << endl <<
-      indent() << "(" << ktype << ", " << vtype << ", " << size << " ) = iprot.readMapBegin() " << endl;
+      indent() << "(" << ktype << ", " << vtype << ", " << size << " ) = iprot.readMapBegin()" << endl;
   } else if (ttype->is_set()) {
     out <<
       indent() << prefix << " = set()" << endl <<
@@ -2382,46 +2606,56 @@
  * @return String of rendered function definition
  */
 string t_py_generator::function_signature(t_function* tfunction,
-                                          string prefix) {
-  string argument_list_result = argument_list(tfunction->get_arglist());
-  if (!argument_list_result.empty()) {
-    argument_list_result = "self, " + argument_list_result;
-  } else {
-    argument_list_result = "self";
+                                          bool interface,
+                                          tornado_callback_t callback) {
+  vector<string> pre;
+  vector<string> post;
+  string signature = tfunction->get_name() + "(";
+
+  if (!(gen_twisted_ && interface)) {
+    pre.push_back("self");
   }
 
-  return prefix + tfunction->get_name() + "(" + argument_list_result + ")";
-}
-
-/**
- * Renders an interface function signature of the form 'type name(args)'
- *
- * @param tfunction Function definition
- * @return String of rendered function definition
- */
-string t_py_generator::function_signature_if(t_function* tfunction,
-                                             string prefix) {
-  string argument_list_result = argument_list(tfunction->get_arglist());
-  if (!gen_twisted_) {
-    if (!argument_list_result.empty()) {
-      argument_list_result = "self, " + argument_list_result;
-    } else {
-      argument_list_result = "self";
+  if (gen_tornado_) {
+    if (callback == NONE) {
+    } else if (callback == MANDATORY_FOR_ONEWAY_ELSE_NONE) {
+      if (tfunction->is_oneway()) {
+        // Tornado send_* carry the callback so you can block on the write's flush
+        // (rather than on receipt of the response)
+        post.push_back("callback");
+      }
+    } else if (callback == OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) {
+      if (tfunction->is_oneway()) {
+        post.push_back("callback=None");
+      } else {
+        post.push_back("callback");
+      }
     }
   }
-  return prefix + tfunction->get_name() + "(" + argument_list_result + ")";
+  signature += argument_list(tfunction->get_arglist(), &pre, &post) + ")";
+  return signature;
 }
 
-
 /**
  * Renders a field list
  */
-string t_py_generator::argument_list(t_struct* tstruct) {
+string t_py_generator::argument_list(t_struct* tstruct, vector<string> *pre, vector<string> *post) {
   string result = "";
 
   const vector<t_field*>& fields = tstruct->get_members();
   vector<t_field*>::const_iterator f_iter;
+  vector<string>::const_iterator s_iter;
   bool first = true;
+  if (pre) {
+    for (s_iter = pre->begin(); s_iter != pre->end(); ++s_iter) {
+      if (first) {
+        first = false;
+      } else {
+        result += ", ";
+      }
+      result += *s_iter;
+    }
+  }
   for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
     if (first) {
       first = false;
@@ -2430,6 +2664,16 @@
     }
     result += (*f_iter)->get_name();
   }
+  if (post) {
+    for (s_iter = post->begin(); s_iter != post->end(); ++s_iter) {
+      if (first) {
+        first = false;
+      } else {
+        result += ", ";
+      }
+      result += *s_iter;
+    }
+  }
   return result;
 }
 
@@ -2523,6 +2767,7 @@
 THRIFT_REGISTER_GENERATOR(py, "Python",
 "    new_style:       Generate new-style classes.\n" \
 "    twisted:         Generate Twisted-friendly RPC services.\n" \
+"    tornado:         Generate code for use with Tornado.\n" \
 "    utf8strings:     Encode/decode strings using utf8 in the generated code.\n" \
 "    slots:           Generate code using slots for instance members.\n" \
 "    dynamic:         Generate dynamic code, less code generated but slower.\n" \
diff --git a/configure.ac b/configure.ac
index 123c52b..54979ef 100755
--- a/configure.ac
+++ b/configure.ac
@@ -125,7 +125,7 @@
 
   AX_LIB_ZLIB([1.2.3])
   have_zlib=$success
-  
+
   AX_THRIFT_LIB(qt4, [Qt], yes)
   have_qt=no
   if test "$with_qt4" = "yes";  then
@@ -610,6 +610,7 @@
   test/perl/Makefile
   test/py/Makefile
   test/py.twisted/Makefile
+  test/py.tornado/Makefile
   test/rb/Makefile
   tutorial/Makefile
   tutorial/cpp/Makefile
@@ -617,6 +618,7 @@
   tutorial/js/Makefile
   tutorial/py/Makefile
   tutorial/py.twisted/Makefile
+  tutorial/py.tornado/Makefile
 ])
 
 AC_OUTPUT
diff --git a/lib/py/src/TTornado.py b/lib/py/src/TTornado.py
new file mode 100644
index 0000000..af309c3
--- /dev/null
+++ b/lib/py/src/TTornado.py
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from cStringIO import StringIO
+import logging
+import socket
+import struct
+
+from thrift.transport import TTransport
+from thrift.transport.TTransport import TTransportException
+
+from tornado import gen
+from tornado import iostream
+from tornado import netutil
+
+
+class TTornadoStreamTransport(TTransport.TTransportBase):
+    """a framed, buffered transport over a Tornado stream"""
+    def __init__(self, host, port, stream=None):
+        self.host = host
+        self.port = port
+        self.is_queuing_reads = False
+        self.read_queue = []
+        self.__wbuf = StringIO()
+
+        # servers provide a ready-to-go stream
+        self.stream = stream
+        if self.stream is not None:
+            self._set_close_callback()
+
+    # not the same number of parameters as TTransportBase.open
+    def open(self, callback):
+        logging.debug('socket connecting')
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+        self.stream = iostream.IOStream(sock)
+
+        def on_close_in_connect(*_):
+            message = 'could not connect to {}:{}'.format(self.host, self.port)
+            raise TTransportException(
+                type=TTransportException.NOT_OPEN,
+                message=message)
+        self.stream.set_close_callback(on_close_in_connect)
+
+        def finish(*_):
+            self._set_close_callback()
+            callback()
+
+        self.stream.connect((self.host, self.port), callback=finish)
+
+    def _set_close_callback(self):
+        def on_close():
+            raise TTransportException(
+                type=TTransportException.END_OF_FILE,
+                message='socket closed')
+        self.stream.set_close_callback(self.close)
+
+    def close(self):
+        # don't raise if we intend to close
+        self.stream.set_close_callback(None)
+        self.stream.close()
+
+    def read(self, _):
+        # The generated code for Tornado shouldn't do individual reads -- only
+        # frames at a time
+        assert "you're doing it wrong" is True
+
+    @gen.engine
+    def readFrame(self, callback):
+        self.read_queue.append(callback)
+        logging.debug('read queue: %s', self.read_queue)
+
+        if self.is_queuing_reads:
+            # If a read is already in flight, then the while loop below should
+            # pull it from self.read_queue
+            return
+
+        self.is_queuing_reads = True
+        while self.read_queue:
+            next_callback = self.read_queue.pop()
+            result = yield gen.Task(self._readFrameFromStream)
+            next_callback(result)
+        self.is_queuing_reads = False
+
+    @gen.engine
+    def _readFrameFromStream(self, callback):
+        logging.debug('_readFrameFromStream')
+        frame_header = yield gen.Task(self.stream.read_bytes, 4)
+        frame_length, = struct.unpack('!i', frame_header)
+        logging.debug('received frame header, frame length = %i', frame_length)
+        frame = yield gen.Task(self.stream.read_bytes, frame_length)
+        logging.debug('received frame payload')
+        callback(frame)
+
+    def write(self, buf):
+        self.__wbuf.write(buf)
+
+    def flush(self, callback=None):
+        wout = self.__wbuf.getvalue()
+        wsz = len(wout)
+        # reset wbuf before write/flush to preserve state on underlying failure
+        self.__wbuf = StringIO()
+        # N.B.: Doing this string concatenation is WAY cheaper than making
+        # two separate calls to the underlying socket object. Socket writes in
+        # Python turn out to be REALLY expensive, but it seems to do a pretty
+        # good job of managing string buffer operations without excessive copies
+        buf = struct.pack("!i", wsz) + wout
+
+        logging.debug('writing frame length = %i', wsz)
+        self.stream.write(buf, callback)
+
+
+class TTornadoServer(netutil.TCPServer):
+    def __init__(self, processor, iprot_factory, oprot_factory=None,
+                 *args, **kwargs):
+        super(TTornadoServer, self).__init__(*args, **kwargs)
+
+        self._processor = processor
+        self._iprot_factory = iprot_factory
+        self._oprot_factory = (oprot_factory if oprot_factory is not None
+                               else iprot_factory)
+
+    def handle_stream(self, stream, address):
+        try:
+            host, port = address
+            trans = TTornadoStreamTransport(host=host, port=port, stream=stream)
+            oprot = self._oprot_factory.getProtocol(trans)
+
+            def next_pass():
+                if not trans.stream.closed():
+                    self._processor.process(trans, self._iprot_factory, oprot,
+                                            callback=next_pass)
+
+            next_pass()
+
+        except Exception:
+            logging.exception('thrift exception in handle_stream')
+            trans.close()
diff --git a/test/Makefile.am b/test/Makefile.am
index aaa497f..7ebe51c 100755
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,6 +34,7 @@
 if WITH_PYTHON
 SUBDIRS += py
 SUBDIRS += py.twisted
+SUBDIRS += py.tornado
 endif
 
 if WITH_RUBY
@@ -61,6 +62,7 @@
 	php \
 	py \
 	py.twisted \
+	py.tornado \
 	rb \
 	threads \
 	AnnotationTest.thrift \
diff --git a/test/py.tornado/Makefile.am b/test/py.tornado/Makefile.am
new file mode 100644
index 0000000..a8e680a
--- /dev/null
+++ b/test/py.tornado/Makefile.am
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+THRIFT = $(top_srcdir)/compiler/cpp/thrift
+
+thrift_gen: ../ThriftTest.thrift ../SmallTest.thrift
+	$(THRIFT) --gen py:tornado ../ThriftTest.thrift
+	$(THRIFT) --gen py:tornado ../SmallTest.thrift
+
+check: thrift_gen
+	./test_suite.py
+
+clean-local:
+	$(RM) -r gen-py.tornado
diff --git a/test/py.tornado/test_suite.py b/test/py.tornado/test_suite.py
new file mode 100755
index 0000000..f04ba04
--- /dev/null
+++ b/test/py.tornado/test_suite.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import datetime
+import glob
+import sys
+import time
+import unittest
+
+sys.path.insert(0, './gen-py.tornado')
+sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])
+
+try:
+    __import__('tornado')
+except ImportError:
+    print "module `tornado` not found, skipping test"
+    sys.exit(0)
+
+from tornado import gen, ioloop, stack_context
+from tornado.testing import AsyncTestCase, get_unused_port
+
+from thrift import TTornado
+from thrift.protocol import TBinaryProtocol
+
+from ThriftTest import ThriftTest
+from ThriftTest.ttypes import *
+
+
+class TestHandler(object):
+    def __init__(self, test_instance):
+        self.test_instance = test_instance
+
+    def testVoid(self, callback):
+        callback()
+
+    def testString(self, s, callback):
+        callback(s)
+
+    def testByte(self, b, callback):
+        callback(b)
+
+    def testI16(self, i16, callback):
+        callback(i16)
+
+    def testI32(self, i32, callback):
+        callback(i32)
+
+    def testI64(self, i64, callback):
+        callback(i64)
+
+    def testDouble(self, dub, callback):
+        callback(dub)
+
+    def testStruct(self, thing, callback):
+        callback(thing)
+
+    def testException(self, s, callback):
+        if s == 'Xception':
+            x = Xception()
+            x.errorCode = 1001
+            x.message = s
+            raise x
+        elif s == 'throw_undeclared':
+            raise ValueError("foo")
+        callback()
+
+    def testOneway(self, seconds, callback=None):
+        start = time.time()
+        def fire_oneway():
+            end = time.time()
+            self.test_instance.stop((start, end, seconds))
+
+        ioloop.IOLoop.instance().add_timeout(
+            datetime.timedelta(seconds=seconds),
+            fire_oneway)
+
+        if callback:
+            callback()
+
+    def testNest(self, thing, callback):
+        callback(thing)
+
+    def testMap(self, thing, callback):
+        callback(thing)
+
+    def testSet(self, thing, callback):
+        callback(thing)
+
+    def testList(self, thing, callback):
+        callback(thing)
+
+    def testEnum(self, thing, callback):
+        callback(thing)
+
+    def testTypedef(self, thing, callback):
+        callback(thing)
+
+
+class ThriftTestCase(AsyncTestCase):
+    def get_new_ioloop(self):
+        return ioloop.IOLoop.instance()
+
+    def setUp(self):
+        self.port = get_unused_port()
+        self.io_loop = self.get_new_ioloop()
+
+        # server
+        self.handler = TestHandler(self)
+        self.processor = ThriftTest.Processor(self.handler)
+        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+
+        self.server = TTornado.TTornadoServer(self.processor, self.pfactory)
+        self.server.bind(self.port)
+        self.server.start(1)
+
+        # client
+        transport = TTornado.TTornadoStreamTransport('localhost', self.port)
+        pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+        self.client = ThriftTest.Client(transport, pfactory)
+        transport.open(callback=self.stop)
+        self.wait(timeout=1)
+
+    def test_void(self):
+        self.client.testVoid(callback=self.stop)
+        v = self.wait(timeout=1)
+        self.assertEquals(v, None)
+
+    def test_string(self):
+        self.client.testString('Python', callback=self.stop)
+        v = self.wait(timeout=1)
+        self.assertEquals(v, 'Python')
+
+    def test_byte(self):
+        self.client.testByte(63, callback=self.stop)
+        v = self.wait(timeout=1)
+        self.assertEquals(v, 63)
+
+    def test_i32(self):
+        self.client.testI32(-1, callback=self.stop)
+        v = self.wait(timeout=1)
+        self.assertEquals(v, -1)
+
+        self.client.testI32(0, callback=self.stop)
+        v = self.wait(timeout=1)
+        self.assertEquals(v, 0)
+
+    def test_i64(self):
+        self.client.testI64(-34359738368, callback=self.stop)
+        v = self.wait(timeout=1)
+        self.assertEquals(v, -34359738368)
+
+    def test_double(self):
+        self.client.testDouble(-5.235098235, callback=self.stop)
+        v = self.wait(timeout=1)
+        self.assertEquals(v, -5.235098235)
+
+    def test_struct(self):
+        x = Xtruct()
+        x.string_thing = "Zero"
+        x.byte_thing = 1
+        x.i32_thing = -3
+        x.i64_thing = -5
+        self.client.testStruct(x, callback=self.stop)
+
+        y = self.wait(timeout=1)
+        self.assertEquals(y.string_thing, "Zero")
+        self.assertEquals(y.byte_thing, 1)
+        self.assertEquals(y.i32_thing, -3)
+        self.assertEquals(y.i64_thing, -5)
+
+    def test_exception(self):
+        self.client.testException('Safe', callback=self.stop)
+        v = self.wait(timeout=1)
+
+        self.client.testException('Xception', callback=self.stop)
+        ex = self.wait(timeout=1)
+        if type(ex) == Xception:
+            self.assertEquals(ex.errorCode, 1001)
+            self.assertEquals(ex.message, 'Xception')
+        else:
+            self.fail("should have gotten exception")
+
+    def test_oneway(self):
+        def return_from_send():
+            self.stop('done with send')
+        self.client.testOneway(0.5, callback=return_from_send)
+        self.assertEquals(self.wait(timeout=1), 'done with send')
+
+        start, end, seconds = self.wait(timeout=1)
+        self.assertAlmostEquals(seconds, (end - start), places=3)
+
+
+def suite():
+    suite = unittest.TestSuite()
+    loader = unittest.TestLoader()
+    suite.addTest(loader.loadTestsFromTestCase(ThriftTestCase))
+    return suite
+
+
+if __name__ == '__main__':
+    unittest.TestProgram(defaultTest='suite',
+                         testRunner=unittest.TextTestRunner(verbosity=1))
diff --git a/tutorial/Makefile.am b/tutorial/Makefile.am
index 169a2c1..86c08c0 100755
--- a/tutorial/Makefile.am
+++ b/tutorial/Makefile.am
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-SUBDIRS = 
+SUBDIRS =
 
 if MINGW
 # do nothing, just build the compiler
@@ -43,6 +43,7 @@
 if WITH_PYTHON
 SUBDIRS += py
 SUBDIRS += py.twisted
+SUBDIRS += py.tornado
 endif
 
 if WITH_RUBY
diff --git a/tutorial/py.tornado/Makefile.am b/tutorial/py.tornado/Makefile.am
new file mode 100755
index 0000000..6ac6023
--- /dev/null
+++ b/tutorial/py.tornado/Makefile.am
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+THRIFT = $(top_builddir)/compiler/cpp/thrift
+
+gen-py.tornado/tutorial/Calculator.py gen-py.tornado/shared/SharedService.py: $(top_srcdir)/tutorial/tutorial.thrift
+	$(THRIFT) --gen py:tornado -r $<
+
+all-local: gen-py.tornado/tutorial/Calculator.py
+
+tutorialserver: all
+	${PYTHON} PythonServer.py
+
+tutorialclient: all
+	${PYTHON} PythonClient.py
+
+clean-local:
+	$(RM) -r gen-*
+
+EXTRA_DIST = \
+	PythonServer.py \
+	PythonClient.py
diff --git a/tutorial/py.tornado/PythonClient.py b/tutorial/py.tornado/PythonClient.py
new file mode 100755
index 0000000..95d78b8
--- /dev/null
+++ b/tutorial/py.tornado/PythonClient.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import glob
+sys.path.append('gen-py.tornado')
+sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])
+
+import logging
+
+from tutorial import Calculator
+from tutorial.ttypes import Operation, Work, InvalidOperation
+
+from thrift import TTornado
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+
+from tornado import gen
+from tornado import ioloop
+
+
+@gen.engine
+def communicate(callback=None):
+    # create client
+    transport = TTornado.TTornadoStreamTransport('localhost', 9090)
+    pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+    client = Calculator.Client(transport, pfactory)
+
+    # open the transport, bail on error
+    try:
+        yield gen.Task(transport.open)
+    except TTransport.TTransportException as ex:
+        logging.error(ex)
+        if callback:
+            callback()
+        return
+
+    # ping
+    yield gen.Task(client.ping)
+    print "ping()"
+
+    # add
+    sum_ = yield gen.Task(client.add, 1, 1)
+    print "1 + 1 = {}".format(sum_)
+
+    # make a oneway call without a callback (schedule the write and continue
+    # without blocking)
+    client.zip()
+    print "zip() without callback"
+
+    # make a oneway call with a callback (we'll wait for the stream write to
+    # complete before continuing)
+    yield gen.Task(client.zip)
+    print "zip() with callback"
+
+    # calculate 1/0
+    work = Work()
+    work.op = Operation.DIVIDE
+    work.num1 = 1
+    work.num2 = 0
+
+    try:
+        quotient = yield gen.Task(client.calculate, 1, work)
+        print "Whoa? You know how to divide by zero?"
+    except InvalidOperation as io:
+        print "InvalidOperation: {}".format(io)
+
+    # calculate 15-10
+    work.op = Operation.SUBTRACT
+    work.num1 = 15
+    work.num2 = 10
+
+    diff = yield gen.Task(client.calculate, 1, work)
+    print "15 - 10 = {}".format(diff)
+
+    # getStruct
+    log = yield gen.Task(client.getStruct, 1)
+    print "Check log: {}".format(log.value)
+
+    # close the transport
+    client._transport.close()
+
+    if callback:
+        callback()
+
+
+def main():
+    # create an ioloop, do the above, then stop
+    io_loop = ioloop.IOLoop.instance()
+    def this_joint():
+        communicate(callback=io_loop.stop)
+    io_loop.add_callback(this_joint)
+    io_loop.start()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tutorial/py.tornado/PythonServer.py b/tutorial/py.tornado/PythonServer.py
new file mode 100755
index 0000000..52932ff
--- /dev/null
+++ b/tutorial/py.tornado/PythonServer.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import glob
+sys.path.append('gen-py.tornado')
+sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])
+
+from tutorial import Calculator
+from tutorial.ttypes import Operation, InvalidOperation
+
+from shared.ttypes import SharedStruct
+
+from thrift import TTornado
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+from tornado import ioloop
+
+
+class CalculatorHandler(object):
+    def __init__(self):
+        self.log = {}
+
+    def ping(self, callback):
+        print "ping()"
+        callback()
+
+    def add(self, n1, n2, callback):
+        print "add({}, {})".format(n1, n2)
+        callback(n1 + n2)
+
+    def calculate(self, logid, work, callback):
+        print "calculate({}, {})".format(logid, work)
+
+        if work.op == Operation.ADD:
+            val = work.num1 + work.num2
+        elif work.op == Operation.SUBTRACT:
+            val = work.num1 - work.num2
+        elif work.op == Operation.MULTIPLY:
+            val = work.num1 * work.num2
+        elif work.op == Operation.DIVIDE:
+            if work.num2 == 0:
+                x = InvalidOperation()
+                x.what = work.op
+                x.why = "Cannot divide by 0"
+                raise x
+            val = work.num1 / work.num2
+        else:
+            x = InvalidOperation()
+            x.what = work.op
+            x.why = "Invalid operation"
+            raise x
+
+        log = SharedStruct()
+        log.key = logid
+        log.value = '%d' % (val)
+        self.log[logid] = log
+        callback(val)
+
+    def getStruct(self, key, callback):
+        print "getStruct({})".format(key)
+        callback(self.log[key])
+
+    def zip(self, callback):
+        print "zip()"
+        callback()
+
+
+def main():
+    handler = CalculatorHandler()
+    processor = Calculator.Processor(handler)
+    pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+    server = TTornado.TTornadoServer(processor, pfactory)
+
+    print "Starting the server..."
+    server.bind(9090)
+    server.start(1)
+    ioloop.IOLoop.instance().start()
+    print "done."
+
+
+if __name__ == "__main__":
+    main()