blob: 147d075ce116a8059eea3c6261e398ae27993bfa [file] [log] [blame]
import sys
import traceback
import threading
import Queue
from thrift.Thrift import TProcessor
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
class TServer:
"""Base interface for a server, which must have a serve method."""
""" 3 constructors for all servers:
1) (processor, serverTransport)
2) (processor, serverTransport, transportFactory, protocolFactory)
3) (processor, serverTransport,
inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory)"""
def __init__(self, *args):
if (len(args) == 2):
self.__initArgs__(args[0], args[1],
TTransport.TTransportFactoryBase(),
TTransport.TTransportFactoryBase(),
TBinaryProtocol.TBinaryProtocolFactory(),
TBinaryProtocol.TBinaryProtocolFactory())
elif (len(args) == 4):
self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
elif (len(args) == 6):
self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
def __initArgs__(self, processor, serverTransport,
inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory):
self.processor = processor
self.serverTransport = serverTransport
self.inputTransportFactory = inputTransportFactory
self.outputTransportFactory = outputTransportFactory
self.inputProtocolFactory = inputProtocolFactory
self.outputProtocolFactory = outputProtocolFactory
def serve(self):
pass
class TSimpleServer(TServer):
"""Simple single-threaded server that just pumps around one transport."""
def __init__(self, *args):
TServer.__init__(self, *args)
def serve(self):
self.serverTransport.listen()
while True:
client = self.serverTransport.accept()
itrans = self.inputTransportFactory.getTransport(client)
otrans = self.outputTransportFactory.getTransport(client)
iprot = self.inputProtocolFactory.getProtocol(itrans)
oprot = self.outputProtocolFactory.getProtocol(otrans)
try:
while True:
self.processor.process(iprot, oprot)
except TTransport.TTransportException, tx:
pass
except Exception, x:
print '%s, %s, %s' % (type(x), x, traceback.format_exc())
itrans.close()
otrans.close()
class TThreadedServer(TServer):
"""Threaded server that spawns a new thread per each connection."""
def __init__(self, *args):
TServer.__init__(self, *args)
def serve(self):
self.serverTransport.listen()
while True:
try:
client = self.serverTransport.accept()
t = threading.Thread(target = self.handle, args=(client,))
t.start()
except Exception, x:
print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
def handle(self, client):
itrans = self.inputTransportFactory.getTransport(client)
otrans = self.outputTransportFactory.getTransport(client)
iprot = self.inputProtocolFactory.getProtocol(itrans)
oprot = self.outputProtocolFactory.getProtocol(otrans)
try:
while True:
self.processor.process(iprot, oprot)
except TTransport.TTransportException, tx:
pass
except Exception, x:
print '%s, %s, %s' % (type(x), x, traceback.format_exc())
itrans.close()
otrans.close()
class TThreadPoolServer(TServer):
"""Server with a fixed size pool of threads which service requests."""
def __init__(self, *args):
TServer.__init__(self, *args)
self.clients = Queue.Queue()
self.threads = 10
def setNumThreads(self, num):
"""Set the number of worker threads that should be created"""
self.threads = num
def serveThread(self):
"""Loop around getting clients from the shared queue and process them."""
while True:
try:
client = self.clients.get()
self.serveClient(client)
except Exception, x:
print '%s, %s, %s' % (type(x), x, traceback.format_exc())
def serveClient(self, client):
"""Process input/output from a client for as long as possible"""
itrans = self.inputTransportFactory.getTransport(client)
otrans = self.outputTransportFactory.getTransport(client)
iprot = self.inputProtocolFactory.getProtocol(itrans)
oprot = self.outputProtocolFactory.getProtocol(otrans)
try:
while True:
self.processor.process(iprot, oprot)
except TTransport.TTransportException, tx:
pass
except Exception, x:
print '%s, %s, %s' % (type(x), x, traceback.format_exc())
itrans.close()
otrans.close()
def serve(self):
"""Start a fixed number of worker threads and put client into a queue"""
for i in range(self.threads):
try:
t = threading.Thread(target = self.serveThread)
t.start()
except Exception, x:
print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
# Pump the socket for clients
self.serverTransport.listen()
while True:
try:
client = self.serverTransport.accept()
self.clients.put(client)
except Exception, x:
print '%s, %s, %s' % (type(x), x, traceback.format_exc())