blob: ae7fe1cd2e988b44bbc075d71c043f4b3a11ae56 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
19
20
import logging
# Module-level logger used by the server and its worker processes.
logger = logging.getLogger(__name__)

from multiprocessing import Process, Value, Condition, reduction

# NOTE(review): implicit relative import -- resolves only under Python 2
# semantics; confirm the package layout before switching to
# ``from .TServer import TServer``.
from TServer import TServer
from thrift.transport.TTransport import TTransportException
28
Bryan Duxburya48b7d62011-03-09 18:05:58 +000029
class TProcessPoolServer(TServer):
    """Server with a fixed size pool of worker subprocesses to service requests

    Every worker process calls accept() on the same listening server
    transport and serves the accepted client until its connection ends.

    Note that if you need shared state between the handlers - it's up to you!
    Written by Dvir Volk, doat.com
    """

    def __init__(self, *args):
        """Create the server; all positional arguments go to TServer.__init__."""
        TServer.__init__(self, *args)
        self.numWorkers = 10
        self.workers = []
        # Shared-memory flag: workers keep looping while it is true.
        self.isRunning = Value('b', False)
        # serve() blocks on this condition until stop() notifies it.
        self.stopCondition = Condition()
        self.postForkCallback = None

    def setPostForkCallback(self, callback):
        """Register a callable that each worker runs once when it starts.

        Raises TypeError if `callback` is not callable.
        """
        if not callable(callback):
            raise TypeError("This is not a callback!")
        self.postForkCallback = callback

    def setNumWorkers(self, num):
        """Set the number of worker processes that should be created"""
        self.numWorkers = num

    def workerProcess(self):
        """Loop accepting clients from the server transport and serving them

        Runs inside each worker process. Returns 0 when interrupted by
        KeyboardInterrupt/SystemExit, and None once isRunning is cleared.
        """
        if self.postForkCallback:
            self.postForkCallback()

        while self.isRunning.value:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                self.serveClient(client)
            except (KeyboardInterrupt, SystemExit):
                return 0
            except Exception as x:
                # Log and keep the worker alive for the next client.
                logger.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # Ends the processing loop silently.
            pass
        except Exception as x:
            logger.exception(x)
        finally:
            # Close in `finally` so that KeyboardInterrupt/SystemExit raised
            # while processing does not leak the transports.
            try:
                itrans.close()
            finally:
                otrans.close()

    def serve(self):
        """Listen, start the worker processes and block until stop() is called"""
        # this is a shared state that can tell the workers to exit when False
        self.isRunning.value = True

        # first bind and listen to the port
        self.serverTransport.listen()

        # fork the children
        for _ in range(self.numWorkers):
            try:
                w = Process(target=self.workerProcess)
                w.daemon = True
                w.start()
                self.workers.append(w)
            except Exception as x:
                logger.exception(x)

        # wait until the condition is set by stop()
        while True:
            try:
                # `with` guarantees the lock is released however we leave.
                with self.stopCondition:
                    # stop() clears the flag before notifying; checking it
                    # under the lock avoids waiting for a notification that
                    # was already delivered.
                    if self.isRunning.value:
                        self.stopCondition.wait()
                break
            except (SystemExit, KeyboardInterrupt):
                break
            except Exception as x:
                logger.exception(x)

        self.isRunning.value = False

    def stop(self):
        """Clear the running flag and wake up the blocked serve() call"""
        self.isRunning.value = False
        with self.stopCondition:
            self.stopCondition.notify()