blob: 56ee9c08066610f861e9800175c686e6a35dc664 [file] [log] [blame]
Mark Sleec98d0502006-09-06 02:42:25 +00001import sys
2import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +00003import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +00004import Queue
Mark Sleec98d0502006-09-06 02:42:25 +00005
Mark Sleec9676562006-09-05 17:34:52 +00006from thrift.Thrift import TProcessor
7from thrift.transport import TTransport
8
class TServer:

    """Base interface for a server, which must have a serve method.

    Instances carry three attributes set by the constructor: processor,
    serverTransport and transportFactory.
    """

    def __init__(self, processor, serverTransport, transportFactory=None):
        """Store the collaborators the concrete servers work with.

        processor -- object the subclasses call process(input, output) on.
        serverTransport -- object the subclasses call listen() and accept() on.
        transportFactory -- object the subclasses call getIOTransports(client)
          on. When omitted (None), a TTransport.TTransportFactoryBase() is
          created and used instead.
        """
        self.processor = processor
        self.serverTransport = serverTransport
        # Identity test, not equality: only a genuinely missing argument gets
        # the default. A factory whose __eq__ compares equal to None must be
        # kept, not silently replaced.
        if transportFactory is None:
            transportFactory = TTransport.TTransportFactoryBase()
        self.transportFactory = transportFactory

    def serve(self):
        """Run the server. The base implementation does nothing."""
        pass
23
24class TSimpleServer(TServer):
25
26 """Simple single-threaded server that just pumps around one transport."""
27
Mark Sleed788b2e2006-09-07 01:26:35 +000028 def __init__(self, processor, serverTransport, transportFactory=None):
29 TServer.__init__(self, processor, serverTransport, transportFactory)
Mark Sleec9676562006-09-05 17:34:52 +000030
Mark Slee794993d2006-09-20 01:56:10 +000031 def serve(self):
Mark Sleed788b2e2006-09-07 01:26:35 +000032 self.serverTransport.listen()
Mark Sleec9676562006-09-05 17:34:52 +000033 while True:
Mark Sleed788b2e2006-09-07 01:26:35 +000034 client = self.serverTransport.accept()
35 (input, output) = self.transportFactory.getIOTransports(client)
Mark Sleec9676562006-09-05 17:34:52 +000036 try:
37 while True:
Mark Sleed788b2e2006-09-07 01:26:35 +000038 self.processor.process(input, output)
Mark Slee4f0fed62006-10-02 17:50:08 +000039 except TTransport.TTransportException, tx:
40 pass
Mark Sleec9676562006-09-05 17:34:52 +000041 except Exception, x:
Mark Sleec98d0502006-09-06 02:42:25 +000042 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
Mark Sleed788b2e2006-09-07 01:26:35 +000043
44 input.close()
45 output.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000046
47class TThreadedServer(TServer):
48
49 """Threaded server that spawns a new thread per each connection."""
50
51 def __init__(self, processor, serverTransport, transportFactory=None):
52 TServer.__init__(self, processor, serverTransport, transportFactory)
53
54 def serve(self):
55 self.serverTransport.listen()
56 while True:
57 try:
58 client = self.serverTransport.accept()
59 t = threading.Thread(target = self.handle, args=(client,))
60 t.start()
61 except Exception, x:
62 print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
63
64 def handle(self, client):
65 (input, output) = self.transportFactory.getIOTransports(client)
66 try:
67 while True:
68 self.processor.process(input, output)
69 except TTransport.TTransportException, tx:
70 pass
71 except Exception, x:
72 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
Mark Sleeb90aa7c2006-10-24 18:49:45 +000073
74class TThreadPoolServer(TServer):
75
76 """Server with a fixed size pool of threads which service requests."""
77
78 def __init__(self, processor, serverTransport, transportFactory=None):
79 TServer.__init__(self, processor, serverTransport, transportFactory)
80 self.clients = Queue.Queue()
81 self.threads = 10
82
Mark Slee4ce787f2006-10-24 18:54:06 +000083 def setNumThreads(self, num):
Mark Sleeb90aa7c2006-10-24 18:49:45 +000084 """Set the number of worker threads that should be created"""
85 self.threads = num
86
87 def serveThread(self):
88 """Loop around getting clients from the shared queue and process them."""
89 while True:
90 try:
Mark Slee9a695ba2006-10-24 18:55:36 +000091 client = self.clients.get()
Mark Sleeb90aa7c2006-10-24 18:49:45 +000092 self.serveClient(client)
93 except Exception, x:
94 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
95
96 def serveClient(self, client):
97 """Process input/output from a client for as long as possible"""
98 (input, output) = self.transportFactory.getIOTransports(client)
99 try:
100 while True:
101 self.processor.process(input, output)
102 except TTransport.TTransportException, tx:
103 pass
104 except Exception, x:
105 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
106
107 def serve(self):
108 """Start a fixed number of worker threads and put client into a queue"""
109 for i in range(self.threads):
110 try:
111 t = threading.Thread(target = self.serveThread)
112 t.start()
113 except Exception, x:
114 print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
115
116 # Pump the socket for clients
117 self.serverTransport.listen()
118 while True:
119 try:
120 client = self.serverTransport.accept()
121 self.clients.put(client)
122 except Exception, x:
123 print '%s, %s, %s' % (type(x), x, traceback.format_exc())