blob: 7514264211638a2b631945afbaab5c92e0a6ee75 [file] [log] [blame]
Mark Sleec98d0502006-09-06 02:42:25 +00001import sys
2import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +00003import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +00004import Queue
Mark Sleec98d0502006-09-06 02:42:25 +00005
Mark Sleec9676562006-09-05 17:34:52 +00006from thrift.Thrift import TProcessor
7from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +00008from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +00009
class TServer:

    """Base interface for a server, which must have a serve method.

    Stores the four collaborators that the concrete servers in this module
    use: a processor, a listening server transport, a transport factory and
    a protocol factory.
    """

    def __init__(self, processor, serverTransport, transportFactory=None, protocolFactory=None):
        """Remember the server's collaborators, filling in default factories.

        processor        -- object whose process(iprot, oprot) method the
                            concrete servers call for each request.
        serverTransport  -- object the concrete servers call listen() and
                            accept() on.
        transportFactory -- object providing getIOTransports(client); when
                            None, a TTransport.TTransportFactoryBase is used.
        protocolFactory  -- object providing getIOProtocols(itrans, otrans);
                            when None, a
                            TBinaryProtocol.TBinaryProtocolFactory is used.
        """
        self.processor = processor
        self.serverTransport = serverTransport

        # Identity comparison on purpose: "== None" would invoke a custom
        # __eq__ on the supplied factory and could wrongly discard it.
        if transportFactory is None:
            transportFactory = TTransport.TTransportFactoryBase()
        self.transportFactory = transportFactory

        if protocolFactory is None:
            protocolFactory = TBinaryProtocol.TBinaryProtocolFactory()
        self.protocolFactory = protocolFactory

    def serve(self):
        """Run the server. The base implementation does nothing."""
        pass
28
class TSimpleServer(TServer):

    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, processor, serverTransport, transportFactory=None, protocolFactory=None):
        """Forward all arguments unchanged to TServer.__init__."""
        TServer.__init__(self, processor, serverTransport, transportFactory, protocolFactory)

    def serve(self):
        """Accept clients one at a time and process each until it fails.

        Loops forever: listens on the server transport, accepts a client,
        wraps it with the transport and protocol factories, then calls
        processor.process() repeatedly. A TTransportException ends the
        client's session silently; any other Exception is printed together
        with its traceback. Either way the next client is then accepted.

        Errors raised by listen(), accept() or the factories are not caught
        here and propagate to the caller.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            (itrans, otrans) = self.transportFactory.getIOTransports(client)
            try:
                (iprot, oprot) = self.protocolFactory.getIOProtocols(itrans, otrans)
                try:
                    while True:
                        self.processor.process(iprot, oprot)
                except TTransport.TTransportException:
                    # Transport-level failure: stop serving this client
                    # without reporting anything.
                    pass
                except Exception as x:
                    print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
            finally:
                # Close in a finally block so the client's transports are
                # released even when something that is not an Exception
                # subclass (e.g. SystemExit) or a factory error escapes.
                itrans.close()
                otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000052
class TThreadedServer(TServer):

    """Threaded server that spawns a new thread per each connection."""

    def __init__(self, processor, serverTransport, transportFactory=None, protocolFactory=None):
        """Forward all arguments unchanged to TServer.__init__."""
        TServer.__init__(self, processor, serverTransport, transportFactory, protocolFactory)

    def serve(self):
        """Accept clients forever, handing each one to a new thread.

        Every accepted client gets its own threading.Thread running
        handle(client). An Exception raised while accepting or starting the
        thread is printed with its traceback and the loop continues.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                t = threading.Thread(target=self.handle, args=(client,))
                t.start()
            except Exception as x:
                print('%s, %s, %s,' % (type(x), x, traceback.format_exc()))

    def handle(self, client):
        """Serve a single client until its transport fails, then close it.

        client -- the object returned by serverTransport.accept().

        A TTransportException ends the session silently; any other
        Exception is printed together with its traceback.
        """
        (itrans, otrans) = self.transportFactory.getIOTransports(client)
        try:
            (iprot, oprot) = self.protocolFactory.getIOProtocols(itrans, otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Transport-level failure: stop serving this client without
                # reporting anything.
                pass
            except Exception as x:
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
        finally:
            # Close in a finally block: a handler that raises something
            # outside the Exception hierarchy (e.g. SystemExit) ends this
            # thread, and the connection must not be left open behind it.
            itrans.close()
            otrans.close()
83
class TThreadPoolServer(TServer):

    """Server with a fixed size pool of threads which service requests."""

    def __init__(self, processor, serverTransport, transportFactory=None, protocolFactory=None):
        """Set up the client queue and the default worker count.

        The first three arguments are as before; protocolFactory is
        accepted (and defaults to None) so that this server can be
        configured with a protocol factory like the other servers in this
        module. All four are forwarded to TServer.__init__.
        """
        TServer.__init__(self, processor, serverTransport, transportFactory, protocolFactory)
        # Accepted clients wait here until a worker thread picks them up.
        self.clients = Queue.Queue()
        # Number of worker threads serve() will start.
        self.threads = 10

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))

    def serveClient(self, client):
        """Process input/output from a client for as long as possible

        client -- an object taken from the shared client queue.

        A TTransportException ends the session silently; any other
        Exception is printed together with its traceback.
        """
        (itrans, otrans) = self.transportFactory.getIOTransports(client)
        try:
            (iprot, oprot) = self.protocolFactory.getIOProtocols(itrans, otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Transport-level failure: stop serving this client without
                # reporting anything.
                pass
            except Exception as x:
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
        finally:
            # Close in a finally block so the connection is released even
            # when an error escapes the handlers above.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                t = threading.Thread(target=self.serveThread)
                t.start()
            except Exception as x:
                print('%s, %s, %s,' % (type(x), x, traceback.format_exc()))

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                self.clients.put(client)
            except Exception as x:
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))