blob: c9cfa1104bce2299c1f3dcb0a40bcf0576afaf0a [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
from multiprocessing import Process, Value, Condition
from .TServer import TServer
from thrift.transport.TTransport import TTransportException
logger = logging.getLogger(__name__)
class TProcessPoolServer(TServer):
"""Server with a fixed size pool of worker subprocesses to service requests
Note that if you need shared state between the handlers - it's up to you!
Written by Dvir Volk, doat.com
"""
def __init__(self, *args):
TServer.__init__(self, *args)
self.numWorkers = 10
self.workers = []
self.isRunning = Value('b', False)
self.stopCondition = Condition()
self.postForkCallback = None
def __getstate__(self):
state = self.__dict__.copy()
state['workers'] = None
return state
def setPostForkCallback(self, callback):
if not callable(callback):
raise TypeError("This is not a callback!")
self.postForkCallback = callback
def setNumWorkers(self, num):
"""Set the number of worker threads that should be created"""
self.numWorkers = num
def workerProcess(self):
"""Loop getting clients from the shared queue and process them"""
if self.postForkCallback:
self.postForkCallback()
while self.isRunning.value:
try:
client = self.serverTransport.accept()
if not client:
continue
self.serveClient(client)
except (KeyboardInterrupt, SystemExit):
return 0
except Exception as x:
logger.exception(x)
def serveClient(self, client):
"""Process input/output from a client for as long as possible"""
itrans = self.inputTransportFactory.getTransport(client)
otrans = self.outputTransportFactory.getTransport(client)
iprot = self.inputProtocolFactory.getProtocol(itrans)
oprot = self.outputProtocolFactory.getProtocol(otrans)
try:
while True:
self.processor.process(iprot, oprot)
except TTransportException:
pass
except Exception as x:
logger.exception(x)
itrans.close()
otrans.close()
def serve(self):
"""Start workers and put into queue"""
# this is a shared state that can tell the workers to exit when False
self.isRunning.value = True
# first bind and listen to the port
self.serverTransport.listen()
# fork the children
for i in range(self.numWorkers):
try:
w = Process(target=self.workerProcess)
w.daemon = True
w.start()
self.workers.append(w)
except Exception as x:
logger.exception(x)
# wait until the condition is set by stop()
while True:
self.stopCondition.acquire()
try:
self.stopCondition.wait()
break
except (SystemExit, KeyboardInterrupt):
break
except Exception as x:
logger.exception(x)
self.isRunning.value = False
def stop(self):
self.isRunning.value = False
self.stopCondition.acquire()
self.stopCondition.notify()
self.stopCondition.release()