| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| |
| import logging |
| from multiprocessing import Process, Value, Condition, reduction |
| |
| from TServer import TServer |
| from thrift.transport.TTransport import TTransportException |
| |
| |
class TProcessPoolServer(TServer):
    """Server with a fixed size pool of worker subprocesses to service requests

    The parent process binds the listening transport once and then forks
    ``numWorkers`` child processes; every child calls ``accept()`` on that
    same server transport and serves the clients it receives.

    Note that if you need shared state between the handlers - it's up to you!
    Written by Dvir Volk, doat.com
    """

    def __init__(self, *args):
        """Initialise the server; ``args`` are forwarded to ``TServer``."""
        TServer.__init__(self, *args)
        # Number of worker processes started by serve().
        self.numWorkers = 10
        # Process objects of the workers that were started successfully.
        self.workers = []
        # Shared flag polled by the workers' accept loops; cleared to make
        # them leave the loop.
        self.isRunning = Value('b', False)
        # serve() blocks on this condition until stop() notifies it.
        self.stopCondition = Condition()
        # Optional callable run in each worker before it starts accepting.
        self.postForkCallback = None

    def setPostForkCallback(self, callback):
        """Register a callable that every worker runs once on start-up.

        Raises TypeError if ``callback`` is not callable.
        """
        if not callable(callback):
            raise TypeError("This is not a callback!")
        self.postForkCallback = callback

    def setNumWorkers(self, num):
        """Set the number of worker processes that should be created"""
        self.numWorkers = num

    def workerProcess(self):
        """Accept clients on the server transport and serve them in a loop.

        Runs as the target of each worker process.  The loop continues while
        the shared ``isRunning`` flag is true.  Returns 0 when interrupted by
        KeyboardInterrupt/SystemExit, otherwise None once the flag is cleared.
        """
        if self.postForkCallback:
            self.postForkCallback()

        while self.isRunning.value:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                self.serveClient(client)
            except (KeyboardInterrupt, SystemExit):
                return 0
            except Exception as x:
                # Keep the worker alive: log the failure and accept again.
                logging.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)

        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransportException:
            # A transport error ends this client's session; nothing to log.
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Close in a finally block so the transports are released even
            # when KeyboardInterrupt/SystemExit propagates out of process().
            itrans.close()
            otrans.close()

    def serve(self):
        """Start the worker processes and block until stop() is called.

        Binds the listening transport, starts ``numWorkers`` daemonic worker
        processes and then waits on ``stopCondition``.  Clears ``isRunning``
        before returning.
        """
        # this is a shared state that can tell the workers to exit when False
        self.isRunning.value = True

        # first bind and listen to the port
        self.serverTransport.listen()

        # fork the children
        for _ in range(self.numWorkers):
            try:
                w = Process(target=self.workerProcess)
                w.daemon = True
                w.start()
                self.workers.append(w)
            except Exception as x:
                # A worker that fails to start is logged and skipped.
                logging.exception(x)

        # wait until the condition is set by stop()
        while True:
            # Hold the condition via ``with`` so it is released on every
            # exit path (normal wake-up, interrupt, or a retry after error).
            with self.stopCondition:
                try:
                    self.stopCondition.wait()
                    break
                except (SystemExit, KeyboardInterrupt):
                    break
                except Exception as x:
                    logging.exception(x)

        self.isRunning.value = False

    def stop(self):
        """Clear the running flag and wake up the blocked serve() call."""
        self.isRunning.value = False
        with self.stopCondition:
            self.stopCondition.notify()