blob: fe6dc816278666d2f8642cbafbba761db611a781 [file] [log] [blame]
Bryan Duxburya48b7d62011-03-09 18:05:58 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20
21import logging
Konrad Grochowski3a724e32014-08-12 11:48:29 -040022
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090023from multiprocessing import Process, Value, Condition
Bryan Duxburya48b7d62011-03-09 18:05:58 +000024
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090025from .TServer import TServer
Bryan Duxburya48b7d62011-03-09 18:05:58 +000026from thrift.transport.TTransport import TTransportException
27
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090028logger = logging.getLogger(__name__)
29
Bryan Duxburya48b7d62011-03-09 18:05:58 +000030
class TProcessPoolServer(TServer):
    """Server with a fixed size pool of worker subprocesses to service requests

    Each worker process calls accept() on the shared server transport and
    serves the accepted client until its transport fails.

    Note that if you need shared state between the handlers - it's up to you!
    Written by Dvir Volk, doat.com
    """

    def __init__(self, *args):
        """Forward *args to TServer and set up the pool's bookkeeping."""
        TServer.__init__(self, *args)
        self.numWorkers = 10
        self.workers = []
        # Flag shared with the forked workers; they keep accepting clients
        # only while it is true.
        self.isRunning = Value('b', False)
        # serve() blocks on this condition until stop() notifies it.
        self.stopCondition = Condition()
        self.postForkCallback = None

    def setPostForkCallback(self, callback):
        """Register a callable that every worker runs once before its loop.

        Raises TypeError if callback is not callable.
        """
        if not callable(callback):
            raise TypeError("This is not a callback!")
        self.postForkCallback = callback

    def setNumWorkers(self, num):
        """Set the number of worker processes that should be created"""
        self.numWorkers = num

    def workerProcess(self):
        """Loop accepting clients from the server transport and serving them

        Runs inside a worker process for as long as the shared isRunning
        flag is true. Returns 0 when interrupted by KeyboardInterrupt or
        SystemExit; any other exception is logged and the loop continues.
        """
        if self.postForkCallback:
            self.postForkCallback()

        while self.isRunning.value:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                self.serveClient(client)
            except (KeyboardInterrupt, SystemExit):
                return 0
            except Exception as x:
                logger.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # The transport failing is what normally ends the loop above.
            pass
        except Exception as x:
            logger.exception(x)
        finally:
            # Close the transports even when KeyboardInterrupt/SystemExit
            # propagates, so an interrupted worker does not leak them.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start the worker processes and block until stop() is called"""
        # this is a shared state that can tell the workers to exit when False
        self.isRunning.value = True

        # first bind and listen to the port
        self.serverTransport.listen()

        # fork the children
        for _ in range(self.numWorkers):
            try:
                w = Process(target=self.workerProcess)
                w.daemon = True
                w.start()
                self.workers.append(w)
            except Exception as x:
                logger.exception(x)

        # wait until the condition is set by stop()
        while True:
            try:
                # The with-block guarantees the condition's lock is released
                # on every exit path; checking isRunning under that lock
                # means a stop() issued before we start waiting is not lost.
                with self.stopCondition:
                    while self.isRunning.value:
                        self.stopCondition.wait()
                break
            except (SystemExit, KeyboardInterrupt):
                break
            except Exception as x:
                logger.exception(x)

        self.isRunning.value = False

    def stop(self):
        """Clear the running flag and wake up the blocked serve() call"""
        self.isRunning.value = False
        with self.stopCondition:
            self.stopCondition.notify()