blob: 00fd8cf247a9b8da1230d666c9feb447286ff073 [file] [log] [blame]
Mark Slee89e2bb82007-03-01 00:20:36 +00001#
David Reissea2cba82009-03-30 21:35:00 +00002# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
Mark Slee89e2bb82007-03-01 00:20:36 +000019
Mark Sleeb90aa7c2006-10-24 18:49:45 +000020import Queue
Bryan Duxbury69720412012-01-03 17:32:30 +000021import os
22import sys
23import threading
24import traceback
Mark Sleec98d0502006-09-06 02:42:25 +000025
Konrad Grochowski3a724e32014-08-12 11:48:29 -040026import logging
27logger = logging.getLogger(__name__)
28
Mark Sleec9676562006-09-05 17:34:52 +000029from thrift.Thrift import TProcessor
Mark Slee4ac459f2006-10-25 21:39:01 +000030from thrift.protocol import TBinaryProtocol
Bryan Duxbury69720412012-01-03 17:32:30 +000031from thrift.transport import TTransport
32
Mark Sleec9676562006-09-05 17:34:52 +000033
34class TServer:
Bryan Duxbury69720412012-01-03 17:32:30 +000035 """Base interface for a server, which must have a serve() method.
Mark Sleec9676562006-09-05 17:34:52 +000036
Bryan Duxbury69720412012-01-03 17:32:30 +000037 Three constructors for all servers:
Aditya Agarwal5c468192007-02-06 01:14:33 +000038 1) (processor, serverTransport)
39 2) (processor, serverTransport, transportFactory, protocolFactory)
40 3) (processor, serverTransport,
41 inputTransportFactory, outputTransportFactory,
Bryan Duxbury69720412012-01-03 17:32:30 +000042 inputProtocolFactory, outputProtocolFactory)
43 """
Aditya Agarwal5c468192007-02-06 01:14:33 +000044 def __init__(self, *args):
Aditya Agarwal5c468192007-02-06 01:14:33 +000045 if (len(args) == 2):
46 self.__initArgs__(args[0], args[1],
47 TTransport.TTransportFactoryBase(),
48 TTransport.TTransportFactoryBase(),
49 TBinaryProtocol.TBinaryProtocolFactory(),
50 TBinaryProtocol.TBinaryProtocolFactory())
51 elif (len(args) == 4):
52 self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
53 elif (len(args) == 6):
54 self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
55
56 def __initArgs__(self, processor, serverTransport,
57 inputTransportFactory, outputTransportFactory,
58 inputProtocolFactory, outputProtocolFactory):
Mark Sleed788b2e2006-09-07 01:26:35 +000059 self.processor = processor
60 self.serverTransport = serverTransport
Aditya Agarwal5c468192007-02-06 01:14:33 +000061 self.inputTransportFactory = inputTransportFactory
62 self.outputTransportFactory = outputTransportFactory
63 self.inputProtocolFactory = inputProtocolFactory
64 self.outputProtocolFactory = outputProtocolFactory
Mark Sleec9676562006-09-05 17:34:52 +000065
Mark Slee794993d2006-09-20 01:56:10 +000066 def serve(self):
Mark Sleec9676562006-09-05 17:34:52 +000067 pass
68
Mark Sleec9676562006-09-05 17:34:52 +000069
Bryan Duxbury69720412012-01-03 17:32:30 +000070class TSimpleServer(TServer):
Mark Sleec9676562006-09-05 17:34:52 +000071 """Simple single-threaded server that just pumps around one transport."""
72
Aditya Agarwal5c468192007-02-06 01:14:33 +000073 def __init__(self, *args):
74 TServer.__init__(self, *args)
Mark Sleec9676562006-09-05 17:34:52 +000075
Mark Slee794993d2006-09-20 01:56:10 +000076 def serve(self):
Mark Sleed788b2e2006-09-07 01:26:35 +000077 self.serverTransport.listen()
Mark Sleec9676562006-09-05 17:34:52 +000078 while True:
Mark Sleed788b2e2006-09-07 01:26:35 +000079 client = self.serverTransport.accept()
Roger Meierab2793a2014-04-21 21:20:00 +020080 if not client:
81 continue
Aditya Agarwal5c468192007-02-06 01:14:33 +000082 itrans = self.inputTransportFactory.getTransport(client)
83 otrans = self.outputTransportFactory.getTransport(client)
84 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Sleefb84b2b2007-02-20 03:37:28 +000085 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Sleec9676562006-09-05 17:34:52 +000086 try:
87 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +000088 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +000089 except TTransport.TTransportException, tx:
90 pass
Todd Lipcon2b2560e2012-12-10 14:29:59 -080091 except Exception, x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -040092 logger.exception(x)
Mark Sleed788b2e2006-09-07 01:26:35 +000093
Mark Slee4ac459f2006-10-25 21:39:01 +000094 itrans.close()
95 otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000096
Mark Slee4f0fed62006-10-02 17:50:08 +000097
Bryan Duxbury69720412012-01-03 17:32:30 +000098class TThreadedServer(TServer):
Mark Slee4f0fed62006-10-02 17:50:08 +000099 """Threaded server that spawns a new thread per each connection."""
100
Bryan Duxburyf2ef59f2010-09-02 15:12:06 +0000101 def __init__(self, *args, **kwargs):
Aditya Agarwal5c468192007-02-06 01:14:33 +0000102 TServer.__init__(self, *args)
Bryan Duxburyf2ef59f2010-09-02 15:12:06 +0000103 self.daemon = kwargs.get("daemon", False)
Mark Slee4f0fed62006-10-02 17:50:08 +0000104
105 def serve(self):
106 self.serverTransport.listen()
107 while True:
108 try:
109 client = self.serverTransport.accept()
Roger Meierab2793a2014-04-21 21:20:00 +0200110 if not client:
111 continue
Bryan Duxbury69720412012-01-03 17:32:30 +0000112 t = threading.Thread(target=self.handle, args=(client,))
Bryan Duxburyf2ef59f2010-09-02 15:12:06 +0000113 t.setDaemon(self.daemon)
Mark Slee4f0fed62006-10-02 17:50:08 +0000114 t.start()
Mark Slee5299a952007-10-05 00:13:24 +0000115 except KeyboardInterrupt:
116 raise
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800117 except Exception, x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400118 logger.exception(x)
Mark Slee4f0fed62006-10-02 17:50:08 +0000119
120 def handle(self, client):
Aditya Agarwal5c468192007-02-06 01:14:33 +0000121 itrans = self.inputTransportFactory.getTransport(client)
122 otrans = self.outputTransportFactory.getTransport(client)
123 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Sleefb84b2b2007-02-20 03:37:28 +0000124 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Slee4f0fed62006-10-02 17:50:08 +0000125 try:
126 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000127 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +0000128 except TTransport.TTransportException, tx:
129 pass
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800130 except Exception, x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400131 logger.exception(x)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000132
Mark Slee4ac459f2006-10-25 21:39:01 +0000133 itrans.close()
134 otrans.close()
135
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000136
Bryan Duxbury69720412012-01-03 17:32:30 +0000137class TThreadPoolServer(TServer):
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000138 """Server with a fixed size pool of threads which service requests."""
139
Bryan Duxburyf2ef59f2010-09-02 15:12:06 +0000140 def __init__(self, *args, **kwargs):
Aditya Agarwal5c468192007-02-06 01:14:33 +0000141 TServer.__init__(self, *args)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000142 self.clients = Queue.Queue()
143 self.threads = 10
Bryan Duxburyf2ef59f2010-09-02 15:12:06 +0000144 self.daemon = kwargs.get("daemon", False)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000145
Mark Slee4ce787f2006-10-24 18:54:06 +0000146 def setNumThreads(self, num):
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000147 """Set the number of worker threads that should be created"""
148 self.threads = num
149
150 def serveThread(self):
151 """Loop around getting clients from the shared queue and process them."""
152 while True:
153 try:
Mark Slee9a695ba2006-10-24 18:55:36 +0000154 client = self.clients.get()
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000155 self.serveClient(client)
156 except Exception, x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400157 logger.exception(x)
David Reiss0c90f6f2008-02-06 22:18:40 +0000158
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000159 def serveClient(self, client):
160 """Process input/output from a client for as long as possible"""
Aditya Agarwal5c468192007-02-06 01:14:33 +0000161 itrans = self.inputTransportFactory.getTransport(client)
162 otrans = self.outputTransportFactory.getTransport(client)
163 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Slee04342d82007-02-20 03:41:35 +0000164 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000165 try:
166 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000167 self.processor.process(iprot, oprot)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000168 except TTransport.TTransportException, tx:
169 pass
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800170 except Exception, x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400171 logger.exception(x)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000172
Mark Slee4ac459f2006-10-25 21:39:01 +0000173 itrans.close()
174 otrans.close()
175
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000176 def serve(self):
177 """Start a fixed number of worker threads and put client into a queue"""
178 for i in range(self.threads):
179 try:
Bryan Duxbury69720412012-01-03 17:32:30 +0000180 t = threading.Thread(target=self.serveThread)
Bryan Duxburyf2ef59f2010-09-02 15:12:06 +0000181 t.setDaemon(self.daemon)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000182 t.start()
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800183 except Exception, x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400184 logger.exception(x)
David Reiss0c90f6f2008-02-06 22:18:40 +0000185
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000186 # Pump the socket for clients
187 self.serverTransport.listen()
188 while True:
189 try:
190 client = self.serverTransport.accept()
Roger Meierab2793a2014-04-21 21:20:00 +0200191 if not client:
192 continue
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000193 self.clients.put(client)
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800194 except Exception, x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400195 logger.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000196
197
David Reiss66536542008-06-10 22:54:49 +0000198class TForkingServer(TServer):
Bryan Duxbury69720412012-01-03 17:32:30 +0000199 """A Thrift server that forks a new process for each request
David Reiss66536542008-06-10 22:54:49 +0000200
David Reiss66536542008-06-10 22:54:49 +0000201 This is more scalable than the threaded server as it does not cause
202 GIL contention.
203
204 Note that this has different semantics from the threading server.
205 Specifically, updates to shared variables will no longer be shared.
206 It will also not work on windows.
207
208 This code is heavily inspired by SocketServer.ForkingMixIn in the
209 Python stdlib.
210 """
David Reiss66536542008-06-10 22:54:49 +0000211 def __init__(self, *args):
212 TServer.__init__(self, *args)
213 self.children = []
214
215 def serve(self):
David Reissbcaa2ad2008-06-10 22:55:26 +0000216 def try_close(file):
217 try:
218 file.close()
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800219 except IOError, e:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400220 logger.warning(e, exc_info=True)
David Reissbcaa2ad2008-06-10 22:55:26 +0000221
David Reiss66536542008-06-10 22:54:49 +0000222 self.serverTransport.listen()
223 while True:
224 client = self.serverTransport.accept()
Roger Meierab2793a2014-04-21 21:20:00 +0200225 if not client:
226 continue
David Reiss66536542008-06-10 22:54:49 +0000227 try:
228 pid = os.fork()
229
Bryan Duxbury69720412012-01-03 17:32:30 +0000230 if pid: # parent
David Reiss66536542008-06-10 22:54:49 +0000231 # add before collect, otherwise you race w/ waitpid
232 self.children.append(pid)
233 self.collect_children()
234
David Reissbcaa2ad2008-06-10 22:55:26 +0000235 # Parent must close socket or the connection may not get
236 # closed promptly
237 itrans = self.inputTransportFactory.getTransport(client)
238 otrans = self.outputTransportFactory.getTransport(client)
239 try_close(itrans)
240 try_close(otrans)
David Reiss66536542008-06-10 22:54:49 +0000241 else:
242 itrans = self.inputTransportFactory.getTransport(client)
243 otrans = self.outputTransportFactory.getTransport(client)
244
245 iprot = self.inputProtocolFactory.getProtocol(itrans)
246 oprot = self.outputProtocolFactory.getProtocol(otrans)
247
David Reissbcaa2ad2008-06-10 22:55:26 +0000248 ecode = 0
David Reiss66536542008-06-10 22:54:49 +0000249 try:
Kevin Clark1e0744d2008-06-24 20:46:32 +0000250 try:
251 while True:
252 self.processor.process(iprot, oprot)
253 except TTransport.TTransportException, tx:
254 pass
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800255 except Exception, e:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400256 logger.exception(e)
Kevin Clark1e0744d2008-06-24 20:46:32 +0000257 ecode = 1
David Reissbcaa2ad2008-06-10 22:55:26 +0000258 finally:
259 try_close(itrans)
260 try_close(otrans)
David Reiss66536542008-06-10 22:54:49 +0000261
David Reissbcaa2ad2008-06-10 22:55:26 +0000262 os._exit(ecode)
David Reiss66536542008-06-10 22:54:49 +0000263
264 except TTransport.TTransportException, tx:
265 pass
Todd Lipcon2b2560e2012-12-10 14:29:59 -0800266 except Exception, x:
Konrad Grochowski3a724e32014-08-12 11:48:29 -0400267 logger.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000268
David Reiss66536542008-06-10 22:54:49 +0000269 def collect_children(self):
270 while self.children:
271 try:
272 pid, status = os.waitpid(0, os.WNOHANG)
273 except os.error:
274 pid = None
275
276 if pid:
277 self.children.remove(pid)
278 else:
279 break