blob: 6656f0db76f1f9f2020fdf3eda8d7de0298c257f [file] [log] [blame]
# Copyright (c) 2006- Facebook
# Distributed under the Thrift Software License
#
# See accompanying file LICENSE or visit the Thrift site at:
# http://developers.facebook.com/thrift/
6
David Reissb04df762008-06-10 22:55:38 +00007import logging
Mark Sleec98d0502006-09-06 02:42:25 +00008import sys
David Reiss66536542008-06-10 22:54:49 +00009import os
Mark Sleec98d0502006-09-06 02:42:25 +000010import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +000011import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +000012import Queue
Mark Sleec98d0502006-09-06 02:42:25 +000013
Mark Sleec9676562006-09-05 17:34:52 +000014from thrift.Thrift import TProcessor
15from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +000016from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +000017
class TServer:

    """Base interface for a server, which must have a serve method.

    Three constructor forms are accepted by all servers:
      1) (processor, serverTransport)
      2) (processor, serverTransport, transportFactory, protocolFactory)
      3) (processor, serverTransport,
          inputTransportFactory, outputTransportFactory,
          inputProtocolFactory, outputProtocolFactory)
    """

    def __init__(self, *args):
        """Store the processor, server transport and factories.

        With two arguments, default TTransportFactoryBase and
        TBinaryProtocolFactory instances are created for both directions.
        With four arguments, the single transport factory and the single
        protocol factory are each used for both input and output.
        With six arguments, every factory is given explicitly.

        Raises:
          TypeError: if the number of arguments is not 2, 4 or 6.  Failing
            here avoids handing back an instance with none of its
            attributes set.
        """
        count = len(args)
        if count == 2:
            self.__initArgs__(args[0], args[1],
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif count == 4:
            # One factory of each kind, shared by the input and output sides.
            self.__initArgs__(args[0], args[1],
                              args[2], args[2],
                              args[3], args[3])
        elif count == 6:
            self.__initArgs__(args[0], args[1],
                              args[2], args[3],
                              args[4], args[5])
        else:
            raise TypeError(
                'TServer takes 2, 4 or 6 arguments (%d given)' % count)

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Assign every constructor argument to the attribute of the same name."""
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

    def serve(self):
        """Do nothing; subclasses override this with their serving loop."""
        pass
52
class TSimpleServer(TServer):

    """Simple single-threaded server that just pumps around one transport.

    Clients are handled one at a time: the next connection is only accepted
    once processing of the current one has stopped and its transports have
    been closed.
    """

    def __init__(self, *args):
        """Accept the same argument forms as TServer.__init__."""
        TServer.__init__(self, *args)

    def serve(self):
        """Listen on the server transport and serve clients sequentially.

        The loop has no exit condition of its own; it only ends when
        accept(), one of the factories, or close() raises.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            # Nested try blocks (the same form TForkingServer uses) so the
            # transports are closed even when something that is not an
            # Exception subclass, e.g. KeyboardInterrupt, propagates.
            try:
                try:
                    while True:
                        self.processor.process(iprot, oprot)
                except TTransport.TTransportException:
                    # Ends this connection quietly (presumably the client
                    # went away).
                    pass
                except Exception:
                    # sys.exc_info() is used so the handler does not depend
                    # on a version-specific exception-binding syntax.
                    logging.exception(sys.exc_info()[1])
            finally:
                itrans.close()
                otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000078
class TThreadedServer(TServer):

    """Threaded server that spawns a new thread per each connection."""

    def __init__(self, *args):
        """Accept the same argument forms as TServer.__init__."""
        TServer.__init__(self, *args)

    def serve(self):
        """Accept connections forever, starting one handler thread for each.

        KeyboardInterrupt is re-raised so the loop can be interrupted; any
        other Exception raised while accepting or starting the thread is
        logged and the loop continues.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                worker = threading.Thread(target=self.handle, args=(client,))
                worker.start()
            except KeyboardInterrupt:
                raise
            except Exception:
                # sys.exc_info() is used so the handler does not depend on
                # a version-specific exception-binding syntax.
                logging.exception(sys.exc_info()[1])

    def handle(self, client):
        """Process requests from one client until processing raises.

        Runs in the thread started by serve().  The transports built for
        the client are always closed before this method returns.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        # Nested try blocks (the same form TForkingServer uses) so the
        # transports are closed even when something that is not an
        # Exception subclass propagates out of the processor.
        try:
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Ends this connection quietly (presumably the client went
                # away).
                pass
            except Exception:
                logging.exception(sys.exc_info()[1])
        finally:
            itrans.close()
            otrans.close()
113
class TThreadPoolServer(TServer):

    """Server with a fixed size pool of threads which service requests."""

    def __init__(self, *args):
        """Accept the same argument forms as TServer.__init__."""
        TServer.__init__(self, *args)
        # Accepted clients waiting for a worker thread to pick them up.
        self.clients = Queue.Queue()
        # Number of worker threads serve() starts; see setNumThreads().
        self.threads = 10

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception:
                # Log and keep the worker alive for the next client.
                # sys.exc_info() is used so the handler does not depend on
                # a version-specific exception-binding syntax.
                logging.exception(sys.exc_info()[1])

    def serveClient(self, client):
        """Process input/output from a client for as long as possible

        The transports built for the client are always closed before this
        method returns.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        # Nested try blocks (the same form TForkingServer uses) so the
        # transports are closed even when something that is not an
        # Exception subclass propagates out of the processor.
        try:
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Ends this connection quietly (presumably the client went
                # away).
                pass
            except Exception:
                logging.exception(sys.exc_info()[1])
        finally:
            itrans.close()
            otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                worker = threading.Thread(target=self.serveThread)
                worker.start()
            except Exception:
                logging.exception(sys.exc_info()[1])

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                self.clients.put(client)
            except Exception:
                logging.exception(sys.exc_info()[1])
David Reiss66536542008-06-10 22:54:49 +0000170
171
class TForkingServer(TServer):

    """A Thrift server that forks a new process for each request.

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on Windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """

    def __init__(self, *args):
        """Accept the same argument forms as TServer.__init__."""
        TServer.__init__(self, *args)
        # PIDs of forked children that have not been reaped yet.
        self.children = []

    def serve(self):
        """Accept connections forever, forking one child per connection.

        The parent records the child's PID, reaps finished children and
        closes its own copy of the connection.  The child processes
        requests until the processor raises and then terminates with
        os._exit(): status 0 after a TTransportException, 1 after any other
        Exception.  The child never returns to the accept loop.
        """
        def try_close(transport):
            # Best-effort close: an IOError is logged rather than raised.
            # sys.exc_info() is used so the handler does not depend on a
            # version-specific exception-binding syntax.
            try:
                transport.close()
            except IOError:
                logging.warning(sys.exc_info()[1], exc_info=True)

        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            try:
                pid = os.fork()

                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()

                    # Parent must close socket or the connection may not get
                    # closed promptly
                    itrans = self.inputTransportFactory.getTransport(client)
                    otrans = self.outputTransportFactory.getTransport(client)
                    try_close(itrans)
                    try_close(otrans)
                else:  # child
                    ecode = 0
                    # The outer finally guarantees the child exits whatever
                    # happens (including a failure while building or
                    # closing the transports); otherwise it would fall back
                    # into the accept loop and act as a second server.
                    try:
                        try:
                            itrans = self.inputTransportFactory.getTransport(client)
                            otrans = self.outputTransportFactory.getTransport(client)

                            iprot = self.inputProtocolFactory.getProtocol(itrans)
                            oprot = self.outputProtocolFactory.getProtocol(otrans)

                            try:
                                while True:
                                    self.processor.process(iprot, oprot)
                            finally:
                                try_close(itrans)
                                try_close(otrans)
                        except TTransport.TTransportException:
                            # Ends this connection quietly (presumably the
                            # client went away).
                            pass
                        except Exception:
                            logging.exception(sys.exc_info()[1])
                            ecode = 1
                    finally:
                        os._exit(ecode)

            except TTransport.TTransportException:
                pass
            except Exception:
                logging.exception(sys.exc_info()[1])

    def collect_children(self):
        """Reap finished children without blocking.

        Stops as soon as waitpid reports nothing to collect or fails.
        """
        while self.children:
            try:
                pid, status = os.waitpid(0, os.WNOHANG)
            except os.error:
                pid = None

            if not pid:
                break

            # waitpid(0, ...) reports any child in this process group, so
            # the PID may belong to a child this server did not fork; an
            # unguarded remove() would raise ValueError in that case.
            if pid in self.children:
                self.children.remove(pid)
256
257