Thrift Python server code generation

Summary: Yep, it's up and running. We now have full client/server support in all of C++ Java PHP and Python. Well, not quite... there's no PHP server, but honestly who wants one? Actually, if we do want one the framework will support writing is as a PHP file that can be served in apache like a web service (i.e. restserver.php would be thriftserver.php). But now that's rambling and nothing to do with this commit.

Notes: cheever, let's chat about porting your multithreaded Pillar Python server over to Thrift


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664783 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc
index 268b9d5..059e6b0 100644
--- a/compiler/cpp/src/generate/t_java_generator.cc
+++ b/compiler/cpp/src/generate/t_java_generator.cc
@@ -547,8 +547,7 @@
         indent() << "TMessage _msg = _iprot.readMessageBegin(_itrans);" << endl <<
         indent() << resultname << " __result = new " << resultname << "();" << endl <<
         indent() << "__result.read(_iprot, _itrans);" << endl <<
-        indent() << "_iprot.readMessageEnd(_itrans);" << endl <<
-        endl;
+        indent() << "_iprot.readMessageEnd(_itrans);" << endl;
 
       // Careful, only return _result if not a void function
       if (!(*f_iter)->get_returntype()->is_void()) {
@@ -734,8 +733,7 @@
   // Declare result for non async function
   if (!tfunction->is_async()) {
     f_service_ <<
-      indent() << resultname << " __result = new " << resultname << "();" << endl <<
-      endl;
+      indent() << resultname << " __result = new " << resultname << "();" << endl;
   }
 
   // Try block for a function with exceptions
@@ -803,7 +801,6 @@
   }
 
   f_service_ <<
-    endl <<
     indent() << "_oprot.writeMessageBegin(_otrans, new TMessage(\"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid));" << endl <<
     indent() << "__result.write(_oprot, _otrans);" << endl <<
     indent() << "_oprot.writeMessageEnd(_otrans);" << endl <<
diff --git a/compiler/cpp/src/generate/t_py_generator.cc b/compiler/cpp/src/generate/t_py_generator.cc
index c53fe42..53e5b3b 100644
--- a/compiler/cpp/src/generate/t_py_generator.cc
+++ b/compiler/cpp/src/generate/t_py_generator.cc
@@ -331,13 +331,15 @@
     py_imports() << endl;
 
   f_service_ <<
-    "from " << program_name_ << "_types import *" << endl << endl;
+    "from " << program_name_ << "_types import *" << endl << 
+    "from thrift.Thrift import TProcessor" << endl <<
+    endl;
 
   // Generate the three main parts of the service (well, two for now in PHP)
   generate_service_interface(tservice);
   generate_service_client(tservice);
   generate_service_helpers(tservice);
-  // generate_service_server(tservice);
+  generate_service_server(tservice);
   
   // Close service file
   f_service_ << endl;
