blob: 48a4fcbe6449e8edb158ed07aa1c1d5156f98290 [file] [log] [blame]
Mark Sleec98d0502006-09-06 02:42:25 +00001import sys
2import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +00003import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +00004import Queue
Mark Sleec98d0502006-09-06 02:42:25 +00005
Mark Sleec9676562006-09-05 17:34:52 +00006from thrift.Thrift import TProcessor
7from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +00008from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +00009
10class TServer:
11
Mark Slee794993d2006-09-20 01:56:10 +000012 """Base interface for a server, which must have a serve method."""
Mark Sleec9676562006-09-05 17:34:52 +000013
Aditya Agarwal5c468192007-02-06 01:14:33 +000014 """ 3 constructors for all servers:
15 1) (processor, serverTransport)
16 2) (processor, serverTransport, transportFactory, protocolFactory)
17 3) (processor, serverTransport,
18 inputTransportFactory, outputTransportFactory,
19 inputProtocolFactory, outputProtocolFactory)"""
20 def __init__(self, *args):
21 print args
22 if (len(args) == 2):
23 self.__initArgs__(args[0], args[1],
24 TTransport.TTransportFactoryBase(),
25 TTransport.TTransportFactoryBase(),
26 TBinaryProtocol.TBinaryProtocolFactory(),
27 TBinaryProtocol.TBinaryProtocolFactory())
28 elif (len(args) == 4):
29 self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
30 elif (len(args) == 6):
31 self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
32
33 def __initArgs__(self, processor, serverTransport,
34 inputTransportFactory, outputTransportFactory,
35 inputProtocolFactory, outputProtocolFactory):
Mark Sleed788b2e2006-09-07 01:26:35 +000036 self.processor = processor
37 self.serverTransport = serverTransport
Aditya Agarwal5c468192007-02-06 01:14:33 +000038 self.inputTransportFactory = inputTransportFactory
39 self.outputTransportFactory = outputTransportFactory
40 self.inputProtocolFactory = inputProtocolFactory
41 self.outputProtocolFactory = outputProtocolFactory
Mark Sleec9676562006-09-05 17:34:52 +000042
Mark Slee794993d2006-09-20 01:56:10 +000043 def serve(self):
Mark Sleec9676562006-09-05 17:34:52 +000044 pass
45
46class TSimpleServer(TServer):
47
48 """Simple single-threaded server that just pumps around one transport."""
49
Aditya Agarwal5c468192007-02-06 01:14:33 +000050 def __init__(self, *args):
51 TServer.__init__(self, *args)
Mark Sleec9676562006-09-05 17:34:52 +000052
Mark Slee794993d2006-09-20 01:56:10 +000053 def serve(self):
Mark Sleed788b2e2006-09-07 01:26:35 +000054 self.serverTransport.listen()
Mark Sleec9676562006-09-05 17:34:52 +000055 while True:
Mark Sleed788b2e2006-09-07 01:26:35 +000056 client = self.serverTransport.accept()
Aditya Agarwal5c468192007-02-06 01:14:33 +000057 itrans = self.inputTransportFactory.getTransport(client)
58 otrans = self.outputTransportFactory.getTransport(client)
59 iprot = self.inputProtocolFactory.getProtocol(itrans)
60 oprot = self.oututProtocolFactory.getProtocol(otrans)
Mark Sleec9676562006-09-05 17:34:52 +000061 try:
62 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +000063 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +000064 except TTransport.TTransportException, tx:
65 pass
Mark Sleec9676562006-09-05 17:34:52 +000066 except Exception, x:
Mark Sleec98d0502006-09-06 02:42:25 +000067 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
Mark Sleed788b2e2006-09-07 01:26:35 +000068
Mark Slee4ac459f2006-10-25 21:39:01 +000069 itrans.close()
70 otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000071
72class TThreadedServer(TServer):
73
74 """Threaded server that spawns a new thread per each connection."""
75
Aditya Agarwal5c468192007-02-06 01:14:33 +000076 def __init__(self, *args):
77 TServer.__init__(self, *args)
Mark Slee4f0fed62006-10-02 17:50:08 +000078
79 def serve(self):
80 self.serverTransport.listen()
81 while True:
82 try:
83 client = self.serverTransport.accept()
84 t = threading.Thread(target = self.handle, args=(client,))
85 t.start()
86 except Exception, x:
87 print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
88
89 def handle(self, client):
Aditya Agarwal5c468192007-02-06 01:14:33 +000090 itrans = self.inputTransportFactory.getTransport(client)
91 otrans = self.outputTransportFactory.getTransport(client)
92 iprot = self.inputProtocolFactory.getProtocol(itrans)
93 oprot = self.oututProtocolFactory.getProtocol(otrans)
Mark Slee4f0fed62006-10-02 17:50:08 +000094 try:
95 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +000096 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +000097 except TTransport.TTransportException, tx:
98 pass
99 except Exception, x:
100 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000101
Mark Slee4ac459f2006-10-25 21:39:01 +0000102 itrans.close()
103 otrans.close()
104
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000105class TThreadPoolServer(TServer):
106
107 """Server with a fixed size pool of threads which service requests."""
108
Aditya Agarwal5c468192007-02-06 01:14:33 +0000109 def __init__(self, *args):
110 TServer.__init__(self, *args)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000111 self.clients = Queue.Queue()
112 self.threads = 10
113
Mark Slee4ce787f2006-10-24 18:54:06 +0000114 def setNumThreads(self, num):
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000115 """Set the number of worker threads that should be created"""
116 self.threads = num
117
118 def serveThread(self):
119 """Loop around getting clients from the shared queue and process them."""
120 while True:
121 try:
Mark Slee9a695ba2006-10-24 18:55:36 +0000122 client = self.clients.get()
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000123 self.serveClient(client)
124 except Exception, x:
125 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
126
127 def serveClient(self, client):
128 """Process input/output from a client for as long as possible"""
Aditya Agarwal5c468192007-02-06 01:14:33 +0000129 itrans = self.inputTransportFactory.getTransport(client)
130 otrans = self.outputTransportFactory.getTransport(client)
131 iprot = self.inputProtocolFactory.getProtocol(itrans)
132 oprot = self.oututProtocolFactory.getProtocol(otrans)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000133 try:
134 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000135 self.processor.process(iprot, oprot)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000136 except TTransport.TTransportException, tx:
137 pass
138 except Exception, x:
139 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
140
Mark Slee4ac459f2006-10-25 21:39:01 +0000141 itrans.close()
142 otrans.close()
143
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000144 def serve(self):
145 """Start a fixed number of worker threads and put client into a queue"""
146 for i in range(self.threads):
147 try:
148 t = threading.Thread(target = self.serveThread)
149 t.start()
150 except Exception, x:
151 print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
152
153 # Pump the socket for clients
154 self.serverTransport.listen()
155 while True:
156 try:
157 client = self.serverTransport.accept()
158 self.clients.put(client)
159 except Exception, x:
160 print '%s, %s, %s' % (type(x), x, traceback.format_exc())