Python threadpool server for Thrift

Summary: Fixed number of threads that work from a shared queue

Reviewed By: cheever


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664832 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
index eb2ec70..c3a978d 100644
--- a/lib/py/src/server/TServer.py
+++ b/lib/py/src/server/TServer.py
@@ -1,6 +1,7 @@
 import sys
 import traceback
 import threading
+import Queue
 
 from thrift.Thrift import TProcessor
 from thrift.transport import TTransport
@@ -69,3 +70,54 @@
       pass
     except Exception, x:
       print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+class TThreadPoolServer(TServer):
+
+  """Server with a fixed size pool of worker threads that service clients taken from a shared queue."""
+
+  def __init__(self, processor, serverTransport, transportFactory=None):
+    TServer.__init__(self, processor, serverTransport, transportFactory)
+    self.clients = Queue.Queue()  # unbounded queue of accepted clients awaiting a worker
+    self.threads = 10  # default pool size; override with setNumThreads()
+
+  def setNumThreads(self, num):
+    """Set the number of worker threads serve() creates; serve() reads it once on start."""
+    self.threads = num
+
+  def serveThread(self):
+    """Worker loop: take clients from the shared queue forever; errors are printed, not raised."""
+    while True:
+      try:
+        client = self.clients.get()  # blocks until serve() enqueues a client
+        self.serveClient(client)
+      except Exception, x:
+        print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+  def serveClient(self, client):
+    """Process input/output from a client until the processor raises; never propagates errors."""
+    (input, output) = self.transportFactory.getIOTransports(client)
+    try:
+      while True:
+        self.processor.process(input, output)
+    except TTransport.TTransportException, tx:
+      pass  # transport failure (presumably the client disconnecting) ends the session quietly
+    except Exception, x:
+      print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+  def serve(self):
+    """Start self.threads worker threads, then accept clients forever and queue them."""
+    for i in range(self.threads):
+      try:
+        t = threading.Thread(target = self.serveThread)
+        t.start()
+      except Exception, x:
+        print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
+
+    # Pump the socket for clients; the calling thread only accepts and enqueues
+    self.serverTransport.listen()
+    while True:
+      try:
+        client = self.serverTransport.accept()
+        self.clients.put(client)
+      except Exception, x:
+        print '%s, %s, %s' % (type(x), x, traceback.format_exc())