blob: 736946638df6eb488296757d8d0a37f95725e0c0 [file] [log] [blame]
Bryan Duxburya48b7d62011-03-09 18:05:58 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20
21import logging
22from multiprocessing import Process, Value, Condition, reduction
23
24from TServer import TServer
25from thrift.transport.TTransport import TTransportException
26
Bryan Duxburya48b7d62011-03-09 18:05:58 +000027
Bryan Duxbury69720412012-01-03 17:32:30 +000028class TProcessPoolServer(TServer):
29 """Server with a fixed size pool of worker subprocesses to service requests
30
Bryan Duxburya48b7d62011-03-09 18:05:58 +000031 Note that if you need shared state between the handlers - it's up to you!
32 Written by Dvir Volk, doat.com
33 """
Bryan Duxbury69720412012-01-03 17:32:30 +000034 def __init__(self, *args):
Bryan Duxburya48b7d62011-03-09 18:05:58 +000035 TServer.__init__(self, *args)
36 self.numWorkers = 10
37 self.workers = []
38 self.isRunning = Value('b', False)
39 self.stopCondition = Condition()
40 self.postForkCallback = None
41
42 def setPostForkCallback(self, callback):
43 if not callable(callback):
44 raise TypeError("This is not a callback!")
45 self.postForkCallback = callback
46
47 def setNumWorkers(self, num):
48 """Set the number of worker threads that should be created"""
49 self.numWorkers = num
50
51 def workerProcess(self):
Bryan Duxbury69720412012-01-03 17:32:30 +000052 """Loop getting clients from the shared queue and process them"""
Bryan Duxburya48b7d62011-03-09 18:05:58 +000053 if self.postForkCallback:
54 self.postForkCallback()
55
Bryan Duxbury69720412012-01-03 17:32:30 +000056 while self.isRunning.value:
Bryan Duxburya48b7d62011-03-09 18:05:58 +000057 try:
58 client = self.serverTransport.accept()
Roger Meierab2793a2014-04-21 21:20:00 +020059 if not client:
60 continue
Bryan Duxburya48b7d62011-03-09 18:05:58 +000061 self.serveClient(client)
62 except (KeyboardInterrupt, SystemExit):
63 return 0
Todd Lipcon2b2560e2012-12-10 14:29:59 -080064 except Exception, x:
Bryan Duxburya48b7d62011-03-09 18:05:58 +000065 logging.exception(x)
66
67 def serveClient(self, client):
68 """Process input/output from a client for as long as possible"""
69 itrans = self.inputTransportFactory.getTransport(client)
70 otrans = self.outputTransportFactory.getTransport(client)
71 iprot = self.inputProtocolFactory.getProtocol(itrans)
72 oprot = self.outputProtocolFactory.getProtocol(otrans)
73
74 try:
75 while True:
76 self.processor.process(iprot, oprot)
77 except TTransportException, tx:
78 pass
Todd Lipcon2b2560e2012-12-10 14:29:59 -080079 except Exception, x:
Bryan Duxburya48b7d62011-03-09 18:05:58 +000080 logging.exception(x)
81
82 itrans.close()
83 otrans.close()
84
Bryan Duxburya48b7d62011-03-09 18:05:58 +000085 def serve(self):
Bryan Duxbury69720412012-01-03 17:32:30 +000086 """Start workers and put into queue"""
87 # this is a shared state that can tell the workers to exit when False
Bryan Duxburya48b7d62011-03-09 18:05:58 +000088 self.isRunning.value = True
89
Bryan Duxbury69720412012-01-03 17:32:30 +000090 # first bind and listen to the port
Bryan Duxburya48b7d62011-03-09 18:05:58 +000091 self.serverTransport.listen()
92
Bryan Duxbury69720412012-01-03 17:32:30 +000093 # fork the children
Bryan Duxburya48b7d62011-03-09 18:05:58 +000094 for i in range(self.numWorkers):
95 try:
96 w = Process(target=self.workerProcess)
97 w.daemon = True
98 w.start()
99 self.workers.append(w)
100 except Exception, x:
101 logging.exception(x)
102
Bryan Duxbury69720412012-01-03 17:32:30 +0000103 # wait until the condition is set by stop()
Bryan Duxburya48b7d62011-03-09 18:05:58 +0000104 while True:
Bryan Duxburya48b7d62011-03-09 18:05:58 +0000105 self.stopCondition.acquire()
106 try:
107 self.stopCondition.wait()
108 break
109 except (SystemExit, KeyboardInterrupt):
Bryan Duxbury69720412012-01-03 17:32:30 +0000110 break
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800111 except Exception, x:
Bryan Duxburya48b7d62011-03-09 18:05:58 +0000112 logging.exception(x)
113
114 self.isRunning.value = False
115
116 def stop(self):
117 self.isRunning.value = False
118 self.stopCondition.acquire()
119 self.stopCondition.notify()
120 self.stopCondition.release()