blob: a44ab521e077045de42130c07414d5974097c7e5 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2006- Facebook
# Distributed under the Thrift Software License
#
# See accompanying file LICENSE or visit the Thrift site at:
# http://developers.facebook.com/thrift/
8
Mark Sleec98d0502006-09-06 02:42:25 +00009import sys
David Reiss66536542008-06-10 22:54:49 +000010import os
Mark Sleec98d0502006-09-06 02:42:25 +000011import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +000012import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +000013import Queue
Mark Sleec98d0502006-09-06 02:42:25 +000014
Mark Sleec9676562006-09-05 17:34:52 +000015from thrift.Thrift import TProcessor
16from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +000017from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +000018
class TServer:
    """Base interface for a server, which must have a serve method.

    Three constructor forms are accepted by all servers:
      1) (processor, serverTransport)
      2) (processor, serverTransport, transportFactory, protocolFactory)
      3) (processor, serverTransport,
          inputTransportFactory, outputTransportFactory,
          inputProtocolFactory, outputProtocolFactory)
    """

    def __init__(self, *args):
        """Store the processor, server transport and the four factories.

        With 2 arguments, default TTransportFactoryBase and
        TBinaryProtocolFactory instances are created for both directions.
        With 4 arguments, the single transport factory and the single
        protocol factory are each used for both input and output.
        With 6 arguments, every factory is taken as given.

        Raises:
          TypeError: if the number of arguments is not 2, 4 or 6.
        """
        count = len(args)
        if count == 2:
            self.__initArgs__(args[0], args[1],
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif count == 4:
            self.__initArgs__(args[0], args[1],
                              args[2], args[2],
                              args[3], args[3])
        elif count == 6:
            self.__initArgs__(*args)
        else:
            # Previously this case fell through silently and left the
            # instance without any attributes, which only surfaced later as
            # an AttributeError inside serve().
            raise TypeError(
                '%s() takes 2, 4 or 6 arguments (%d given)'
                % (self.__class__.__name__, count))

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Assign the fully expanded constructor arguments to attributes."""
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

    def serve(self):
        """Run the server. The base implementation does nothing."""
        pass
53
class TSimpleServer(TServer):
    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, *args):
        """Accept the same 2, 4 or 6 argument forms as TServer."""
        TServer.__init__(self, *args)

    def serve(self):
        """Listen, then serve accepted clients one at a time, forever.

        For each accepted client the input/output transports and protocols
        are built from the configured factories and the processor is invoked
        repeatedly until it raises. A TTransportException ends the session
        silently; any other Exception is printed with its traceback. The
        next client is only accepted after the current session has ended.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Ends this client's session without logging (presumably the
                # peer disconnected -- the transport code is not shown here).
                pass
            except Exception as x:
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
            finally:
                # Close in a finally block so the transports are released
                # even when a non-Exception error such as KeyboardInterrupt
                # or SystemExit propagates out of serve().
                itrans.close()
                otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000079
class TThreadedServer(TServer):
    """Threaded server that spawns a new thread per each connection."""

    def __init__(self, *args):
        """Accept the same 2, 4 or 6 argument forms as TServer."""
        TServer.__init__(self, *args)

    def serve(self):
        """Listen, then start one handler thread per accepted client.

        KeyboardInterrupt is re-raised to the caller. Any other Exception
        raised while accepting or starting the thread is printed with its
        traceback and the accept loop continues.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                worker = threading.Thread(target=self.handle, args=(client,))
                worker.start()
            except KeyboardInterrupt:
                raise
            except Exception as x:
                print('%s, %s, %s,' % (type(x), x, traceback.format_exc()))

    def handle(self, client):
        """Serve a single client until processing raises; runs in a thread.

        A TTransportException ends the session silently; any other Exception
        is printed with its traceback. Both transports are always closed
        before returning.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            # Ends this client's session without logging (presumably the
            # peer disconnected -- the transport code is not shown here).
            pass
        except Exception as x:
            print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
        finally:
            # Close in a finally block so that errors outside the Exception
            # hierarchy (e.g. SystemExit raised by a handler) cannot end the
            # thread while leaving the connection open.
            itrans.close()
            otrans.close()
114
class TThreadPoolServer(TServer):
    """Server with a fixed size pool of threads which service requests."""

    def __init__(self, *args):
        """Accept the same 2, 4 or 6 argument forms as TServer."""
        TServer.__init__(self, *args)
        # Accepted clients waiting to be picked up by a worker thread.
        self.clients = Queue.Queue()
        # Number of worker threads started by serve(); defaults to 10.
        self.threads = 10

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))

    def serveClient(self, client):
        """Process input/output from a client for as long as possible.

        A TTransportException ends the session silently; any other Exception
        is printed with its traceback. Both transports are always closed
        before returning.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            # Ends this client's session without logging (presumably the
            # peer disconnected -- the transport code is not shown here).
            pass
        except Exception as x:
            print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
        finally:
            # Close in a finally block so that errors outside the Exception
            # hierarchy (e.g. SystemExit raised by a handler) cannot leave
            # the connection open.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                worker = threading.Thread(target=self.serveThread)
                worker.start()
            except Exception as x:
                print('%s, %s, %s,' % (type(x), x, traceback.format_exc()))

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                self.clients.put(client)
            except Exception as x:
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
David Reiss66536542008-06-10 22:54:49 +0000171
172
173
class TForkingServer(TServer):
    """A Thrift server that forks a new process for each request.

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """

    def __init__(self, *args):
        """Accept the same 2, 4 or 6 argument forms as TServer."""
        TServer.__init__(self, *args)
        # pids of forked children that have not been reaped yet
        self.children = []

    def serve(self):
        """Listen, then fork one child process per accepted client.

        The parent records the child's pid, reaps any finished children and
        goes back to accepting. The child serves the client and then exits;
        it never returns to the accept loop.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            try:
                pid = os.fork()
                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()
                    # NOTE(review): the parent keeps its own reference to
                    # `client` until the next accept() rebinds it; whether it
                    # should be closed here explicitly depends on the
                    # transport's close() semantics -- confirm.
                else:  # child
                    self._serve_child(client)
            except TTransport.TTransportException:
                pass
            except Exception as x:
                print('%s, %s, %s' % (type(x), x, traceback.format_exc()))

    def _serve_child(self, client):
        """Serve one client inside the forked child, then exit the process.

        Exits with status 0 when the session ends with a
        TTransportException and with status 1 on any other error. The
        os._exit call sits in a finally block so that no exception can
        propagate back into serve(), where the parent's handler would
        otherwise let the child continue as a second accept loop.
        """
        ecode = 1
        try:
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Treated as the normal end of a session.
                ecode = 0
            except Exception as e:
                print('%s, %s, %s' % (type(e), e, traceback.format_exc()))
            finally:
                self._try_close(itrans)
                self._try_close(otrans)
        except Exception as e:
            # Failure while building the transports/protocols or while
            # closing them with something other than IOError.
            ecode = 1
            print('%s, %s, %s' % (type(e), e, traceback.format_exc()))
        finally:
            os._exit(ecode)

    def _try_close(self, trans):
        """Close a transport, printing (not raising) any IOError."""
        try:
            trans.close()
        except IOError as e:
            print('%s, %s, %s' % (type(e), e, traceback.format_exc()))

    def collect_children(self):
        """Reap finished children without blocking and forget their pids.

        Stops as soon as waitpid reports no finished child or fails.
        """
        while self.children:
            try:
                pid, _ = os.waitpid(0, os.WNOHANG)
            except os.error:
                pid = None

            if not pid:
                break
            # waitpid(0, ...) is not limited to pids recorded here, so an
            # untracked pid is skipped instead of raising ValueError.
            if pid in self.children:
                self.children.remove(pid)
249
250