blob: c9cfa1104bce2299c1f3dcb0a40bcf0576afaf0a [file] [log] [blame]
Bryan Duxburya48b7d62011-03-09 18:05:58 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20
21import logging
Konrad Grochowski3a724e32014-08-12 11:48:29 -040022
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090023from multiprocessing import Process, Value, Condition
Bryan Duxburya48b7d62011-03-09 18:05:58 +000024
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090025from .TServer import TServer
Bryan Duxburya48b7d62011-03-09 18:05:58 +000026from thrift.transport.TTransport import TTransportException
27
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090028logger = logging.getLogger(__name__)
29
Bryan Duxburya48b7d62011-03-09 18:05:58 +000030
Bryan Duxbury69720412012-01-03 17:32:30 +000031class TProcessPoolServer(TServer):
32 """Server with a fixed size pool of worker subprocesses to service requests
33
Bryan Duxburya48b7d62011-03-09 18:05:58 +000034 Note that if you need shared state between the handlers - it's up to you!
35 Written by Dvir Volk, doat.com
36 """
Bryan Duxbury69720412012-01-03 17:32:30 +000037 def __init__(self, *args):
Bryan Duxburya48b7d62011-03-09 18:05:58 +000038 TServer.__init__(self, *args)
39 self.numWorkers = 10
40 self.workers = []
41 self.isRunning = Value('b', False)
42 self.stopCondition = Condition()
43 self.postForkCallback = None
44
Yiyang Zhouda1e19b2021-08-13 16:30:58 +080045 def __getstate__(self):
Jens Geyer443a03c2021-11-15 19:30:13 +010046 state = self.__dict__.copy()
Yiyang Zhouda1e19b2021-08-13 16:30:58 +080047 state['workers'] = None
48 return state
49
Bryan Duxburya48b7d62011-03-09 18:05:58 +000050 def setPostForkCallback(self, callback):
51 if not callable(callback):
52 raise TypeError("This is not a callback!")
53 self.postForkCallback = callback
54
55 def setNumWorkers(self, num):
56 """Set the number of worker threads that should be created"""
57 self.numWorkers = num
58
59 def workerProcess(self):
Bryan Duxbury69720412012-01-03 17:32:30 +000060 """Loop getting clients from the shared queue and process them"""
Bryan Duxburya48b7d62011-03-09 18:05:58 +000061 if self.postForkCallback:
62 self.postForkCallback()
63
Bryan Duxbury69720412012-01-03 17:32:30 +000064 while self.isRunning.value:
Bryan Duxburya48b7d62011-03-09 18:05:58 +000065 try:
66 client = self.serverTransport.accept()
Roger Meierab2793a2014-04-21 21:20:00 +020067 if not client:
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090068 continue
Bryan Duxburya48b7d62011-03-09 18:05:58 +000069 self.serveClient(client)
70 except (KeyboardInterrupt, SystemExit):
71 return 0
jfarrelld565e2f2015-03-18 21:02:47 -040072 except Exception as x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -040073 logger.exception(x)
Bryan Duxburya48b7d62011-03-09 18:05:58 +000074
75 def serveClient(self, client):
76 """Process input/output from a client for as long as possible"""
77 itrans = self.inputTransportFactory.getTransport(client)
78 otrans = self.outputTransportFactory.getTransport(client)
79 iprot = self.inputProtocolFactory.getProtocol(itrans)
80 oprot = self.outputProtocolFactory.getProtocol(otrans)
81
82 try:
83 while True:
84 self.processor.process(iprot, oprot)
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090085 except TTransportException:
Bryan Duxburya48b7d62011-03-09 18:05:58 +000086 pass
jfarrelld565e2f2015-03-18 21:02:47 -040087 except Exception as x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -040088 logger.exception(x)
Bryan Duxburya48b7d62011-03-09 18:05:58 +000089
90 itrans.close()
91 otrans.close()
92
Bryan Duxburya48b7d62011-03-09 18:05:58 +000093 def serve(self):
Bryan Duxbury69720412012-01-03 17:32:30 +000094 """Start workers and put into queue"""
95 # this is a shared state that can tell the workers to exit when False
Bryan Duxburya48b7d62011-03-09 18:05:58 +000096 self.isRunning.value = True
97
Bryan Duxbury69720412012-01-03 17:32:30 +000098 # first bind and listen to the port
Bryan Duxburya48b7d62011-03-09 18:05:58 +000099 self.serverTransport.listen()
100
Bryan Duxbury69720412012-01-03 17:32:30 +0000101 # fork the children
Bryan Duxburya48b7d62011-03-09 18:05:58 +0000102 for i in range(self.numWorkers):
103 try:
104 w = Process(target=self.workerProcess)
105 w.daemon = True
106 w.start()
107 self.workers.append(w)
jfarrelld565e2f2015-03-18 21:02:47 -0400108 except Exception as x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400109 logger.exception(x)
Bryan Duxburya48b7d62011-03-09 18:05:58 +0000110
Bryan Duxbury69720412012-01-03 17:32:30 +0000111 # wait until the condition is set by stop()
Bryan Duxburya48b7d62011-03-09 18:05:58 +0000112 while True:
Bryan Duxburya48b7d62011-03-09 18:05:58 +0000113 self.stopCondition.acquire()
114 try:
115 self.stopCondition.wait()
116 break
117 except (SystemExit, KeyboardInterrupt):
Bryan Duxbury69720412012-01-03 17:32:30 +0000118 break
jfarrelld565e2f2015-03-18 21:02:47 -0400119 except Exception as x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400120 logger.exception(x)
Bryan Duxburya48b7d62011-03-09 18:05:58 +0000121
122 self.isRunning.value = False
123
124 def stop(self):
125 self.isRunning.value = False
126 self.stopCondition.acquire()
127 self.stopCondition.notify()
128 self.stopCondition.release()