Forking Python server.

The Python threading model does not provide real concurrency for CPU-bound
work because threads contend for the GIL.  Forking processes is the currently
recommended way of writing scalable Python servers.

Harry Wang ran the [elided] backend with this change for 3 days and observed
no errors.  The threaded backend caused unexplained lockups under this load
after 24 hours.

I also ran a CPU-bound load test against this server with 32 concurrent
clients.  It completed 5X faster than the threaded implementation.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666359 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
index b6738f7..a44ab52 100644
--- a/lib/py/src/server/TServer.py
+++ b/lib/py/src/server/TServer.py
@@ -7,6 +7,7 @@
 # http://developers.facebook.com/thrift/
 
 import sys
+import os
 import traceback
 import threading
 import Queue
@@ -167,3 +168,113 @@
         self.clients.put(client)
       except Exception, x:
         print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+
+
+class TForkingServer(TServer):
+
+  """A Thrift server that forks a new process for each accepted connection.
+
+  This is more scalable than the threaded server as it does not cause
+  GIL contention.
+
+  Note that this has different semantics from the threading server.
+  Specifically, updates to shared variables will no longer be shared.
+  It will also not work on Windows.
+
+  This code is heavily inspired by SocketServer.ForkingMixIn in the
+  Python stdlib.
+  """
+
+  def __init__(self, *args):
+    """Pass args straight through to TServer.__init__."""
+    TServer.__init__(self, *args)
+    # pids of forked children that have not been reaped yet
+    self.children = []
+
+  def serve(self):
+    """Listen, then accept connections forever, forking a child for each.
+
+    The parent records the child's pid, reaps any finished children and
+    closes its own copy of the connection.  The child runs the processor
+    until it raises, closes its transports and always leaves through
+    os._exit(): status 0 after a TTransportException, status 1 after any
+    other exception.
+    """
+    def try_close(trans):
+      # best-effort close: report an IOError instead of propagating it
+      try:
+        trans.close()
+      except IOError, e:
+        print '%s, %s, %s' % (type(e), e, traceback.format_exc())
+
+    self.serverTransport.listen()
+    while True:
+      client = self.serverTransport.accept()
+      try:
+        pid = os.fork()
+
+        if pid: # parent
+          # add before collect, otherwise you race w/ waitpid
+          self.children.append(pid)
+          self.collect_children()
+
+          # the child serves this connection; close the parent's copy so
+          # the descriptor is not leaked and the connection can really
+          # close once the child is done with it
+          try_close(client)
+
+        else: # child
+          ecode = 1
+          try:
+            try:
+              itrans = self.inputTransportFactory.getTransport(client)
+              otrans = self.outputTransportFactory.getTransport(client)
+
+              iprot = self.inputProtocolFactory.getProtocol(itrans)
+              oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+              try:
+                while True:
+                  self.processor.process(iprot, oprot)
+              except TTransport.TTransportException, tx:
+                # transport-level failure (presumably the client hanging
+                # up) ends the session without being treated as an error
+                pass
+
+              try_close(itrans)
+              try_close(otrans)
+              ecode = 0
+            except Exception, e:
+              print '%s, %s, %s' % (type(e), e, traceback.format_exc())
+          finally:
+            # exit no matter what happened above: a child that fell back
+            # into the accept loop would start serving as a second parent
+            os._exit(ecode)
+
+      except TTransport.TTransportException, tx:
+        pass
+      except Exception, x:
+        print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+  def collect_children(self):
+    """Reap children that have exited, without blocking.
+
+    Each reaped pid is dropped from self.children.  Polling stops as soon
+    as waitpid reports nothing to collect or raises os.error.
+    """
+    while self.children:
+      try:
+        pid, status = os.waitpid(0, os.WNOHANG)
+      except os.error:
+        pid = None
+
+      if not pid:
+        break
+
+      try:
+        self.children.remove(pid)
+      except ValueError:
+        # waitpid(0, ...) reports any child in our process group, so the
+        # pid may belong to a child this server did not fork itself
+        pass