Forking Python server.
The python threading model does not provide concurrency for CPU-bound
processes. Process forking is the current recommended way of writing
scalable Python servers.
Harry Wang ran the [elided] backend with this change for 3 days and observed
no errors. The threaded backend caused unexplained lockups under this load
after 24 hours.
I also ran a CPU-bound load test against this server with 32 concurrent
clients. It completed 5X faster than the threaded implementation.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666359 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py
index b6738f7..a44ab52 100644
--- a/lib/py/src/server/TServer.py
+++ b/lib/py/src/server/TServer.py
@@ -7,6 +7,7 @@
# http://developers.facebook.com/thrift/
import sys
+import os
import traceback
import threading
import Queue
@@ -167,3 +168,83 @@
self.clients.put(client)
except Exception, x:
print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+
+
+class TForkingServer(TServer):
+
+ """A Thrift server that forks a new process for each request"""
+ """
+ This is more scalable than the threaded server as it does not cause
+ GIL contention.
+
+ Note that this has different semantics from the threading server.
+ Specifically, updates to shared variables will no longer be shared.
+ It will also not work on windows.
+
+ This code is heavily inspired by SocketServer.ForkingMixIn in the
+ Python stdlib.
+ """
+
+ def __init__(self, *args):
+ TServer.__init__(self, *args)
+ self.children = []
+
+ def serve(self):
+ self.serverTransport.listen()
+ while True:
+ client = self.serverTransport.accept()
+ try:
+ pid = os.fork()
+
+ if pid: # parent
+ # add before collect, otherwise you race w/ waitpid
+ self.children.append(pid)
+ self.collect_children()
+
+ else:
+ itrans = self.inputTransportFactory.getTransport(client)
+ otrans = self.outputTransportFactory.getTransport(client)
+
+ iprot = self.inputProtocolFactory.getProtocol(itrans)
+ oprot = self.outputProtocolFactory.getProtocol(otrans)
+
+ try:
+ while True:
+ self.processor.process(iprot, oprot)
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, e:
+ print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+ os._exit(1)
+
+ def try_close(file):
+ try:
+ file.close()
+ except IOError, e:
+ print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+ try_close(itrans)
+ try_close(otrans)
+ os._exit(0)
+
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, x:
+ print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+
+
+
+ def collect_children(self):
+ while self.children:
+ try:
+ pid, status = os.waitpid(0, os.WNOHANG)
+ except os.error:
+ pid = None
+
+ if pid:
+ self.children.remove(pid)
+ else:
+ break
+
+