@@ -558,6 +560,177 @@
 }
 
 /**
+ * Generates a service server definition.
+ *
+ * @param tservice The service to generate a server for.
+ */
+void t_py_generator::generate_service_server(t_service* tservice) {
+  // Generate the dispatch methods
+  vector<t_function*> functions = tservice->get_functions();
+  vector<t_function*>::iterator f_iter; 
+
+  // Generate the header portion
+  f_service_ <<
+    "class Server(Iface, TProcessor):" << endl;
+  indent_up();
+
+  indent(f_service_) <<
+    "def __init__(self, handler, iprot, oprot=None):" << endl;
+  indent_up();
+  f_service_ <<
+    indent() << "self.__handler = handler" << endl <<
+    indent() << "self.__iprot = iprot" << endl <<
+    indent() << "if oprot == None:" << endl <<
+    indent() << "  self.__oprot = iprot" << endl <<
+    indent() << "else:" << endl <<
+    indent() << "  self.__oprot = oprot" << endl;
+  indent_down();
+  f_service_ << endl;
+ 
+  // Generate the server implementation
+  indent(f_service_) <<
+    "def process(self, itrans, otrans):" << endl;
+  indent_up();
+
+  f_service_ <<
+    indent() << "(name, type, seqid)  = self.__iprot.readMessageBegin(itrans)" << endl;
+
+  // TODO(mcslee): validate message
+
+  bool first = true;
+  for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+    if (!first) {
+      f_service_ << indent() << "el";
+    } else {
+      f_service_ << indent();
+      first = false;
+    }
+    f_service_ <<
+      "if name == \"" << (*f_iter)->get_name() << "\":" << endl;
+    indent_up();
+    indent(f_service_) <<
+      "self.process_" << (*f_iter)->get_name() << "(seqid, itrans, otrans)" << endl;
+    indent_down();
+  }
+  f_service_ <<
+    indent() << "else:" << endl <<
+    indent() << "  print 'Unknown function %s' % (name)" << endl;
+  f_service_ << endl;
+  
+  // Read end of args field, the T_STOP, and the struct close
+  f_service_ <<
+    indent() << "return True" << endl;
+
+  indent_down();
+  f_service_ << endl;
+
+  // Generate the process subfunctions
+  for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+    generate_process_function(tservice, *f_iter);
+  }
+
+  indent_down();
+  f_service_ << endl;
+}
+
+/**
+ * Generates a process function definition.
+ *
+ * @param tfunction The function to write a dispatcher for
+ */
+void t_py_generator::generate_process_function(t_service* tservice,
+                                               t_function* tfunction) {
+  // Open function
+  indent(f_service_) <<
+    "def process_" << tfunction->get_name() <<
+    "(self, seqid, itrans, otrans):" << endl;
+  indent_up();
+
+  string argsname = tfunction->get_name() + "_args";
+  string resultname = tfunction->get_name() + "_result";
+
+  f_service_ <<
+    indent() << "__args = " << argsname << "()" << endl <<
+    indent() << "__args.read(self.__iprot, itrans)" << endl <<
+    indent() << "self.__iprot.readMessageEnd(itrans)" << endl;
+
+  t_struct* xs = tfunction->get_xceptions();
+  const std::vector<t_field*>& xceptions = xs->get_members();
+  vector<t_field*>::const_iterator x_iter;
+
+  // Declare result for non async function
+  if (!tfunction->is_async()) {
+    f_service_ <<
+      indent() << "__result = " << resultname << "()" << endl;
+  }
+
+  // Try block for a function with exceptions
+  if (xceptions.size() > 0) {
+    f_service_ <<
+      indent() << "try:" << endl;
+    indent_up();
+  }
+ 
+  // Generate the function call
+  t_struct* arg_struct = tfunction->get_arglist();
+  const std::vector<t_field*>& fields = arg_struct->get_members();
+  vector<t_field*>::const_iterator f_iter;
+
+  f_service_ << indent();
+  if (!tfunction->is_async() && !tfunction->get_returntype()->is_void()) {
+    f_service_ << "__result.success = ";
+  }
+  f_service_ <<
+    "self.__handler." << tfunction->get_name() << "(";
+  bool first = true;
+  for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+    if (first) {
+      first = false;
+    } else {
+      f_service_ << ", ";
+    }
+    f_service_ << "__args." << (*f_iter)->get_name();
+  }
+  f_service_ << ")" << endl;
+
+  if (!tfunction->is_async() && xceptions.size() > 0) {
+    indent_down();
+    for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) {
+      f_service_ <<
+        indent() << "except " << (*x_iter)->get_type()->get_name() << ", " << (*x_iter)->get_name() << ":" << endl;
+      if (!tfunction->is_async()) {
+        indent_up();
+        f_service_ <<
+          indent() << "__result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << endl;
+        indent_down();
+      } else {
+        f_service_ <<
+          indent() << "pass" << endl;
+      }
+    }
+  }
+
+  // Shortcut out here for async functions
+  if (tfunction->is_async()) {
+    f_service_ <<
+      indent() << "return" << endl;
+    indent_down();
+    f_service_ << endl;
+    return;
+  }
+
+  f_service_ <<
+    indent() << "self.__oprot.writeMessageBegin(otrans, \"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid)" << endl <<
+    indent() << "__result.write(self.__oprot, otrans)" << endl <<
+    indent() << "self.__oprot.writeMessageEnd(otrans)" << endl <<
+    indent() << "otrans.flush()" << endl;
+
+  // Close function
+  indent_down();
+  f_service_ << endl;
+}
+
+/**
  * Deserializes a field of any type.
  */
 void t_py_generator::generate_deserialize_field(ofstream &out,
diff --git a/compiler/cpp/src/generate/t_py_generator.h b/compiler/cpp/src/generate/t_py_generator.h
index 2703df3..f68a121 100644
--- a/compiler/cpp/src/generate/t_py_generator.h
+++ b/compiler/cpp/src/generate/t_py_generator.h
@@ -48,6 +48,9 @@
   void generate_service_interface (t_service* tservice);
   void generate_service_client    (t_service* tservice);
 
+  void generate_service_server    (t_service* tservice);
+  void generate_process_function  (t_service* tservice, t_function* tfunction);
+
   /** Serialization constructs */
 
   void generate_deserialize_field        (std::ofstream &out,
diff --git a/lib/py/setup.py b/lib/py/setup.py
index 1d5d023..ea6ddaa 100644
--- a/lib/py/setup.py
+++ b/lib/py/setup.py
@@ -6,7 +6,7 @@
       author = ['Mark Slee'],
       author_email = ['mcslee@facebook.com'],
       url = 'http://code.facebook.com/thrift',
-      packages = ['thrift', 'thrift.protocol', 'thrift.transport'],
+      packages = ['thrift', 'thrift.protocol', 'thrift.transport', 'thrift.server'],
       package_dir = {'thrift' : 'src'},
       )
 
diff --git a/lib/py/src/Thrift.py b/lib/py/src/Thrift.py
index e69de29..0c4a458 100644
--- a/lib/py/src/Thrift.py
+++ b/lib/py/src/Thrift.py
@@ -0,0 +1,6 @@
+class TProcessor:
+
+  """Base class for procsessor, which works on two streams."""
+
+  def process(itrans, otrans):
+    pass
diff --git a/lib/py/src/protocol/TBinaryProtocol.py b/lib/py/src/protocol/TBinaryProtocol.py
index c089ac5..860f461 100644
--- a/lib/py/src/protocol/TBinaryProtocol.py
+++ b/lib/py/src/protocol/TBinaryProtocol.py
@@ -70,7 +70,7 @@
     otrans.write(buff)
     
   def writeI64(self, otrans, i64):
-    buff = pack("!l", i64)
+    buff = pack("!q", i64)
     otrans.write(buff)
 
   def writeString(self, otrans, str):
@@ -150,7 +150,7 @@
 
   def readI64(self, itrans):
     buff = itrans.readAll(8)
-    val, = unpack('!l', buff)
+    val, = unpack('!q', buff)
     return val
 
   def readString(self, itrans):
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
new file mode 100644
index 0000000..69be260
--- /dev/null
+++ b/lib/py/src/server/TServer.py
@@ -0,0 +1,32 @@
+from thrift.Thrift import TProcessor
+from thrift.transport import TTransport
+
+class TServer:
+
+  """Base interface for a server, which must have a run method."""
+
+  def __init__(self, proc):
+    self.processor = proc
+
+  def run(self):
+    pass
+
+class TSimpleServer(TServer):
+
+  """Simple single-threaded server that just pumps around one transport."""
+
+  def __init__(self, proc, trans):
+    TServer.__init__(self, proc)
+    self.transport = trans
+
+  def run(self):
+    self.transport.listen()
+    while True:
+      client = TTransport.TBufferedTransport(self.transport.accept())
+      try:
+        while True:
+          self.processor.process(client, client)
+      except Exception, x:
+        print x
+        print 'Client died.'
+      client.close()
diff --git a/lib/py/src/server/__init__.py b/lib/py/src/server/__init__.py
new file mode 100644
index 0000000..f7e08be
--- /dev/null
+++ b/lib/py/src/server/__init__.py
@@ -0,0 +1 @@
+__all__ = ["TServer"]
diff --git a/lib/py/src/transport/TSocket.py b/lib/py/src/transport/TSocket.py
index 52c6118..4ef35d9 100644
--- a/lib/py/src/transport/TSocket.py
+++ b/lib/py/src/transport/TSocket.py
@@ -5,15 +5,14 @@
 
   """Socket implementation of TTransport base."""
 
-  handle = None
-  host = "localhost"
-  port = 9090
-
-  def __init__(self, host, port):
+  def __init__(self, host='localhost', port=9090):
     self.host = host
     self.port = port
     self.handle = None
 
+  def set_handle(self, h):
+    self.handle = h
+
   def isOpen(self):
     return handle != None
 
@@ -36,10 +35,42 @@
 
   def read(self, sz):
     buff = self.handle.recv(sz)
+    if len(buff) == 0:
+      raise Exception('TScket read 0 bytes')
     return buff
 
   def write(self, buff):
-    self.handle.sendall(buff)
+    sent = 0
+    have = len(buff)
+    while sent < have:
+      plus = self.handle.send(buff)
+      if plus == 0:
+        raise Exception('sent 0 bytes')
+      sent += plus
+      buff = buff[plus:]
 
   def flush(self):
     pass
+
+class TServerSocket(TServerTransportBase):
+
+  """Socket implementation of TServerTransport base."""
+
+  def __init__(self, port):
+    self.port = port
+    self.handle = None
+ 
+  def listen(self):
+    self.handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self.handle.bind(('', self.port))
+    self.handle.listen(128)
+
+  def accept(self):
+    (client, addr) = self.handle.accept()
+    result = TSocket()
+    result.set_handle(client)
+    return result
+
+  def close(self):
+    self.handle.close()
+    self.handle = None
diff --git a/lib/py/src/transport/TTransport.py b/lib/py/src/transport/TTransport.py
index e9e36d7..1e8b6c6 100644
--- a/lib/py/src/transport/TTransport.py
+++ b/lib/py/src/transport/TTransport.py
@@ -20,6 +20,48 @@
   def write(self, buf):
     pass
 
-  def flush():
+  def flush(self):
     pass
 
+class TServerTransportBase:
+
+  """Base class for Thrift server transports."""
+
+  def listen(self):
+    pass
+
+  def accept(self):
+    pass
+
+  def close(self):
+    pass
+
+class TBufferedTransport(TTransportBase):
+
+  """Class that wraps another transport and buffers its I/O."""
+
+  def __init__(self, trans):
+    self.__trans = trans
+    self.__buf = ''
+
+  def isOpen(self):
+    return self.__trans.isOpen()
+
+  def open(self):
+    return self.__trans.open()
+
+  def close(self):
+    return self.__trans.close()
+
+  def read(self, sz):
+    return self.__trans.read(sz)
+
+  def readAll(self, sz):
+    return self.__trans.readAll(sz)
+
+  def write(self, buf):
+    self.__buf += buf
+
+  def flush(self):
+    self.__trans.write(self.__buf)
+    self.__buf = ''
diff --git a/test/py/TestClient.py b/test/py/TestClient.py
index a64b8d7..21d1990 100755
--- a/test/py/TestClient.py
+++ b/test/py/TestClient.py
@@ -5,31 +5,56 @@
 
 import ThriftTest
 from ThriftTest_types import *
+from thrift.transport import TTransport
 from thrift.transport import TSocket
 from thrift.protocol import TBinaryProtocol
 
-transport = TSocket.TSocket('localhost', 9090)
+import timing
+
+transport = TTransport.TBufferedTransport(TSocket.TSocket('localhost', 9090))
 protocol = TBinaryProtocol.TBinaryProtocol()
 client = ThriftTest.Client(transport, protocol)
 
 transport.open()
 
+# Start debug timing
+timing.start()
+
 print "testVoid()"
 print client.testVoid()
 
-print "testString('PythonTest')"
-print client.testString('PythonTest')
+print "testString('Python')"
+print client.testString('Python')
 
 print "testByte(63)"
 print client.testByte(63)
 
+print "testI32(-1)"
+print client.testI32(-1)
+
+print "testI64(-34359738368)"
+print client.testI64(-34359738368)
+
+print "testStruct({Zero, 1, -3, -5})"
+x = Xtruct()
+x.string_thing = "Zero"
+x.byte_thing = 1
+x.i32_thing = -3
+x.i64_thing = -5
+x = client.testStruct(x)
+print "{%s, %d, %d, %d}" % (x.string_thing, x.byte_thing, x.i32_thing, x.i64_thing)
+
 print "testException('Safe')"
 print client.testException('Safe')
 
-print "textException('Xception')"
 try:
+  print "textException('Xception')"
   print client.testException('Xception')
+
 except Xception, x:
-  print 'Xception (%d, %s)' % (x.errorCode, x.message)
+  print "Xception (%d, %s)" % (x.errorCode, x.message)
+
+timing.finish()
+print "Total time: %d microsecs" % timing.micro()
 
 transport.close()
diff --git a/test/py/TestServer.py b/test/py/TestServer.py
new file mode 100755
index 0000000..4b571c7
--- /dev/null
+++ b/test/py/TestServer.py
@@ -0,0 +1,37 @@
+#!/usr/bin/python
+
+import sys
+sys.path.append('./gen-py')
+
+import ThriftTest
+from ThriftTest_types import *
+from thrift.transport import TSocket
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+class TestHandler:
+
+  def testVoid(self):
+    print 'testVoid()'
+
+  def testString(self, str):
+    print 'testString(%s)' % str
+    return str
+
+  def testByte(self, byte):
+    print 'testByte(%d)' % byte
+    return byte
+
+  def testException(self, str):
+    print 'testException(%s)' % str
+    x = Xception()
+    x.errorCode = 1001
+    x.message = str
+    raise x
+
+transport = TSocket.TServerSocket(9090)
+protocol = TBinaryProtocol.TBinaryProtocol()
+handler = TestHandler()
+iface = ThriftTest.Server(handler, protocol)
+server = TServer.TSimpleServer(iface, transport)
+server.run()