THRIFT-1704: Tornado support (Python)
diff --git a/.gitignore b/.gitignore
index eb2ae4d..000a95c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -241,6 +241,9 @@
/test/py.twisted/Makefile.in
/test/py.twisted/_trial_temp/
/test/py.twisted/test_suite.pyc
+/test/py.tornado/Makefile
+/test/py.tornado/Makefile.in
+/test/py.tornado/*.pyc
/test/rb/Makefile
/test/rb/Makefile.in
/tutorial/Makefile
@@ -257,6 +260,8 @@
/tutorial/js/build/
/tutorial/py.twisted/Makefile
/tutorial/py.twisted/Makefile.in
+/tutorial/py.tornado/Makefile
+/tutorial/py.tornado/Makefile.in
/tutorial/py/Makefile
/tutorial/py/Makefile.in
/ylwrap
diff --git a/compiler/cpp/src/generate/t_py_generator.cc b/compiler/cpp/src/generate/t_py_generator.cc
index 6085670..fd954c5 100644
--- a/compiler/cpp/src/generate/t_py_generator.cc
+++ b/compiler/cpp/src/generate/t_py_generator.cc
@@ -91,13 +91,22 @@
iter = parsed_options.find("twisted");
gen_twisted_ = (iter != parsed_options.end());
+ iter = parsed_options.find("tornado");
+ gen_tornado_ = (iter != parsed_options.end());
+
+ if (gen_twisted_ && gen_tornado_) {
+ throw "at most one of 'twisted' and 'tornado' are allowed";
+ }
+
iter = parsed_options.find("utf8strings");
gen_utf8strings_ = (iter != parsed_options.end());
copy_options_ = option_string;
- if (gen_twisted_){
+ if (gen_twisted_) {
out_dir_base_ = "gen-py.twisted";
+ } else if (gen_tornado_) {
+ out_dir_base_ = "gen-py.tornado";
} else {
out_dir_base_ = "gen-py";
}
@@ -214,6 +223,17 @@
t_doc* tdoc);
/**
+ * a type for specifying to function_signature what type of Tornado callback
+ * parameter to add
+ */
+
+ enum tornado_callback_t {
+ NONE = 0,
+ MANDATORY_FOR_ONEWAY_ELSE_NONE = 1,
+ OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY = 2,
+ };
+
+ /**
* Helper rendering functions
*/
@@ -224,9 +244,12 @@
std::string declare_argument(t_field* tfield);
std::string render_field_default_value(t_field* tfield);
std::string type_name(t_type* ttype);
- std::string function_signature(t_function* tfunction, std::string prefix="");
- std::string function_signature_if(t_function* tfunction, std::string prefix="");
- std::string argument_list(t_struct* tstruct);
+ std::string function_signature(t_function* tfunction,
+ bool interface=false,
+ tornado_callback_t callback=NONE);
+ std::string argument_list(t_struct* tstruct,
+ std::vector<std::string> *pre=NULL,
+ std::vector<std::string> *post=NULL);
std::string type_to_enum(t_type* ttype);
std::string type_to_spec_args(t_type* ttype);
@@ -277,6 +300,11 @@
bool gen_twisted_;
/**
+ * True if we should generate code for use with Tornado
+ */
+ bool gen_tornado_;
+
+ /**
* True if strings should be encoded using utf-8.
*/
bool gen_utf8strings_;
@@ -1027,6 +1055,9 @@
"from zope.interface import Interface, implements" << endl <<
"from twisted.internet import defer" << endl <<
"from thrift.transport import TTwisted" << endl;
+ } else if (gen_tornado_) {
+ f_service_ << "from tornado import gen" << endl;
+ f_service_ << "from tornado import stack_context" << endl;
}
f_service_ << endl;
@@ -1098,7 +1129,7 @@
} else {
if (gen_twisted_) {
extends_if = "(Interface)";
- } else if (gen_newstyle_ || gen_dynamic_) {
+ } else if (gen_newstyle_ || gen_dynamic_ || gen_tornado_) {
extends_if = "(object)";
}
}
@@ -1115,7 +1146,7 @@
vector<t_function*>::iterator f_iter;
for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
f_service_ <<
- indent() << "def " << function_signature_if(*f_iter) << ":" << endl;
+ indent() << "def " << function_signature(*f_iter, true, OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) << ":" << endl;
indent_up();
generate_python_docstring(f_service_, (*f_iter));
f_service_ <<
@@ -1165,6 +1196,9 @@
if (gen_twisted_) {
f_service_ <<
indent() << "def __init__(self, transport, oprot_factory):" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << "def __init__(self, transport, iprot_factory, oprot_factory=None):" << endl;
} else {
f_service_ <<
indent() << "def __init__(self, iprot, oprot=None):" << endl;
@@ -1177,6 +1211,15 @@
indent() << " self._seqid = 0" << endl <<
indent() << " self._reqs = {}" << endl <<
endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << " self._transport = transport" << endl <<
+ indent() << " self._iprot_factory = iprot_factory" << endl <<
+ indent() << " self._oprot_factory = (oprot_factory if oprot_factory is not None" << endl <<
+ indent() << " else iprot_factory)" << endl <<
+ indent() << " self._seqid = 0" << endl <<
+ indent() << " self._reqs = {}" << endl <<
+ endl;
} else {
f_service_ <<
indent() << " self._iprot = self._oprot = iprot" << endl <<
@@ -1190,6 +1233,10 @@
f_service_ <<
indent() << " " << extends << ".Client.__init__(self, transport, oprot_factory)" << endl <<
endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << " " << extends << ".Client.__init__(self, transport, iprot_factory, oprot_factory)" << endl <<
+ endl;
} else {
f_service_ <<
indent() << " " << extends << ".Client.__init__(self, iprot, oprot)" << endl <<
@@ -1197,6 +1244,24 @@
}
}
+ if (gen_tornado_ && extends.empty()) {
+ f_service_ <<
+ indent() << "@gen.engine" << endl <<
+ indent() << "def recv_dispatch(self):" << endl <<
+ indent() << " \"\"\"read a response from the wire. schedule exactly one per send that" << endl <<
+ indent() << " expects a response, but it doesn't matter which callee gets which" << endl <<
+ indent() << " response; they're dispatched here properly\"\"\"" << endl <<
+ endl <<
+ indent() << " # wait for a frame header" << endl <<
+ indent() << " frame = yield gen.Task(self._transport.readFrame)" << endl <<
+ indent() << " tr = TTransport.TMemoryBuffer(frame)" << endl <<
+ indent() << " iprot = self._iprot_factory.getProtocol(tr)" << endl <<
+ indent() << " (fname, mtype, rseqid) = iprot.readMessageBegin()" << endl <<
+ indent() << " method = getattr(self, 'recv_' + fname)" << endl <<
+ indent() << " method(iprot, mtype, rseqid)" << endl <<
+ endl;
+ }
+
// Generate client method implementations
vector<t_function*> functions = tservice->get_functions();
vector<t_function*>::const_iterator f_iter;
@@ -1208,7 +1273,7 @@
// Open function
indent(f_service_) <<
- "def " << function_signature(*f_iter) << ":" << endl;
+ "def " << function_signature(*f_iter, false, OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) << ":" << endl;
indent_up();
generate_python_docstring(f_service_, (*f_iter));
if (gen_twisted_) {
@@ -1217,6 +1282,12 @@
indent(f_service_) <<
"d = self._reqs[self._seqid] = defer.Deferred()" << endl;
}
+ } else if (gen_tornado_) {
+ indent(f_service_) << "self._seqid += 1" << endl;
+ if (!(*f_iter)->is_oneway()) {
+ indent(f_service_) <<
+ "self._reqs[self._seqid] = callback" << endl;
+ }
}
indent(f_service_) <<
@@ -1231,12 +1302,24 @@
}
f_service_ << (*fld_iter)->get_name();
}
+
+ if (gen_tornado_ && (*f_iter)->is_oneway()) {
+ if (first) {
+ first = false;
+ } else {
+ f_service_ << ", ";
+ }
+ f_service_ << "callback";
+ }
+
f_service_ << ")" << endl;
if (!(*f_iter)->is_oneway()) {
f_service_ << indent();
if (gen_twisted_) {
f_service_ << "return d" << endl;
+ } else if (gen_tornado_) {
+ f_service_ << "self.recv_dispatch()" << endl;
} else {
if (!(*f_iter)->get_returntype()->is_void()) {
f_service_ << "return ";
@@ -1254,14 +1337,14 @@
f_service_ << endl;
indent(f_service_) <<
- "def send_" << function_signature(*f_iter) << ":" << endl;
+ "def send_" << function_signature(*f_iter, false, MANDATORY_FOR_ONEWAY_ELSE_NONE) << ":" << endl;
indent_up();
std::string argsname = (*f_iter)->get_name() + "_args";
// Serialize the request header
- if (gen_twisted_) {
+ if (gen_twisted_ || gen_tornado_) {
f_service_ <<
indent() << "oprot = self._oprot_factory.getProtocol(self._transport)" << endl <<
indent() <<
@@ -1286,6 +1369,19 @@
indent() << "args.write(oprot)" << endl <<
indent() << "oprot.writeMessageEnd()" << endl <<
indent() << "oprot.trans.flush()" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << "args.write(oprot)" << endl <<
+ indent() << "oprot.writeMessageEnd()" << endl;
+ if ((*f_iter)->is_oneway()) {
+ // send_* carry the callback so you can block on the write's flush
+ // (rather than on receipt of the response)
+ f_service_ <<
+ indent() << "oprot.trans.flush(callback=callback)" << endl;
+ } else {
+ f_service_ <<
+ indent() << "oprot.trans.flush()" << endl;
+ }
} else {
f_service_ <<
indent() << "args.write(self._oprot)" << endl <<
@@ -1300,7 +1396,7 @@
// Open function
f_service_ <<
endl;
- if (gen_twisted_) {
+ if (gen_twisted_ || gen_tornado_) {
f_service_ <<
indent() << "def recv_" << (*f_iter)->get_name() <<
"(self, iprot, mtype, rseqid):" << endl;
@@ -1319,6 +1415,9 @@
if (gen_twisted_) {
f_service_ <<
indent() << "d = self._reqs.pop(rseqid)" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << "callback = self._reqs.pop(rseqid)" << endl;
} else {
f_service_ <<
indent() << "(fname, mtype, rseqid) = self._iprot.readMessageBegin()" << endl;
@@ -1336,6 +1435,15 @@
indent() << "result = " << resultname << "()" << endl <<
indent() << "result.read(iprot)" << endl <<
indent() << "iprot.readMessageEnd()" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << " x.read(iprot)" << endl <<
+ indent() << " iprot.readMessageEnd()" << endl <<
+ indent() << " callback(x)" << endl <<
+ indent() << " return" << endl <<
+ indent() << "result = " << resultname << "()" << endl <<
+ indent() << "result.read(iprot)" << endl <<
+ indent() << "iprot.readMessageEnd()" << endl;
} else {
f_service_ <<
indent() << " x.read(self._iprot)" << endl <<
@@ -1353,6 +1461,10 @@
if (gen_twisted_) {
f_service_ <<
indent() << " return d.callback(result.success)" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << " callback(result.success)" << endl <<
+ indent() << " return" << endl;
} else {
f_service_ <<
indent() << " return result.success" << endl;
@@ -1369,6 +1481,10 @@
f_service_ <<
indent() << " return d.errback(result." << (*x_iter)->get_name() << ")" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << " callback(result." << (*x_iter)->get_name() << ")" << endl <<
+ indent() << " return" << endl;
} else {
f_service_ <<
indent() << " raise result." << (*x_iter)->get_name() << "" << endl;
@@ -1378,16 +1494,24 @@
// Careful, only return _result if not a void function
if ((*f_iter)->get_returntype()->is_void()) {
if (gen_twisted_) {
- indent(f_service_) <<
- "return d.callback(None)" << endl;
+ f_service_ <<
+ indent() << "return d.callback(None)" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << "callback(None)" << endl <<
+ indent() << "return" << endl;
} else {
- indent(f_service_) <<
- "return" << endl;
+ f_service_ <<
+ indent() << "return" << endl;
}
} else {
if (gen_twisted_) {
f_service_ <<
indent() << "return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\"))" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << "callback(TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\"))" << endl <<
+ indent() << "return" << endl;
} else {
f_service_ <<
indent() << "raise TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl;
@@ -1645,9 +1769,22 @@
f_service_ << endl;
// Generate the server implementation
- indent(f_service_) <<
- "def process(self, iprot, oprot):" << endl;
- indent_up();
+ if (gen_tornado_) {
+ f_service_ <<
+ indent() << "@gen.engine" << endl <<
+ indent() << "def process(self, transport, iprot_factory, oprot, callback):" << endl;
+ indent_up();
+ f_service_ <<
+ indent() << "# wait for a frame header" << endl <<
+ indent() << "frame = yield gen.Task(transport.readFrame)" << endl <<
+ indent() << "tr = TTransport.TMemoryBuffer(frame)" << endl <<
+ indent() << "iprot = iprot_factory.getProtocol(tr)" << endl <<
+ endl;
+ } else {
+ f_service_ <<
+ indent() << "def process(self, iprot, oprot):" << endl;
+ indent_up();
+ }
f_service_ <<
indent() << "(name, type, seqid) = iprot.readMessageBegin()" << endl;
@@ -1668,6 +1805,8 @@
if (gen_twisted_) {
f_service_ <<
indent() << " return defer.succeed(None)" << endl;
+ } else if (gen_tornado_) {
+ // nothing
} else {
f_service_ <<
indent() << " return" << endl;
@@ -1679,6 +1818,10 @@
if (gen_twisted_) {
f_service_ <<
indent() << " return self._processMap[name](self, seqid, iprot, oprot)" << endl;
+ } else if (gen_tornado_) {
+ f_service_ <<
+ indent() << " yield gen.Task(self._processMap[name], self, seqid, iprot, oprot)" << endl <<
+ indent() << "callback()" << endl;
} else {
f_service_ <<
indent() << " self._processMap[name](self, seqid, iprot, oprot)" << endl;
@@ -1709,9 +1852,17 @@
t_function* tfunction) {
(void) tservice;
// Open function
- indent(f_service_) <<
- "def process_" << tfunction->get_name() <<
- "(self, seqid, iprot, oprot):" << endl;
+ if (gen_tornado_) {
+ f_service_ <<
+ indent() << "@gen.engine" << endl <<
+ indent() << "def process_" << tfunction->get_name() <<
+ "(self, seqid, iprot, oprot, callback):" << endl;
+ } else {
+ f_service_ <<
+ indent() << "def process_" << tfunction->get_name() <<
+ "(self, seqid, iprot, oprot):" << endl;
+ }
+
indent_up();
string argsname = tfunction->get_name() + "_args";
@@ -1827,8 +1978,81 @@
indent_down();
f_service_ << endl;
}
- } else {
+ } else if (gen_tornado_) {
+ if (!tfunction->is_oneway() && xceptions.size() > 0) {
+ f_service_ <<
+ endl <<
+ indent() << "def handle_exception(xtype, value, traceback):" << endl;
+
+ for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+ f_service_ <<
+ indent() << " if xtype == " << type_name((*x_iter)->get_type()) << ":" << endl;
+ if (!tfunction->is_oneway()) {
+ f_service_ <<
+ indent() << " result." << (*x_iter)->get_name() << " = value" << endl;
+ }
+ f_service_ <<
+ indent() << " return True" << endl;
+ }
+
+ f_service_ <<
+ endl <<
+ indent() << "with stack_context.ExceptionStackContext(handle_exception):" << endl;
+ indent_up();
+ }
+
+ // Generate the function call
+ t_struct* arg_struct = tfunction->get_arglist();
+ const std::vector<t_field*>& fields = arg_struct->get_members();
+ vector<t_field*>::const_iterator f_iter;
+
+ f_service_ << indent();
+ if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
+ f_service_ << "result.success = ";
+ }
+ f_service_ <<
+ "yield gen.Task(self._handler." << tfunction->get_name() << ", ";
+ bool first = true;
+ for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+ if (first) {
+ first = false;
+ } else {
+ f_service_ << ", ";
+ }
+ f_service_ << "args." << (*f_iter)->get_name();
+ }
+ f_service_ << ")" << endl;
+
+ if (xceptions.size() > 0) {
+ f_service_ << endl;
+ }
+
+ if (!tfunction->is_oneway() && xceptions.size() > 0) {
+ indent_down();
+ }
+
+ // Shortcut out here for oneway functions
+ if (tfunction->is_oneway()) {
+ f_service_ <<
+ indent() << "callback()" << endl;
+ indent_down();
+ f_service_ << endl;
+ return;
+ }
+
+ f_service_ <<
+ indent() << "oprot.writeMessageBegin(\"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid)" << endl <<
+ indent() << "result.write(oprot)" << endl <<
+ indent() << "oprot.writeMessageEnd()" << endl <<
+ indent() << "oprot.trans.flush()" << endl <<
+ indent() << "callback()" << endl;
+
+ // Close function
+ indent_down();
+ f_service_ << endl;
+
+ } else { // py
// Try block for a function with exceptions
if (xceptions.size() > 0) {
f_service_ <<
@@ -2001,7 +2225,7 @@
if (ttype->is_map()) {
out <<
indent() << prefix << " = {}" << endl <<
- indent() << "(" << ktype << ", " << vtype << ", " << size << " ) = iprot.readMapBegin() " << endl;
+ indent() << "(" << ktype << ", " << vtype << ", " << size << " ) = iprot.readMapBegin()" << endl;
} else if (ttype->is_set()) {
out <<
indent() << prefix << " = set()" << endl <<
@@ -2382,46 +2606,56 @@
* @return String of rendered function definition
*/
string t_py_generator::function_signature(t_function* tfunction,
- string prefix) {
- string argument_list_result = argument_list(tfunction->get_arglist());
- if (!argument_list_result.empty()) {
- argument_list_result = "self, " + argument_list_result;
- } else {
- argument_list_result = "self";
+ bool interface,
+ tornado_callback_t callback) {
+ vector<string> pre;
+ vector<string> post;
+ string signature = tfunction->get_name() + "(";
+
+ if (!(gen_twisted_ && interface)) {
+ pre.push_back("self");
}
- return prefix + tfunction->get_name() + "(" + argument_list_result + ")";
-}
-
-/**
- * Renders an interface function signature of the form 'type name(args)'
- *
- * @param tfunction Function definition
- * @return String of rendered function definition
- */
-string t_py_generator::function_signature_if(t_function* tfunction,
- string prefix) {
- string argument_list_result = argument_list(tfunction->get_arglist());
- if (!gen_twisted_) {
- if (!argument_list_result.empty()) {
- argument_list_result = "self, " + argument_list_result;
- } else {
- argument_list_result = "self";
+ if (gen_tornado_) {
+ if (callback == NONE) {
+ } else if (callback == MANDATORY_FOR_ONEWAY_ELSE_NONE) {
+ if (tfunction->is_oneway()) {
+ // Tornado send_* carry the callback so you can block on the write's flush
+ // (rather than on receipt of the response)
+ post.push_back("callback");
+ }
+ } else if (callback == OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) {
+ if (tfunction->is_oneway()) {
+ post.push_back("callback=None");
+ } else {
+ post.push_back("callback");
+ }
}
}
- return prefix + tfunction->get_name() + "(" + argument_list_result + ")";
+ signature += argument_list(tfunction->get_arglist(), &pre, &post) + ")";
+ return signature;
}
-
/**
* Renders a field list
*/
-string t_py_generator::argument_list(t_struct* tstruct) {
+string t_py_generator::argument_list(t_struct* tstruct, vector<string> *pre, vector<string> *post) {
string result = "";
const vector<t_field*>& fields = tstruct->get_members();
vector<t_field*>::const_iterator f_iter;
+ vector<string>::const_iterator s_iter;
bool first = true;
+ if (pre) {
+ for (s_iter = pre->begin(); s_iter != pre->end(); ++s_iter) {
+ if (first) {
+ first = false;
+ } else {
+ result += ", ";
+ }
+ result += *s_iter;
+ }
+ }
for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
if (first) {
first = false;
@@ -2430,6 +2664,16 @@
}
result += (*f_iter)->get_name();
}
+ if (post) {
+ for (s_iter = post->begin(); s_iter != post->end(); ++s_iter) {
+ if (first) {
+ first = false;
+ } else {
+ result += ", ";
+ }
+ result += *s_iter;
+ }
+ }
return result;
}
@@ -2523,6 +2767,7 @@
THRIFT_REGISTER_GENERATOR(py, "Python",
" new_style: Generate new-style classes.\n" \
" twisted: Generate Twisted-friendly RPC services.\n" \
+" tornado: Generate code for use with Tornado.\n" \
" utf8strings: Encode/decode strings using utf8 in the generated code.\n" \
" slots: Generate code using slots for instance members.\n" \
" dynamic: Generate dynamic code, less code generated but slower.\n" \
diff --git a/configure.ac b/configure.ac
index 123c52b..54979ef 100755
--- a/configure.ac
+++ b/configure.ac
@@ -125,7 +125,7 @@
AX_LIB_ZLIB([1.2.3])
have_zlib=$success
-
+
AX_THRIFT_LIB(qt4, [Qt], yes)
have_qt=no
if test "$with_qt4" = "yes"; then
@@ -610,6 +610,7 @@
test/perl/Makefile
test/py/Makefile
test/py.twisted/Makefile
+ test/py.tornado/Makefile
test/rb/Makefile
tutorial/Makefile
tutorial/cpp/Makefile
@@ -617,6 +618,7 @@
tutorial/js/Makefile
tutorial/py/Makefile
tutorial/py.twisted/Makefile
+ tutorial/py.tornado/Makefile
])
AC_OUTPUT
diff --git a/lib/py/src/TTornado.py b/lib/py/src/TTornado.py
new file mode 100644
index 0000000..af309c3
--- /dev/null
+++ b/lib/py/src/TTornado.py
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from cStringIO import StringIO
+import logging
+import socket
+import struct
+
+from thrift.transport import TTransport
+from thrift.transport.TTransport import TTransportException
+
+from tornado import gen
+from tornado import iostream
+from tornado import netutil
+
+
+class TTornadoStreamTransport(TTransport.TTransportBase):
+ """a framed, buffered transport over a Tornado stream"""
+ def __init__(self, host, port, stream=None):
+ self.host = host
+ self.port = port
+ self.is_queuing_reads = False
+ self.read_queue = []
+ self.__wbuf = StringIO()
+
+ # servers provide a ready-to-go stream
+ self.stream = stream
+ if self.stream is not None:
+ self._set_close_callback()
+
+ # not the same number of parameters as TTransportBase.open
+ def open(self, callback):
+ logging.debug('socket connecting')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ self.stream = iostream.IOStream(sock)
+
+ def on_close_in_connect(*_):
+ message = 'could not connect to {}:{}'.format(self.host, self.port)
+ raise TTransportException(
+ type=TTransportException.NOT_OPEN,
+ message=message)
+ self.stream.set_close_callback(on_close_in_connect)
+
+ def finish(*_):
+ self._set_close_callback()
+ callback()
+
+ self.stream.connect((self.host, self.port), callback=finish)
+
+ def _set_close_callback(self):
+ def on_close():
+ raise TTransportException(
+ type=TTransportException.END_OF_FILE,
+ message='socket closed')
+ self.stream.set_close_callback(self.close)
+
+ def close(self):
+ # don't raise if we intend to close
+ self.stream.set_close_callback(None)
+ self.stream.close()
+
+ def read(self, _):
+ # The generated code for Tornado shouldn't do individual reads -- only
+ # frames at a time
+ assert "you're doing it wrong" is True
+
+ @gen.engine
+ def readFrame(self, callback):
+ self.read_queue.append(callback)
+ logging.debug('read queue: %s', self.read_queue)
+
+ if self.is_queuing_reads:
+ # If a read is already in flight, then the while loop below should
+ # pull it from self.read_queue
+ return
+
+ self.is_queuing_reads = True
+ while self.read_queue:
+ next_callback = self.read_queue.pop()
+ result = yield gen.Task(self._readFrameFromStream)
+ next_callback(result)
+ self.is_queuing_reads = False
+
+ @gen.engine
+ def _readFrameFromStream(self, callback):
+ logging.debug('_readFrameFromStream')
+ frame_header = yield gen.Task(self.stream.read_bytes, 4)
+ frame_length, = struct.unpack('!i', frame_header)
+ logging.debug('received frame header, frame length = %i', frame_length)
+ frame = yield gen.Task(self.stream.read_bytes, frame_length)
+ logging.debug('received frame payload')
+ callback(frame)
+
+ def write(self, buf):
+ self.__wbuf.write(buf)
+
+ def flush(self, callback=None):
+ wout = self.__wbuf.getvalue()
+ wsz = len(wout)
+ # reset wbuf before write/flush to preserve state on underlying failure
+ self.__wbuf = StringIO()
+ # N.B.: Doing this string concatenation is WAY cheaper than making
+ # two separate calls to the underlying socket object. Socket writes in
+ # Python turn out to be REALLY expensive, but it seems to do a pretty
+ # good job of managing string buffer operations without excessive copies
+ buf = struct.pack("!i", wsz) + wout
+
+ logging.debug('writing frame length = %i', wsz)
+ self.stream.write(buf, callback)
+
+
+class TTornadoServer(netutil.TCPServer):
+ def __init__(self, processor, iprot_factory, oprot_factory=None,
+ *args, **kwargs):
+ super(TTornadoServer, self).__init__(*args, **kwargs)
+
+ self._processor = processor
+ self._iprot_factory = iprot_factory
+ self._oprot_factory = (oprot_factory if oprot_factory is not None
+ else iprot_factory)
+
+ def handle_stream(self, stream, address):
+ try:
+ host, port = address
+ trans = TTornadoStreamTransport(host=host, port=port, stream=stream)
+ oprot = self._oprot_factory.getProtocol(trans)
+
+ def next_pass():
+ if not trans.stream.closed():
+ self._processor.process(trans, self._iprot_factory, oprot,
+ callback=next_pass)
+
+ next_pass()
+
+ except Exception:
+ logging.exception('thrift exception in handle_stream')
+ trans.close()
diff --git a/test/Makefile.am b/test/Makefile.am
index aaa497f..7ebe51c 100755
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,6 +34,7 @@
if WITH_PYTHON
SUBDIRS += py
SUBDIRS += py.twisted
+SUBDIRS += py.tornado
endif
if WITH_RUBY
@@ -61,6 +62,7 @@
php \
py \
py.twisted \
+ py.tornado \
rb \
threads \
AnnotationTest.thrift \
diff --git a/test/py.tornado/Makefile.am b/test/py.tornado/Makefile.am
new file mode 100644
index 0000000..a8e680a
--- /dev/null
+++ b/test/py.tornado/Makefile.am
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+THRIFT = $(top_srcdir)/compiler/cpp/thrift
+
+thrift_gen: ../ThriftTest.thrift ../SmallTest.thrift
+ $(THRIFT) --gen py:tornado ../ThriftTest.thrift
+ $(THRIFT) --gen py:tornado ../SmallTest.thrift
+
+check: thrift_gen
+ ./test_suite.py
+
+clean-local:
+ $(RM) -r gen-py.tornado
diff --git a/test/py.tornado/test_suite.py b/test/py.tornado/test_suite.py
new file mode 100755
index 0000000..f04ba04
--- /dev/null
+++ b/test/py.tornado/test_suite.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import datetime
+import glob
+import sys
+import time
+import unittest
+
+sys.path.insert(0, './gen-py.tornado')
+sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])
+
+try:
+ __import__('tornado')
+except ImportError:
+ print "module `tornado` not found, skipping test"
+ sys.exit(0)
+
+from tornado import gen, ioloop, stack_context
+from tornado.testing import AsyncTestCase, get_unused_port
+
+from thrift import TTornado
+from thrift.protocol import TBinaryProtocol
+
+from ThriftTest import ThriftTest
+from ThriftTest.ttypes import *
+
+
+class TestHandler(object):
+ def __init__(self, test_instance):
+ self.test_instance = test_instance
+
+ def testVoid(self, callback):
+ callback()
+
+ def testString(self, s, callback):
+ callback(s)
+
+ def testByte(self, b, callback):
+ callback(b)
+
+ def testI16(self, i16, callback):
+ callback(i16)
+
+ def testI32(self, i32, callback):
+ callback(i32)
+
+ def testI64(self, i64, callback):
+ callback(i64)
+
+ def testDouble(self, dub, callback):
+ callback(dub)
+
+ def testStruct(self, thing, callback):
+ callback(thing)
+
+ def testException(self, s, callback):
+ if s == 'Xception':
+ x = Xception()
+ x.errorCode = 1001
+ x.message = s
+ raise x
+ elif s == 'throw_undeclared':
+ raise ValueError("foo")
+ callback()
+
+ def testOneway(self, seconds, callback=None):
+ start = time.time()
+ def fire_oneway():
+ end = time.time()
+ self.test_instance.stop((start, end, seconds))
+
+ ioloop.IOLoop.instance().add_timeout(
+ datetime.timedelta(seconds=seconds),
+ fire_oneway)
+
+ if callback:
+ callback()
+
+ def testNest(self, thing, callback):
+ callback(thing)
+
+ def testMap(self, thing, callback):
+ callback(thing)
+
+ def testSet(self, thing, callback):
+ callback(thing)
+
+ def testList(self, thing, callback):
+ callback(thing)
+
+ def testEnum(self, thing, callback):
+ callback(thing)
+
+ def testTypedef(self, thing, callback):
+ callback(thing)
+
+
+class ThriftTestCase(AsyncTestCase):
+ def get_new_ioloop(self):
+ return ioloop.IOLoop.instance()
+
+ def setUp(self):
+ self.port = get_unused_port()
+ self.io_loop = self.get_new_ioloop()
+
+ # server
+ self.handler = TestHandler(self)
+ self.processor = ThriftTest.Processor(self.handler)
+ self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+
+ self.server = TTornado.TTornadoServer(self.processor, self.pfactory)
+ self.server.bind(self.port)
+ self.server.start(1)
+
+ # client
+ transport = TTornado.TTornadoStreamTransport('localhost', self.port)
+ pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ self.client = ThriftTest.Client(transport, pfactory)
+ transport.open(callback=self.stop)
+ self.wait(timeout=1)
+
+ def test_void(self):
+ self.client.testVoid(callback=self.stop)
+ v = self.wait(timeout=1)
+ self.assertEquals(v, None)
+
+ def test_string(self):
+ self.client.testString('Python', callback=self.stop)
+ v = self.wait(timeout=1)
+ self.assertEquals(v, 'Python')
+
+ def test_byte(self):
+ self.client.testByte(63, callback=self.stop)
+ v = self.wait(timeout=1)
+ self.assertEquals(v, 63)
+
+ def test_i32(self):
+ self.client.testI32(-1, callback=self.stop)
+ v = self.wait(timeout=1)
+ self.assertEquals(v, -1)
+
+ self.client.testI32(0, callback=self.stop)
+ v = self.wait(timeout=1)
+ self.assertEquals(v, 0)
+
+ def test_i64(self):
+ self.client.testI64(-34359738368, callback=self.stop)
+ v = self.wait(timeout=1)
+ self.assertEquals(v, -34359738368)
+
+ def test_double(self):
+ self.client.testDouble(-5.235098235, callback=self.stop)
+ v = self.wait(timeout=1)
+ self.assertEquals(v, -5.235098235)
+
+ def test_struct(self):
+ x = Xtruct()
+ x.string_thing = "Zero"
+ x.byte_thing = 1
+ x.i32_thing = -3
+ x.i64_thing = -5
+ self.client.testStruct(x, callback=self.stop)
+
+ y = self.wait(timeout=1)
+ self.assertEquals(y.string_thing, "Zero")
+ self.assertEquals(y.byte_thing, 1)
+ self.assertEquals(y.i32_thing, -3)
+ self.assertEquals(y.i64_thing, -5)
+
+ def test_exception(self):
+ self.client.testException('Safe', callback=self.stop)
+ v = self.wait(timeout=1)
+
+ self.client.testException('Xception', callback=self.stop)
+ ex = self.wait(timeout=1)
+ if type(ex) == Xception:
+ self.assertEquals(ex.errorCode, 1001)
+ self.assertEquals(ex.message, 'Xception')
+ else:
+ self.fail("should have gotten exception")
+
+ def test_oneway(self):
+ def return_from_send():
+ self.stop('done with send')
+ self.client.testOneway(0.5, callback=return_from_send)
+ self.assertEquals(self.wait(timeout=1), 'done with send')
+
+ start, end, seconds = self.wait(timeout=1)
+ self.assertAlmostEquals(seconds, (end - start), places=3)
+
+
+def suite():
+ suite = unittest.TestSuite()
+ loader = unittest.TestLoader()
+ suite.addTest(loader.loadTestsFromTestCase(ThriftTestCase))
+ return suite
+
+
+if __name__ == '__main__':
+ unittest.TestProgram(defaultTest='suite',
+ testRunner=unittest.TextTestRunner(verbosity=1))
diff --git a/tutorial/Makefile.am b/tutorial/Makefile.am
index 169a2c1..86c08c0 100755
--- a/tutorial/Makefile.am
+++ b/tutorial/Makefile.am
@@ -17,7 +17,7 @@
# under the License.
#
-SUBDIRS =
+SUBDIRS =
if MINGW
# do nothing, just build the compiler
@@ -43,6 +43,7 @@
if WITH_PYTHON
SUBDIRS += py
SUBDIRS += py.twisted
+SUBDIRS += py.tornado
endif
if WITH_RUBY
diff --git a/tutorial/py.tornado/Makefile.am b/tutorial/py.tornado/Makefile.am
new file mode 100755
index 0000000..6ac6023
--- /dev/null
+++ b/tutorial/py.tornado/Makefile.am
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+THRIFT = $(top_builddir)/compiler/cpp/thrift
+
+gen-py.tornado/tutorial/Calculator.py gen-py.tornado/shared/SharedService.py: $(top_srcdir)/tutorial/tutorial.thrift
+ $(THRIFT) --gen py:tornado -r $<
+
+all-local: gen-py.tornado/tutorial/Calculator.py
+
+tutorialserver: all
+ ${PYTHON} PythonServer.py
+
+tutorialclient: all
+ ${PYTHON} PythonClient.py
+
+clean-local:
+ $(RM) -r gen-*
+
+EXTRA_DIST = \
+ PythonServer.py \
+ PythonClient.py
diff --git a/tutorial/py.tornado/PythonClient.py b/tutorial/py.tornado/PythonClient.py
new file mode 100755
index 0000000..95d78b8
--- /dev/null
+++ b/tutorial/py.tornado/PythonClient.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import glob
+sys.path.append('gen-py.tornado')
+sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])
+
+import logging
+
+from tutorial import Calculator
+from tutorial.ttypes import Operation, Work, InvalidOperation
+
+from thrift import TTornado
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+
+from tornado import gen
+from tornado import ioloop
+
+
+@gen.engine
+def communicate(callback=None):
+ # create client
+ transport = TTornado.TTornadoStreamTransport('localhost', 9090)
+ pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ client = Calculator.Client(transport, pfactory)
+
+ # open the transport, bail on error
+ try:
+ yield gen.Task(transport.open)
+ except TTransport.TTransportException as ex:
+ logging.error(ex)
+ if callback:
+ callback()
+ return
+
+ # ping
+ yield gen.Task(client.ping)
+ print "ping()"
+
+ # add
+ sum_ = yield gen.Task(client.add, 1, 1)
+ print "1 + 1 = {}".format(sum_)
+
+ # make a oneway call without a callback (schedule the write and continue
+ # without blocking)
+ client.zip()
+ print "zip() without callback"
+
+ # make a oneway call with a callback (we'll wait for the stream write to
+ # complete before continuing)
+ yield gen.Task(client.zip)
+ print "zip() with callback"
+
+ # calculate 1/0
+ work = Work()
+ work.op = Operation.DIVIDE
+ work.num1 = 1
+ work.num2 = 0
+
+ try:
+ quotient = yield gen.Task(client.calculate, 1, work)
+ print "Whoa? You know how to divide by zero?"
+ except InvalidOperation as io:
+ print "InvalidOperation: {}".format(io)
+
+ # calculate 15-10
+ work.op = Operation.SUBTRACT
+ work.num1 = 15
+ work.num2 = 10
+
+ diff = yield gen.Task(client.calculate, 1, work)
+ print "15 - 10 = {}".format(diff)
+
+ # getStruct
+ log = yield gen.Task(client.getStruct, 1)
+ print "Check log: {}".format(log.value)
+
+ # close the transport
+ client._transport.close()
+
+ if callback:
+ callback()
+
+
+def main():
+ # create an ioloop, do the above, then stop
+ io_loop = ioloop.IOLoop.instance()
+ def this_joint():
+ communicate(callback=io_loop.stop)
+ io_loop.add_callback(this_joint)
+ io_loop.start()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/tutorial/py.tornado/PythonServer.py b/tutorial/py.tornado/PythonServer.py
new file mode 100755
index 0000000..52932ff
--- /dev/null
+++ b/tutorial/py.tornado/PythonServer.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import glob
+sys.path.append('gen-py.tornado')
+sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0])
+
+from tutorial import Calculator
+from tutorial.ttypes import Operation, InvalidOperation
+
+from shared.ttypes import SharedStruct
+
+from thrift import TTornado
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+from tornado import ioloop
+
+
+class CalculatorHandler(object):
+ def __init__(self):
+ self.log = {}
+
+ def ping(self, callback):
+ print "ping()"
+ callback()
+
+ def add(self, n1, n2, callback):
+ print "add({}, {})".format(n1, n2)
+ callback(n1 + n2)
+
+ def calculate(self, logid, work, callback):
+ print "calculate({}, {})".format(logid, work)
+
+ if work.op == Operation.ADD:
+ val = work.num1 + work.num2
+ elif work.op == Operation.SUBTRACT:
+ val = work.num1 - work.num2
+ elif work.op == Operation.MULTIPLY:
+ val = work.num1 * work.num2
+ elif work.op == Operation.DIVIDE:
+ if work.num2 == 0:
+ x = InvalidOperation()
+ x.what = work.op
+ x.why = "Cannot divide by 0"
+ raise x
+ val = work.num1 / work.num2
+ else:
+ x = InvalidOperation()
+ x.what = work.op
+ x.why = "Invalid operation"
+ raise x
+
+ log = SharedStruct()
+ log.key = logid
+ log.value = '%d' % (val)
+ self.log[logid] = log
+ callback(val)
+
+ def getStruct(self, key, callback):
+ print "getStruct({})".format(key)
+ callback(self.log[key])
+
+ def zip(self, callback):
+ print "zip()"
+ callback()
+
+
+def main():
+ handler = CalculatorHandler()
+ processor = Calculator.Processor(handler)
+ pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ server = TTornado.TTornadoServer(processor, pfactory)
+
+ print "Starting the server..."
+ server.bind(9090)
+ server.start(1)
+ ioloop.IOLoop.instance().start()
+ print "done."
+
+
+if __name__ == "__main__":
+ main()