blob: 7ed814a883cd56a727c2b10e0162dd107c68f12e [file] [log] [blame]
Bryan Duxburya48b7d62011-03-09 18:05:58 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20
21import logging
22from multiprocessing import Process, Value, Condition, reduction
23
24from TServer import TServer
25from thrift.transport.TTransport import TTransportException
26
27class TProcessPoolServer(TServer):
28
29 """
30 Server with a fixed size pool of worker subprocesses which service requests.
31 Note that if you need shared state between the handlers - it's up to you!
32 Written by Dvir Volk, doat.com
33 """
34
35 def __init__(self, * args):
36 TServer.__init__(self, *args)
37 self.numWorkers = 10
38 self.workers = []
39 self.isRunning = Value('b', False)
40 self.stopCondition = Condition()
41 self.postForkCallback = None
42
43 def setPostForkCallback(self, callback):
44 if not callable(callback):
45 raise TypeError("This is not a callback!")
46 self.postForkCallback = callback
47
48 def setNumWorkers(self, num):
49 """Set the number of worker threads that should be created"""
50 self.numWorkers = num
51
52 def workerProcess(self):
53 """Loop around getting clients from the shared queue and process them."""
54
55 if self.postForkCallback:
56 self.postForkCallback()
57
58 while self.isRunning.value == True:
59 try:
60 client = self.serverTransport.accept()
61 self.serveClient(client)
62 except (KeyboardInterrupt, SystemExit):
63 return 0
64 except Exception, x:
65 logging.exception(x)
66
67 def serveClient(self, client):
68 """Process input/output from a client for as long as possible"""
69 itrans = self.inputTransportFactory.getTransport(client)
70 otrans = self.outputTransportFactory.getTransport(client)
71 iprot = self.inputProtocolFactory.getProtocol(itrans)
72 oprot = self.outputProtocolFactory.getProtocol(otrans)
73
74 try:
75 while True:
76 self.processor.process(iprot, oprot)
77 except TTransportException, tx:
78 pass
79 except Exception, x:
80 logging.exception(x)
81
82 itrans.close()
83 otrans.close()
84
85
86 def serve(self):
87 """Start a fixed number of worker threads and put client into a queue"""
88
89 #this is a shared state that can tell the workers to exit when set as false
90 self.isRunning.value = True
91
92 #first bind and listen to the port
93 self.serverTransport.listen()
94
95 #fork the children
96 for i in range(self.numWorkers):
97 try:
98 w = Process(target=self.workerProcess)
99 w.daemon = True
100 w.start()
101 self.workers.append(w)
102 except Exception, x:
103 logging.exception(x)
104
105 #wait until the condition is set by stop()
106
107 while True:
108
109 self.stopCondition.acquire()
110 try:
111 self.stopCondition.wait()
112 break
113 except (SystemExit, KeyboardInterrupt):
114 break
115 except Exception, x:
116 logging.exception(x)
117
118 self.isRunning.value = False
119
120 def stop(self):
121 self.isRunning.value = False
122 self.stopCondition.acquire()
123 self.stopCondition.notify()
124 self.stopCondition.release()
125