blob: d5739959ab2e3efe843b1b8e25fb36245672607f [file] [log] [blame]
Mark Slee89e2bb82007-03-01 00:20:36 +00001#!/usr/bin/env python
2#
3# Copyright (c) 2006- Facebook
4# Distributed under the Thrift Software License
5#
6# See accompanying file LICENSE or visit the Thrift site at:
7# http://developers.facebook.com/thrift/
8
David Reissb04df762008-06-10 22:55:38 +00009import logging
Mark Sleec98d0502006-09-06 02:42:25 +000010import sys
David Reiss66536542008-06-10 22:54:49 +000011import os
Mark Sleec98d0502006-09-06 02:42:25 +000012import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +000013import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +000014import Queue
Mark Sleec98d0502006-09-06 02:42:25 +000015
Mark Sleec9676562006-09-05 17:34:52 +000016from thrift.Thrift import TProcessor
17from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +000018from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +000019
class TServer:

    """Base interface for a server, which must have a serve method.

    Three constructor forms are accepted by all servers:
      1) (processor, serverTransport)
      2) (processor, serverTransport, transportFactory, protocolFactory)
      3) (processor, serverTransport,
          inputTransportFactory, outputTransportFactory,
          inputProtocolFactory, outputProtocolFactory)

    Form 1 uses a fresh TTransport.TTransportFactoryBase and a fresh
    TBinaryProtocol.TBinaryProtocolFactory for each direction.  Form 2 uses
    the same transport factory and the same protocol factory for both input
    and output.
    """

    def __init__(self, *args):
        """Store the processor, server transport and the four factories.

        Raises:
          TypeError: if the number of positional arguments is not 2, 4 or 6.
            Failing here is clearer than the AttributeError that an
            uninitialised instance would otherwise hit later in serve().
        """
        nargs = len(args)
        if nargs == 2:
            self.__initArgs__(args[0], args[1],
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif nargs == 4:
            # One factory of each kind, shared by input and output.
            self.__initArgs__(args[0], args[1],
                              args[2], args[2], args[3], args[3])
        elif nargs == 6:
            self.__initArgs__(*args)
        else:
            raise TypeError(
                "TServer takes 2, 4 or 6 arguments (%d given)" % nargs)

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Assign every constructor argument to the attribute of the same name."""
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

    def serve(self):
        """Do nothing; concrete servers override this with their serving loop."""
        pass
54
class TSimpleServer(TServer):

    """Simple single-threaded server that just pumps around one transport.

    Clients are accepted and served one after another on the calling thread;
    the next accept() only happens once the current client is finished.
    """

    def __init__(self, *args):
        TServer.__init__(self, *args)

    def serve(self):
        """Listen on the server transport and serve clients sequentially.

        Loops forever.  For every accepted client the processor is invoked
        repeatedly until it raises: a TTransportException ends the
        connection quietly, any other Exception is logged.  Errors raised
        by accept() or by the factories are not caught here and propagate
        to the caller.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            # Nested try blocks: the inner one classifies the error, the
            # outer one guarantees both transports are closed even when
            # something that is not an Exception (e.g. KeyboardInterrupt)
            # escapes the request loop.
            try:
                try:
                    while True:
                        self.processor.process(iprot, oprot)
                except TTransport.TTransportException:
                    pass
                except Exception:
                    # sys.exc_info() yields the active exception without
                    # relying on version-specific "except X, e" syntax.
                    logging.exception(sys.exc_info()[1])
            finally:
                try:
                    itrans.close()
                finally:
                    otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000080
class TThreadedServer(TServer):

    """Threaded server that spawns a new thread per each connection."""

    def __init__(self, *args):
        TServer.__init__(self, *args)

    def serve(self):
        """Listen, then start one handler thread per accepted client.

        Loops forever.  KeyboardInterrupt is re-raised so the loop can be
        interrupted; any other Exception raised while accepting or while
        starting the thread is logged and the loop continues.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                worker = threading.Thread(target=self.handle, args=(client,))
                worker.start()
            except KeyboardInterrupt:
                raise
            except Exception:
                # sys.exc_info() yields the active exception without
                # relying on version-specific "except X, e" syntax.
                logging.exception(sys.exc_info()[1])

    def handle(self, client):
        """Serve a single client until its processor raises.

        Runs on the thread created by serve().  A TTransportException ends
        the connection quietly, any other Exception is logged.  Both
        transports are always closed on the way out.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        # The outer try/finally closes the transports even when something
        # that is not an Exception escapes the request loop.
        try:
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                pass
            except Exception:
                logging.exception(sys.exc_info()[1])
        finally:
            try:
                itrans.close()
            finally:
                otrans.close()
115
class TThreadPoolServer(TServer):

    """Server with a fixed size pool of threads which service requests."""

    def __init__(self, *args):
        TServer.__init__(self, *args)
        # Accepted clients waiting for a free worker thread.
        self.clients = Queue.Queue()
        # Number of worker threads that serve() starts.
        self.threads = 10

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception:
                # sys.exc_info() yields the active exception without
                # relying on version-specific "except X, e" syntax.
                logging.exception(sys.exc_info()[1])

    def serveClient(self, client):
        """Process input/output from a client for as long as possible

        A TTransportException ends the connection quietly, any other
        Exception is logged.  Both transports are always closed on the way
        out.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        # The outer try/finally closes the transports even when something
        # that is not an Exception escapes the request loop.
        try:
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                pass
            except Exception:
                logging.exception(sys.exc_info()[1])
        finally:
            try:
                itrans.close()
            finally:
                otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                worker = threading.Thread(target=self.serveThread)
                worker.start()
            except Exception:
                logging.exception(sys.exc_info()[1])

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                self.clients.put(client)
            except KeyboardInterrupt:
                # Same policy as TThreadedServer.serve: never swallow an
                # interrupt, whatever its position in the exception
                # hierarchy of the running interpreter.
                raise
            except Exception:
                logging.exception(sys.exc_info()[1])
David Reiss66536542008-06-10 22:54:49 +0000172
173
class TForkingServer(TServer):

    """A Thrift server that forks a new process for each accepted connection.

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """

    def __init__(self, *args):
        TServer.__init__(self, *args)
        # pids of forked children that have not been reaped yet
        self.children = []

    def serve(self):
        """Listen, then fork one child process per accepted client.

        Loops forever in the parent.  The parent records the child's pid,
        reaps finished children and closes its own copy of the connection.
        The child serves the client and then exits without ever returning
        to this loop.  A TTransportException raised on the parent side is
        ignored; any other Exception is logged.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            try:
                pid = os.fork()

                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()

                    # Parent must close socket or the connection may not get
                    # closed promptly
                    itrans = self.inputTransportFactory.getTransport(client)
                    otrans = self.outputTransportFactory.getTransport(client)
                    self._try_close(itrans)
                    self._try_close(otrans)
                else:
                    self._serve_child(client)
            except TTransport.TTransportException:
                pass
            except Exception:
                # sys.exc_info() yields the active exception without
                # relying on version-specific "except X, e" syntax.
                logging.exception(sys.exc_info()[1])

    def _try_close(self, transport):
        """Close `transport`, logging an IOError as a warning instead of raising."""
        try:
            transport.close()
        except IOError:
            logging.warning(sys.exc_info()[1], exc_info=True)

    def _serve_child(self, client):
        """Serve `client` inside a forked child process; never returns.

        The process always ends through os._exit(): status 0 when the
        request loop stops on a TTransportException, status 1 otherwise
        (setup failure, processor error, interrupt).  Exiting
        unconditionally keeps a failing child from falling back into the
        accept loop it inherited from the parent.
        """
        ecode = 1
        try:
            try:
                itrans = self.inputTransportFactory.getTransport(client)
                otrans = self.outputTransportFactory.getTransport(client)

                iprot = self.inputProtocolFactory.getProtocol(itrans)
                oprot = self.outputProtocolFactory.getProtocol(otrans)

                try:
                    try:
                        while True:
                            self.processor.process(iprot, oprot)
                    except TTransport.TTransportException:
                        ecode = 0
                finally:
                    self._try_close(itrans)
                    self._try_close(otrans)
            except Exception:
                logging.exception(sys.exc_info()[1])
        finally:
            os._exit(ecode)

    def collect_children(self):
        """Reap every child that has already exited, without blocking.

        Stops as soon as waitpid() reports nothing to reap or fails.
        """
        while self.children:
            try:
                pid, status = os.waitpid(0, os.WNOHANG)
            except os.error:
                pid = None

            if not pid:
                break
            # waitpid(0, ...) is not restricted to pids recorded here, so
            # tolerate one this server is not tracking.
            if pid in self.children:
                self.children.remove(pid)
258
259