Thrift Python server code generation
Summary: Yep, it's up and running. We now have full client/server support in all of C++ Java PHP and Python. Well, not quite... there's no PHP server, but honestly who wants one? Actually, if we do want one the framework will support writing is as a PHP file that can be served in apache like a web service (i.e. restserver.php would be thriftserver.php). But now that's rambling and nothing to do with this commit.
Notes: cheever, let's chat about porting your multithreaded Pillar Python server over to Thrift
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664783 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc
index 268b9d5..059e6b0 100644
--- a/compiler/cpp/src/generate/t_java_generator.cc
+++ b/compiler/cpp/src/generate/t_java_generator.cc
@@ -547,8 +547,7 @@
indent() << "TMessage _msg = _iprot.readMessageBegin(_itrans);" << endl <<
indent() << resultname << " __result = new " << resultname << "();" << endl <<
indent() << "__result.read(_iprot, _itrans);" << endl <<
- indent() << "_iprot.readMessageEnd(_itrans);" << endl <<
- endl;
+ indent() << "_iprot.readMessageEnd(_itrans);" << endl;
// Careful, only return _result if not a void function
if (!(*f_iter)->get_returntype()->is_void()) {
@@ -734,8 +733,7 @@
// Declare result for non async function
if (!tfunction->is_async()) {
f_service_ <<
- indent() << resultname << " __result = new " << resultname << "();" << endl <<
- endl;
+ indent() << resultname << " __result = new " << resultname << "();" << endl;
}
// Try block for a function with exceptions
@@ -803,7 +801,6 @@
}
f_service_ <<
- endl <<
indent() << "_oprot.writeMessageBegin(_otrans, new TMessage(\"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid));" << endl <<
indent() << "__result.write(_oprot, _otrans);" << endl <<
indent() << "_oprot.writeMessageEnd(_otrans);" << endl <<
diff --git a/compiler/cpp/src/generate/t_py_generator.cc b/compiler/cpp/src/generate/t_py_generator.cc
index c53fe42..53e5b3b 100644
--- a/compiler/cpp/src/generate/t_py_generator.cc
+++ b/compiler/cpp/src/generate/t_py_generator.cc
@@ -331,13 +331,15 @@
py_imports() << endl;
f_service_ <<
- "from " << program_name_ << "_types import *" << endl << endl;
+ "from " << program_name_ << "_types import *" << endl <<
+ "from thrift.Thrift import TProcessor" << endl <<
+ endl;
// Generate the three main parts of the service (well, two for now in PHP)
generate_service_interface(tservice);
generate_service_client(tservice);
generate_service_helpers(tservice);
- // generate_service_server(tservice);
+ generate_service_server(tservice);
// Close service file
f_service_ << endl;
@@ -558,6 +560,177 @@
}
/**
+ * Generates a service server definition.
+ *
+ * @param tservice The service to generate a server for.
+ */
+void t_py_generator::generate_service_server(t_service* tservice) {
+ // Generate the dispatch methods
+ vector<t_function*> functions = tservice->get_functions();
+ vector<t_function*>::iterator f_iter;
+
+ // Generate the header portion
+ f_service_ <<
+ "class Server(Iface, TProcessor):" << endl;
+ indent_up();
+
+ indent(f_service_) <<
+ "def __init__(self, handler, iprot, oprot=None):" << endl;
+ indent_up();
+ f_service_ <<
+ indent() << "self.__handler = handler" << endl <<
+ indent() << "self.__iprot = iprot" << endl <<
+ indent() << "if oprot == None:" << endl <<
+ indent() << " self.__oprot = iprot" << endl <<
+ indent() << "else:" << endl <<
+ indent() << " self.__oprot = oprot" << endl;
+ indent_down();
+ f_service_ << endl;
+
+ // Generate the server implementation
+ indent(f_service_) <<
+ "def process(self, itrans, otrans):" << endl;
+ indent_up();
+
+ f_service_ <<
+ indent() << "(name, type, seqid) = self.__iprot.readMessageBegin(itrans)" << endl;
+
+ // TODO(mcslee): validate message
+
+ bool first = true;
+ for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+ if (!first) {
+ f_service_ << indent() << "el";
+ } else {
+ f_service_ << indent();
+ first = false;
+ }
+ f_service_ <<
+ "if name == \"" << (*f_iter)->get_name() << "\":" << endl;
+ indent_up();
+ indent(f_service_) <<
+ "self.process_" << (*f_iter)->get_name() << "(seqid, itrans, otrans)" << endl;
+ indent_down();
+ }
+ f_service_ <<
+ indent() << "else:" << endl <<
+ indent() << " print 'Unknown function %s' % (name)" << endl;
+ f_service_ << endl;
+
+ // Read end of args field, the T_STOP, and the struct close
+ f_service_ <<
+ indent() << "return True" << endl;
+
+ indent_down();
+ f_service_ << endl;
+
+ // Generate the process subfunctions
+ for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+ generate_process_function(tservice, *f_iter);
+ }
+
+ indent_down();
+ f_service_ << endl;
+}
+
+/**
+ * Generates a process function definition.
+ *
+ * @param tfunction The function to write a dispatcher for
+ */
+void t_py_generator::generate_process_function(t_service* tservice,
+ t_function* tfunction) {
+ // Open function
+ indent(f_service_) <<
+ "def process_" << tfunction->get_name() <<
+ "(self, seqid, itrans, otrans):" << endl;
+ indent_up();
+
+ string argsname = tfunction->get_name() + "_args";
+ string resultname = tfunction->get_name() + "_result";
+
+ f_service_ <<
+ indent() << "__args = " << argsname << "()" << endl <<
+ indent() << "__args.read(self.__iprot, itrans)" << endl <<
+ indent() << "self.__iprot.readMessageEnd(itrans)" << endl;
+
+ t_struct* xs = tfunction->get_xceptions();
+ const std::vector<t_field*>& xceptions = xs->get_members();
+ vector<t_field*>::const_iterator x_iter;
+
+ // Declare result for non async function
+ if (!tfunction->is_async()) {
+ f_service_ <<
+ indent() << "__result = " << resultname << "()" << endl;
+ }
+
+ // Try block for a function with exceptions
+ if (xceptions.size() > 0) {
+ f_service_ <<
+ indent() << "try:" << endl;
+ indent_up();
+ }
+
+ // Generate the function call
+ t_struct* arg_struct = tfunction->get_arglist();
+ const std::vector<t_field*>& fields = arg_struct->get_members();
+ vector<t_field*>::const_iterator f_iter;
+
+ f_service_ << indent();
+ if (!tfunction->is_async() && !tfunction->get_returntype()->is_void()) {
+ f_service_ << "__result.success = ";
+ }
+ f_service_ <<
+ "self.__handler." << tfunction->get_name() << "(";
+ bool first = true;
+ for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+ if (first) {
+ first = false;
+ } else {
+ f_service_ << ", ";
+ }
+ f_service_ << "__args." << (*f_iter)->get_name();
+ }
+ f_service_ << ")" << endl;
+
+ if (!tfunction->is_async() && xceptions.size() > 0) {
+ indent_down();
+ for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+ f_service_ <<
+ indent() << "except " << (*x_iter)->get_type()->get_name() << ", " << (*x_iter)->get_name() << ":" << endl;
+ if (!tfunction->is_async()) {
+ indent_up();
+ f_service_ <<
+ indent() << "__result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << endl;
+ indent_down();
+ } else {
+ f_service_ <<
+ indent() << "pass" << endl;
+ }
+ }
+ }
+
+ // Shortcut out here for async functions
+ if (tfunction->is_async()) {
+ f_service_ <<
+ indent() << "return" << endl;
+ indent_down();
+ f_service_ << endl;
+ return;
+ }
+
+ f_service_ <<
+ indent() << "self.__oprot.writeMessageBegin(otrans, \"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid)" << endl <<
+ indent() << "__result.write(self.__oprot, otrans)" << endl <<
+ indent() << "self.__oprot.writeMessageEnd(otrans)" << endl <<
+ indent() << "otrans.flush()" << endl;
+
+ // Close function
+ indent_down();
+ f_service_ << endl;
+}
+
+/**
* Deserializes a field of any type.
*/
void t_py_generator::generate_deserialize_field(ofstream &out,
diff --git a/compiler/cpp/src/generate/t_py_generator.h b/compiler/cpp/src/generate/t_py_generator.h
index 2703df3..f68a121 100644
--- a/compiler/cpp/src/generate/t_py_generator.h
+++ b/compiler/cpp/src/generate/t_py_generator.h
@@ -48,6 +48,9 @@
void generate_service_interface (t_service* tservice);
void generate_service_client (t_service* tservice);
+ void generate_service_server (t_service* tservice);
+ void generate_process_function (t_service* tservice, t_function* tfunction);
+
/** Serialization constructs */
void generate_deserialize_field (std::ofstream &out,
diff --git a/lib/py/setup.py b/lib/py/setup.py
index 1d5d023..ea6ddaa 100644
--- a/lib/py/setup.py
+++ b/lib/py/setup.py
@@ -6,7 +6,7 @@
author = ['Mark Slee'],
author_email = ['mcslee@facebook.com'],
url = 'http://code.facebook.com/thrift',
- packages = ['thrift', 'thrift.protocol', 'thrift.transport'],
+ packages = ['thrift', 'thrift.protocol', 'thrift.transport', 'thrift.server'],
package_dir = {'thrift' : 'src'},
)
diff --git a/lib/py/src/Thrift.py b/lib/py/src/Thrift.py
index e69de29..0c4a458 100644
--- a/lib/py/src/Thrift.py
+++ b/lib/py/src/Thrift.py
@@ -0,0 +1,6 @@
+class TProcessor:
+
+ """Base class for procsessor, which works on two streams."""
+
+ def process(itrans, otrans):
+ pass
diff --git a/lib/py/src/protocol/TBinaryProtocol.py b/lib/py/src/protocol/TBinaryProtocol.py
index c089ac5..860f461 100644
--- a/lib/py/src/protocol/TBinaryProtocol.py
+++ b/lib/py/src/protocol/TBinaryProtocol.py
@@ -70,7 +70,7 @@
otrans.write(buff)
def writeI64(self, otrans, i64):
- buff = pack("!l", i64)
+ buff = pack("!q", i64)
otrans.write(buff)
def writeString(self, otrans, str):
@@ -150,7 +150,7 @@
def readI64(self, itrans):
buff = itrans.readAll(8)
- val, = unpack('!l', buff)
+ val, = unpack('!q', buff)
return val
def readString(self, itrans):
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
new file mode 100644
index 0000000..69be260
--- /dev/null
+++ b/lib/py/src/server/TServer.py
@@ -0,0 +1,32 @@
+from thrift.Thrift import TProcessor
+from thrift.transport import TTransport
+
+class TServer:
+
+ """Base interface for a server, which must have a run method."""
+
+ def __init__(self, proc):
+ self.processor = proc
+
+ def run(self):
+ pass
+
+class TSimpleServer(TServer):
+
+ """Simple single-threaded server that just pumps around one transport."""
+
+ def __init__(self, proc, trans):
+ TServer.__init__(self, proc)
+ self.transport = trans
+
+ def run(self):
+ self.transport.listen()
+ while True:
+ client = TTransport.TBufferedTransport(self.transport.accept())
+ try:
+ while True:
+ self.processor.process(client, client)
+ except Exception, x:
+ print x
+ print 'Client died.'
+ client.close()
diff --git a/lib/py/src/server/__init__.py b/lib/py/src/server/__init__.py
new file mode 100644
index 0000000..f7e08be
--- /dev/null
+++ b/lib/py/src/server/__init__.py
@@ -0,0 +1 @@
+__all__ = ["TServer"]
diff --git a/lib/py/src/transport/TSocket.py b/lib/py/src/transport/TSocket.py
index 52c6118..4ef35d9 100644
--- a/lib/py/src/transport/TSocket.py
+++ b/lib/py/src/transport/TSocket.py
@@ -5,15 +5,14 @@
"""Socket implementation of TTransport base."""
- handle = None
- host = "localhost"
- port = 9090
-
- def __init__(self, host, port):
+ def __init__(self, host='localhost', port=9090):
self.host = host
self.port = port
self.handle = None
+ def set_handle(self, h):
+ self.handle = h
+
def isOpen(self):
return handle != None
@@ -36,10 +35,42 @@
def read(self, sz):
buff = self.handle.recv(sz)
+ if len(buff) == 0:
+ raise Exception('TScket read 0 bytes')
return buff
def write(self, buff):
- self.handle.sendall(buff)
+ sent = 0
+ have = len(buff)
+ while sent < have:
+ plus = self.handle.send(buff)
+ if plus == 0:
+ raise Exception('sent 0 bytes')
+ sent += plus
+ buff = buff[plus:]
def flush(self):
pass
+
+class TServerSocket(TServerTransportBase):
+
+ """Socket implementation of TServerTransport base."""
+
+ def __init__(self, port):
+ self.port = port
+ self.handle = None
+
+ def listen(self):
+ self.handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.handle.bind(('', self.port))
+ self.handle.listen(128)
+
+ def accept(self):
+ (client, addr) = self.handle.accept()
+ result = TSocket()
+ result.set_handle(client)
+ return result
+
+ def close(self):
+ self.handle.close()
+ self.handle = None
diff --git a/lib/py/src/transport/TTransport.py b/lib/py/src/transport/TTransport.py
index e9e36d7..1e8b6c6 100644
--- a/lib/py/src/transport/TTransport.py
+++ b/lib/py/src/transport/TTransport.py
@@ -20,6 +20,48 @@
def write(self, buf):
pass
- def flush():
+ def flush(self):
pass
+class TServerTransportBase:
+
+ """Base class for Thrift server transports."""
+
+ def listen(self):
+ pass
+
+ def accept(self):
+ pass
+
+ def close(self):
+ pass
+
+class TBufferedTransport(TTransportBase):
+
+ """Class that wraps another transport and buffers its I/O."""
+
+ def __init__(self, trans):
+ self.__trans = trans
+ self.__buf = ''
+
+ def isOpen(self):
+ return self.__trans.isOpen()
+
+ def open(self):
+ return self.__trans.open()
+
+ def close(self):
+ return self.__trans.close()
+
+ def read(self, sz):
+ return self.__trans.read(sz)
+
+ def readAll(self, sz):
+ return self.__trans.readAll(sz)
+
+ def write(self, buf):
+ self.__buf += buf
+
+ def flush(self):
+ self.__trans.write(self.__buf)
+ self.__buf = ''
diff --git a/test/py/TestClient.py b/test/py/TestClient.py
index a64b8d7..21d1990 100755
--- a/test/py/TestClient.py
+++ b/test/py/TestClient.py
@@ -5,31 +5,56 @@
import ThriftTest
from ThriftTest_types import *
+from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol
-transport = TSocket.TSocket('localhost', 9090)
+import timing
+
+transport = TTransport.TBufferedTransport(TSocket.TSocket('localhost', 9090))
protocol = TBinaryProtocol.TBinaryProtocol()
client = ThriftTest.Client(transport, protocol)
transport.open()
+# Start debug timing
+timing.start()
+
print "testVoid()"
print client.testVoid()
-print "testString('PythonTest')"
-print client.testString('PythonTest')
+print "testString('Python')"
+print client.testString('Python')
print "testByte(63)"
print client.testByte(63)
+print "testI32(-1)"
+print client.testI32(-1)
+
+print "testI64(-34359738368)"
+print client.testI64(-34359738368)
+
+print "testStruct({Zero, 1, -3, -5})"
+x = Xtruct()
+x.string_thing = "Zero"
+x.byte_thing = 1
+x.i32_thing = -3
+x.i64_thing = -5
+x = client.testStruct(x)
+print "{%s, %d, %d, %d}" % (x.string_thing, x.byte_thing, x.i32_thing, x.i64_thing)
+
print "testException('Safe')"
print client.testException('Safe')
-print "textException('Xception')"
try:
+ print "textException('Xception')"
print client.testException('Xception')
+
except Xception, x:
- print 'Xception (%d, %s)' % (x.errorCode, x.message)
+ print "Xception (%d, %s)" % (x.errorCode, x.message)
+
+timing.finish()
+print "Total time: %d microsecs" % timing.micro()
transport.close()
diff --git a/test/py/TestServer.py b/test/py/TestServer.py
new file mode 100755
index 0000000..4b571c7
--- /dev/null
+++ b/test/py/TestServer.py
@@ -0,0 +1,37 @@
+#!/usr/bin/python
+
+import sys
+sys.path.append('./gen-py')
+
+import ThriftTest
+from ThriftTest_types import *
+from thrift.transport import TSocket
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+class TestHandler:
+
+ def testVoid(self):
+ print 'testVoid()'
+
+ def testString(self, str):
+ print 'testString(%s)' % str
+ return str
+
+ def testByte(self, byte):
+ print 'testByte(%d)' % byte
+ return byte
+
+ def testException(self, str):
+ print 'testException(%s)' % str
+ x = Xception()
+ x.errorCode = 1001
+ x.message = str
+ raise x
+
+transport = TSocket.TServerSocket(9090)
+protocol = TBinaryProtocol.TBinaryProtocol()
+handler = TestHandler()
+iface = ThriftTest.Server(handler, protocol)
+server = TServer.TSimpleServer(iface, transport)
+server.run()