blob: 7a695a88344db650e9f482350ab7454d2d3de0c6 [file] [log] [blame]
Bryan Duxburya48b7d62011-03-09 18:05:58 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20
21import logging
22from multiprocessing import Process, Value, Condition, reduction
23
24from TServer import TServer
25from thrift.transport.TTransport import TTransportException
26
Bryan Duxburya48b7d62011-03-09 18:05:58 +000027
class TProcessPoolServer(TServer):
    """Server with a fixed size pool of worker subprocesses to service requests

    Each worker process calls accept() on the shared server transport and
    serves the accepted client until its transport fails.

    Note that if you need shared state between the handlers - it's up to you!
    Written by Dvir Volk, doat.com
    """

    def __init__(self, *args):
        """Forward *args to TServer and set up the pool's default state."""
        TServer.__init__(self, *args)
        self.numWorkers = 10
        self.workers = []
        # Shared flag polled by the worker processes; cleared to make them
        # leave their accept loop.
        self.isRunning = Value('b', False)
        # serve() blocks on this condition until stop() notifies it.
        self.stopCondition = Condition()
        self.postForkCallback = None

    def setPostForkCallback(self, callback):
        """Register a callable run at the start of every worker process.

        Raises TypeError if callback is not callable.
        """
        if not callable(callback):
            raise TypeError("This is not a callback!")
        self.postForkCallback = callback

    def setNumWorkers(self, num):
        """Set the number of worker processes that serve() should start"""
        self.numWorkers = num

    def workerProcess(self):
        """Accept clients from the server transport and serve them in a loop

        Runs inside a worker process for as long as the shared isRunning
        flag is set. Returns 0 when interrupted by KeyboardInterrupt or
        SystemExit; any other exception is logged and the loop continues.
        """
        if self.postForkCallback:
            self.postForkCallback()

        while self.isRunning.value:
            try:
                client = self.serverTransport.accept()
                self.serveClient(client)
            except (KeyboardInterrupt, SystemExit):
                return 0
            except Exception as x:
                logging.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # A transport error ends this client's session; not logged.
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Close both transports even when KeyboardInterrupt/SystemExit
            # propagates out of process(), and close the output side even
            # if closing the input side fails.
            try:
                itrans.close()
            finally:
                otrans.close()

    def serve(self):
        """Start the worker processes and block until stop() is called"""
        # this is a shared state that can tell the workers to exit when False
        self.isRunning.value = True

        # first bind and listen to the port
        self.serverTransport.listen()

        # fork the children
        for _ in range(self.numWorkers):
            try:
                w = Process(target=self.workerProcess)
                w.daemon = True
                w.start()
                self.workers.append(w)
            except Exception as x:
                logging.exception(x)

        # wait until the condition is set by stop()
        while True:
            self.stopCondition.acquire()
            try:
                self.stopCondition.wait()
                break
            except (SystemExit, KeyboardInterrupt):
                break
            except Exception as x:
                logging.exception(x)
            finally:
                # Pair every acquire() with a release() so the condition's
                # lock is not left held once serve() returns.
                self.stopCondition.release()

        # Workers only observe this after their current accept()/serveClient()
        # call returns.
        self.isRunning.value = False

    def stop(self):
        """Clear the running flag and wake up the process blocked in serve()"""
        self.isRunning.value = False
        self.stopCondition.acquire()
        try:
            self.stopCondition.notify()
        finally:
            self.stopCondition.release()