blob: 147d075ce116a8059eea3c6261e398ae27993bfa [file] [log] [blame]
Mark Sleec98d0502006-09-06 02:42:25 +00001import sys
2import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +00003import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +00004import Queue
Mark Sleec98d0502006-09-06 02:42:25 +00005
Mark Sleec9676562006-09-05 17:34:52 +00006from thrift.Thrift import TProcessor
7from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +00008from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +00009
class TServer:

    """Base interface for a server, which must have a serve method.

    Three constructor forms are accepted by all servers:

      1) (processor, serverTransport)
      2) (processor, serverTransport, transportFactory, protocolFactory)
      3) (processor, serverTransport,
          inputTransportFactory, outputTransportFactory,
          inputProtocolFactory, outputProtocolFactory)

    Form 1 installs a fresh TTransport.TTransportFactoryBase and a fresh
    TBinaryProtocol.TBinaryProtocolFactory for each direction.  Form 2 uses
    the same transport factory and the same protocol factory for both input
    and output.
    """

    def __init__(self, *args):
        """Initialise the server from one of the three argument forms.

        Raises:
          TypeError: if the number of positional arguments is not 2, 4 or 6.
        """
        count = len(args)
        if count == 2:
            self.__initArgs__(args[0], args[1],
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif count == 4:
            # One transport factory and one protocol factory, shared by the
            # input and output sides.
            self.__initArgs__(args[0], args[1],
                              args[2], args[2],
                              args[3], args[3])
        elif count == 6:
            self.__initArgs__(*args)
        else:
            # Fail at construction time instead of leaving the instance
            # without its processor/transport/factory attributes.
            raise TypeError('%s expects 2, 4 or 6 arguments (%d given)'
                            % (self.__class__.__name__, count))

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Store the processor, server transport and the four factories."""
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

    def serve(self):
        """Run the server.  The base implementation does nothing."""
        pass
44
class TSimpleServer(TServer):

    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, *args):
        """Accept the same argument forms as TServer.__init__."""
        TServer.__init__(self, *args)

    def serve(self):
        """Listen on the server transport and service clients one at a time.

        Each accepted client is wrapped by the input/output transport and
        protocol factories, then the processor is invoked repeatedly until
        it raises.  A TTransport.TTransportException ends the session
        silently; any other Exception is printed with its traceback.  The
        client's transports are closed however the session ends.

        This method loops forever; it only exits if listen(), accept() or a
        factory call raises, or if a non-Exception error (for example a
        keyboard interrupt) propagates.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            # try/finally wraps the whole session so the transports are
            # released on every exit path, not just the handled ones.
            try:
                iprot = self.inputProtocolFactory.getProtocol(itrans)
                oprot = self.outputProtocolFactory.getProtocol(otrans)
                try:
                    while True:
                        self.processor.process(iprot, oprot)
                except TTransport.TTransportException:
                    pass
                except Exception:
                    # Fetch the active exception without relying on
                    # version-specific "except ... , x" / "as x" syntax.
                    x = sys.exc_info()[1]
                    print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
            finally:
                itrans.close()
                otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000070
class TThreadedServer(TServer):

    """Threaded server that spawns a new thread per each connection."""

    def __init__(self, *args):
        """Accept the same argument forms as TServer.__init__."""
        TServer.__init__(self, *args)

    def serve(self):
        """Listen, then start one thread running handle() per accepted client.

        Exceptions raised while accepting a client or starting its thread
        are printed with their traceback and the accept loop continues.
        This method loops forever.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                worker = threading.Thread(target=self.handle, args=(client,))
                worker.start()
            except Exception:
                x = sys.exc_info()[1]
                print('%s, %s, %s,' % (type(x), x, traceback.format_exc()))

    def handle(self, client):
        """Service a single client until the processor raises.

        The client is wrapped by the input/output transport and protocol
        factories and the processor is invoked repeatedly.  A
        TTransport.TTransportException ends the session silently; any other
        Exception is printed with its traceback.  The client's transports
        are closed however the session ends.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        # try/finally wraps the whole session so the transports are
        # released on every exit path, not just the handled ones.
        try:
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                pass
            except Exception:
                # Fetch the active exception without relying on
                # version-specific "except ... , x" / "as x" syntax.
                x = sys.exc_info()[1]
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
        finally:
            itrans.close()
            otrans.close()
103
class TThreadPoolServer(TServer):

    """Server with a fixed size pool of threads which service requests."""

    def __init__(self, *args):
        """Accept the same argument forms as TServer.__init__.

        Also creates the shared client queue and sets the default pool size
        to 10 worker threads.
        """
        TServer.__init__(self, *args)
        self.clients = Queue.Queue()
        self.threads = 10

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them.

        Any Exception escaping serveClient() is printed with its traceback
        and the worker goes back to waiting on the queue.
        """
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception:
                x = sys.exc_info()[1]
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))

    def serveClient(self, client):
        """Process input/output from a client for as long as possible

        The processor is invoked repeatedly until it raises.  A
        TTransport.TTransportException ends the session silently; any other
        Exception is printed with its traceback.  The client's transports
        are closed however the session ends.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        # try/finally wraps the whole session so the transports are
        # released on every exit path, not just the handled ones.
        try:
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                pass
            except Exception:
                # Fetch the active exception without relying on
                # version-specific "except ... , x" / "as x" syntax.
                x = sys.exc_info()[1]
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
        finally:
            itrans.close()
            otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue

        After the workers are started, the server transport is put into
        listening mode and every accepted client is placed on the shared
        queue.  Exceptions raised while starting a worker or accepting a
        client are printed with their traceback and the loop continues.
        This method loops forever.
        """
        for _ in range(self.threads):
            try:
                worker = threading.Thread(target=self.serveThread)
                worker.start()
            except Exception:
                x = sys.exc_info()[1]
                print('%s, %s, %s,' % (type(x), x, traceback.format_exc()))

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                self.clients.put(client)
            except Exception:
                x = sys.exc_info()[1]
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))