Python threadpool server for Thrift
Summary: Fixed number of threads that work from a shared queue
Reviewed By: cheever
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664832 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
index eb2ec70..c3a978d 100644
--- a/lib/py/src/server/TServer.py
+++ b/lib/py/src/server/TServer.py
@@ -1,6 +1,7 @@
import sys
import traceback
import threading
+import Queue
from thrift.Thrift import TProcessor
from thrift.transport import TTransport
@@ -69,3 +70,54 @@
pass
except Exception, x:
print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+class TThreadPoolServer(TServer):
+
+ """Server with a fixed size pool of threads which service requests."""
+
+ def __init__(self, processor, serverTransport, transportFactory=None):
+ TServer.__init__(self, processor, serverTransport, transportFactory)
+ self.clients = Queue.Queue()
+ self.threads = 10
+
+ def setNumThreads(num):
+ """Set the number of worker threads that should be created"""
+ self.threads = num
+
+ def serveThread(self):
+ """Loop around getting clients from the shared queue and process them."""
+ while True:
+ try:
+ client = self.client.get()
+ self.serveClient(client)
+ except Exception, x:
+ print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+ def serveClient(self, client):
+ """Process input/output from a client for as long as possible"""
+ (input, output) = self.transportFactory.getIOTransports(client)
+ try:
+ while True:
+ self.processor.process(input, output)
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, x:
+ print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+ def serve(self):
+ """Start a fixed number of worker threads and put client into a queue"""
+ for i in range(self.threads):
+ try:
+ t = threading.Thread(target = self.serveThread)
+ t.start()
+ except Exception, x:
+ print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
+
+ # Pump the socket for clients
+ self.serverTransport.listen()
+ while True:
+ try:
+ client = self.serverTransport.accept()
+ self.clients.put(client)
+ except Exception, x:
+ print '%s, %s, %s' % (type(x), x, traceback.format_exc())