blob: 97097cc4ab42f2334b40f91f87d44ace1e52fcfa [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Mark Slee89e2bb82007-03-01 00:20:36 +000019
Mark Sleeb90aa7c2006-10-24 18:49:45 +000020import Queue
Bryan Duxbury69720412012-01-03 17:32:30 +000021import logging
22import os
23import sys
24import threading
25import traceback
Mark Sleec98d0502006-09-06 02:42:25 +000026
Mark Sleec9676562006-09-05 17:34:52 +000027from thrift.Thrift import TProcessor
Mark Slee4ac459f2006-10-25 21:39:01 +000028from thrift.protocol import TBinaryProtocol
Bryan Duxbury69720412012-01-03 17:32:30 +000029from thrift.transport import TTransport
30
Mark Sleec9676562006-09-05 17:34:52 +000031
class TServer:
    """Base interface for a server, which must have a serve() method.

    Three constructors for all servers:
      1) (processor, serverTransport)
      2) (processor, serverTransport, transportFactory, protocolFactory)
      3) (processor, serverTransport,
          inputTransportFactory, outputTransportFactory,
          inputProtocolFactory, outputProtocolFactory)

    With form 1 the transport factories default to fresh
    TTransport.TTransportFactoryBase instances and the protocol factories
    to fresh TBinaryProtocol.TBinaryProtocolFactory instances.  With form 2
    the same factory object is used for both the input and the output side.

    Any other number of positional arguments is accepted without error but
    leaves all of the attributes below unset.

    Attributes set by the constructor:
      processor, serverTransport,
      inputTransportFactory, outputTransportFactory,
      inputProtocolFactory, outputProtocolFactory
    """

    def __init__(self, *args):
        if len(args) == 2:
            # Form 1: default factories, one separate instance per direction.
            self.__initArgs__(args[0], args[1],
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif len(args) == 4:
            # Form 2: one factory shared by the input and output side.
            self.__initArgs__(args[0], args[1],
                              args[2], args[2],
                              args[3], args[3])
        elif len(args) == 6:
            # Form 3: everything given explicitly.
            self.__initArgs__(args[0], args[1],
                              args[2], args[3],
                              args[4], args[5])

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Store the processor, server transport and the four factories."""
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

    def serve(self):
        """Run the server; a no-op here, overridden by concrete servers."""
        pass
66
Mark Sleec9676562006-09-05 17:34:52 +000067
class TSimpleServer(TServer):
    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, *args):
        TServer.__init__(self, *args)

    def serve(self):
        """Listen on the server transport and serve clients one at a time.

        Each accepted client is wrapped with the configured transport and
        protocol factories and handed to the processor repeatedly until
        processing raises.  A TTransportException ends the client session
        silently; any other Exception is logged.  The loop never returns
        normally; an exception raised by listen() or accept() propagates
        to the caller.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if client is None:
                # Nothing to serve; wrapping None would only fail later.
                continue
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                pass
            except Exception as x:
                logging.exception(x)
            finally:
                # Close even when a non-Exception (e.g. KeyboardInterrupt)
                # is unwinding, so the client transports are not leaked.
                itrans.close()
                otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000092
Mark Slee4f0fed62006-10-02 17:50:08 +000093
class TThreadedServer(TServer):
    """Threaded server that spawns a new thread per each connection.

    Accepts the same positional arguments as TServer, plus the keyword
    argument ``daemon`` (default False), which is passed to setDaemon() on
    every connection thread.
    """

    def __init__(self, *args, **kwargs):
        TServer.__init__(self, *args)
        self.daemon = kwargs.get("daemon", False)

    def serve(self):
        """Listen, then start one thread running handle() per client.

        KeyboardInterrupt is re-raised to the caller; any other Exception
        raised while accepting or starting the thread is logged and the
        accept loop continues.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if client is None:
                    # Nothing to serve; do not spawn a thread for it.
                    continue
                t = threading.Thread(target=self.handle, args=(client,))
                t.setDaemon(self.daemon)
                t.start()
            except KeyboardInterrupt:
                raise
            except Exception as x:
                logging.exception(x)

    def handle(self, client):
        """Serve a single client until processing raises.

        A TTransportException ends the session silently; any other
        Exception is logged.  Both transports are always closed.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Runs for non-Exception unwinds too, so nothing is leaked.
            itrans.close()
            otrans.close()
129
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000130
class TThreadPoolServer(TServer):
    """Server with a fixed size pool of threads which service requests.

    Accepts the same positional arguments as TServer, plus the keyword
    argument ``daemon`` (default False), which is passed to setDaemon() on
    every worker thread.  The pool size defaults to 10 and can be changed
    with setNumThreads() before serve() is called.
    """

    def __init__(self, *args, **kwargs):
        TServer.__init__(self, *args)
        self.clients = Queue.Queue()
        self.threads = 10
        self.daemon = kwargs.get("daemon", False)

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                # Keep the worker alive whatever a single client did.
                logging.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Runs for non-Exception unwinds too, so nothing is leaked.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                t = threading.Thread(target=self.serveThread)
                t.setDaemon(self.daemon)
                t.start()
            except Exception as x:
                logging.exception(x)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if client is None:
                    # Nothing to serve; do not hand None to a worker.
                    continue
                self.clients.put(client)
            except Exception as x:
                logging.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000188
189
class TForkingServer(TServer):
    """A Thrift server that forks a new process for each connection

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on Windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """

    def __init__(self, *args):
        TServer.__init__(self, *args)
        # pids of forked children that have not been reaped yet
        self.children = []

    def serve(self):
        """Listen, then fork one child process per accepted client.

        The parent records the child's pid, reaps finished children and
        closes its own copy of the client transports.  The child serves
        the client until processing raises and then always terminates
        with os._exit(): status 0 when the session ended with a
        TTransportException, 1 otherwise.  In the parent a
        TTransportException is ignored and any other Exception is logged;
        the accept loop then continues.
        """
        def try_close(transport):
            # Best-effort close: an IOError is logged, not raised.
            try:
                transport.close()
            except IOError as e:
                logging.warning(e, exc_info=True)

        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if client is None:
                # Nothing to serve; do not fork for it.
                continue
            try:
                pid = os.fork()

                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()

                    # Parent must close socket or the connection may not get
                    # closed promptly
                    itrans = self.inputTransportFactory.getTransport(client)
                    otrans = self.outputTransportFactory.getTransport(client)
                    try_close(itrans)
                    try_close(otrans)
                else:  # child
                    ecode = 1
                    try:
                        itrans = self.inputTransportFactory.getTransport(client)
                        otrans = self.outputTransportFactory.getTransport(client)

                        iprot = self.inputProtocolFactory.getProtocol(itrans)
                        oprot = self.outputProtocolFactory.getProtocol(otrans)

                        try:
                            while True:
                                self.processor.process(iprot, oprot)
                        except TTransport.TTransportException:
                            # Treated as the normal end of a session.
                            ecode = 0
                        except Exception as e:
                            logging.exception(e)
                        finally:
                            try_close(itrans)
                            try_close(otrans)
                    except Exception as e:
                        # Setup or close failed inside the child.
                        logging.exception(e)
                        ecode = 1
                    finally:
                        # The child must never fall back into the accept
                        # loop, whatever happened above.
                        os._exit(ecode)

            except TTransport.TTransportException:
                pass
            except Exception as x:
                logging.exception(x)

    def collect_children(self):
        """Reap finished children without blocking.

        Calls os.waitpid(0, os.WNOHANG) until it reports no finished
        child, raises OSError, or no tracked pids remain.  A reaped pid
        that is not in self.children is ignored.
        """
        while self.children:
            try:
                pid, _ = os.waitpid(0, os.WNOHANG)
            except OSError:
                pid = None

            if not pid:
                break
            try:
                self.children.remove(pid)
            except ValueError:
                # Reaped a child this server did not record.
                pass