THRIFT-1083. py: Preforking python process pool server

Patch: Dvir Volk

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1079913 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/server/TProcessPoolServer.py b/lib/py/src/server/TProcessPoolServer.py
new file mode 100644
index 0000000..7ed814a
--- /dev/null
+++ b/lib/py/src/server/TProcessPoolServer.py
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+import logging
+from multiprocessing import  Process, Value, Condition, reduction
+
+from TServer import TServer
+from thrift.transport.TTransport import TTransportException
+
+class TProcessPoolServer(TServer):
+
+    """
+    Server with a fixed size pool of worker subprocesses which service requests.
+    Note that if you need shared state between the handlers - it's up to you!
+    Written by Dvir Volk, doat.com
+    """
+
+    def __init__(self, * args):
+        TServer.__init__(self, *args)
+        self.numWorkers = 10
+        self.workers = []
+        self.isRunning = Value('b', False)
+        self.stopCondition = Condition()
+        self.postForkCallback = None
+
+    def setPostForkCallback(self, callback):
+        if not callable(callback):
+            raise TypeError("This is not a callback!")
+        self.postForkCallback = callback
+
+    def setNumWorkers(self, num):
+        """Set the number of worker threads that should be created"""
+        self.numWorkers = num
+
+    def workerProcess(self):
+        """Loop around getting clients from the shared queue and process them."""
+
+        if self.postForkCallback:
+            self.postForkCallback()
+
+        while self.isRunning.value == True:
+            try:
+                client = self.serverTransport.accept()
+                self.serveClient(client)
+            except (KeyboardInterrupt, SystemExit):
+                return 0
+            except Exception, x:
+                logging.exception(x)
+
+    def serveClient(self, client):
+        """Process input/output from a client for as long as possible"""
+        itrans = self.inputTransportFactory.getTransport(client)
+        otrans = self.outputTransportFactory.getTransport(client)
+        iprot = self.inputProtocolFactory.getProtocol(itrans)
+        oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+        try:
+            while True:
+                self.processor.process(iprot, oprot)
+        except TTransportException, tx:
+            pass
+        except Exception, x:
+            logging.exception(x)
+
+        itrans.close()
+        otrans.close()
+
+
+    def serve(self):
+        """Start a fixed number of worker threads and put client into a queue"""
+
+        #this is a shared state that can tell the workers to exit when set as false
+        self.isRunning.value = True
+
+        #first bind and listen to the port
+        self.serverTransport.listen()
+
+        #fork the children
+        for i in range(self.numWorkers):
+            try:
+                w = Process(target=self.workerProcess)
+                w.daemon = True
+                w.start()
+                self.workers.append(w)
+            except Exception, x:
+                logging.exception(x)
+
+        #wait until the condition is set by stop()
+
+        while True:
+
+            self.stopCondition.acquire()
+            try:
+                self.stopCondition.wait()
+                break
+            except (SystemExit, KeyboardInterrupt):
+		break
+            except Exception, x:
+                logging.exception(x)
+
+        self.isRunning.value = False
+
+    def stop(self):
+        self.isRunning.value = False
+        self.stopCondition.acquire()
+        self.stopCondition.notify()
+        self.stopCondition.release()
+