blob: 8456e2d40db28bd5c761b7de6990ee7e408791a9 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Mark Slee89e2bb82007-03-01 00:20:36 +000019
David Reissb04df762008-06-10 22:55:38 +000020import logging
Mark Sleec98d0502006-09-06 02:42:25 +000021import sys
David Reiss66536542008-06-10 22:54:49 +000022import os
Mark Sleec98d0502006-09-06 02:42:25 +000023import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +000024import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +000025import Queue
Mark Sleec98d0502006-09-06 02:42:25 +000026
Mark Sleec9676562006-09-05 17:34:52 +000027from thrift.Thrift import TProcessor
28from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +000029from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +000030
class TServer:
    """Base interface for a server, which must have a serve method.

    Three constructor signatures are accepted by all servers:

      1) (processor, serverTransport)
      2) (processor, serverTransport, transportFactory, protocolFactory)
      3) (processor, serverTransport,
          inputTransportFactory, outputTransportFactory,
          inputProtocolFactory, outputProtocolFactory)
    """

    def __init__(self, *args):
        """Store the processor, server transport and factories.

        With 2 arguments, default TTransportFactoryBase and
        TBinaryProtocolFactory instances are created for both directions.
        With 4 arguments, the single transport factory and the single
        protocol factory are each used for both input and output.
        With 6 arguments, every factory is given explicitly.

        Raises:
          TypeError: if the number of positional arguments is not 2, 4 or 6.
            Previously such a call returned an instance with none of its
            attributes set.
        """
        count = len(args)
        if count == 2:
            self.__initArgs__(args[0], args[1],
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif count == 4:
            # One transport factory and one protocol factory, shared by the
            # input and output sides.
            self.__initArgs__(args[0], args[1],
                              args[2], args[2], args[3], args[3])
        elif count == 6:
            self.__initArgs__(args[0], args[1],
                              args[2], args[3], args[4], args[5])
        else:
            raise TypeError(
                "TServer expects 2, 4, or 6 positional arguments, got %d"
                % count)

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Assign each constructor argument to the attribute of the same name."""
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

    def serve(self):
        """Do nothing; subclasses override this with their serving loop."""
        pass
65
class TSimpleServer(TServer):
    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, *args):
        """Accept the same positional arguments as TServer."""
        TServer.__init__(self, *args)

    def serve(self):
        """Listen, then accept and service one client at a time, forever.

        For each accepted client, transports and protocols are built from
        the configured factories and self.processor.process() is called in
        a loop until it raises. A TTransportException ends the connection
        silently; any other Exception is logged. Both transports are closed
        before the next client is accepted.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                pass
            except Exception as x:
                logging.exception(x)
            finally:
                # Close in `finally` so the transports are released even
                # when something that is not an Exception subclass (e.g.
                # KeyboardInterrupt) propagates out of the processing loop.
                itrans.close()
                otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000091
class TThreadedServer(TServer):
    """Threaded server that spawns a new thread per each connection."""

    def __init__(self, *args, **kwargs):
        """Accept TServer's positional arguments plus an optional keyword.

        Keyword arguments:
          daemon: value passed to setDaemon() on every connection thread
            (default False).
        """
        TServer.__init__(self, *args)
        self.daemon = kwargs.get("daemon", False)

    def serve(self):
        """Listen, then start one handler thread per accepted client.

        KeyboardInterrupt is re-raised to the caller; any other Exception
        raised while accepting or starting the thread is logged and the
        loop continues.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                worker = threading.Thread(target=self.handle, args=(client,))
                worker.setDaemon(self.daemon)
                worker.start()
            except KeyboardInterrupt:
                raise
            except Exception as x:
                logging.exception(x)

    def handle(self, client):
        """Service a single client until its processing loop raises.

        A TTransportException ends the connection silently; any other
        Exception is logged. Both transports are always closed on the way
        out.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Close in `finally` so a non-Exception error (e.g. SystemExit
            # raised inside a handler) does not leave the transports open.
            itrans.close()
            otrans.close()
128
class TThreadPoolServer(TServer):
    """Server with a fixed size pool of threads which service requests."""

    def __init__(self, *args, **kwargs):
        """Accept TServer's positional arguments plus an optional keyword.

        Keyword arguments:
          daemon: value passed to setDaemon() on every worker thread
            (default False).

        The pool size defaults to 10; change it with setNumThreads()
        before calling serve().
        """
        TServer.__init__(self, *args)
        self.clients = Queue.Queue()
        self.threads = 10
        self.daemon = kwargs.get("daemon", False)

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                logging.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible.

        A TTransportException ends the connection silently; any other
        Exception is logged. Both transports are always closed on the way
        out.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Close in `finally` so a non-Exception error raised while
            # processing does not leave the transports open.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                worker = threading.Thread(target=self.serveThread)
                worker.setDaemon(self.daemon)
                worker.start()
            except Exception as x:
                logging.exception(x)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                self.clients.put(client)
            except Exception as x:
                logging.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000187
188
class TForkingServer(TServer):
    """A Thrift server that forks a new process for each request

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """

    def __init__(self, *args):
        """Accept the same positional arguments as TServer."""
        TServer.__init__(self, *args)
        # pids of forked children that have not been reaped yet
        self.children = []

    def serve(self):
        """Listen, then fork one child process per accepted client.

        The parent records the child's pid, reaps any finished children
        and closes its own handles on the client connection. The child
        services the client and always terminates through os._exit(): 0
        when the connection ends with a TTransportException, 1 otherwise.
        """
        def try_close(transport):
            # Best-effort close: an IOError is logged, not raised.
            try:
                transport.close()
            except IOError as e:
                logging.warning(e, exc_info=True)

        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            try:
                pid = os.fork()

                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()

                    # Parent must close socket or the connection may not get
                    # closed promptly
                    itrans = self.inputTransportFactory.getTransport(client)
                    otrans = self.outputTransportFactory.getTransport(client)
                    try_close(itrans)
                    try_close(otrans)
                else:  # child
                    # Assume failure until the connection ends through a
                    # TTransportException, so any other way out of the
                    # processing loop reports a non-zero status.
                    ecode = 1
                    try:
                        itrans = self.inputTransportFactory.getTransport(client)
                        otrans = self.outputTransportFactory.getTransport(client)
                        try:
                            iprot = self.inputProtocolFactory.getProtocol(itrans)
                            oprot = self.outputProtocolFactory.getProtocol(otrans)
                            while True:
                                self.processor.process(iprot, oprot)
                        finally:
                            try_close(itrans)
                            try_close(otrans)
                    except TTransport.TTransportException:
                        ecode = 0
                    except Exception as e:
                        logging.exception(e)
                    finally:
                        # The child must never fall through to the accept
                        # loop above: exit here no matter how the work
                        # ended, including a failure while the transports
                        # or protocols were being built.
                        os._exit(ecode)

            except TTransport.TTransportException:
                pass
            except Exception as x:
                logging.exception(x)

    def collect_children(self):
        """Reap finished children without blocking.

        Calls os.waitpid(0, os.WNOHANG) repeatedly while self.children is
        non-empty, removing each reaped pid from the list. Stops when
        waitpid reports no finished child or raises os.error.
        """
        while self.children:
            try:
                pid, _ = os.waitpid(0, os.WNOHANG)
            except os.error:
                pid = None

            if not pid:
                break

            # waitpid(0, ...) may report a child that this server did not
            # fork; only drop pids that are actually tracked, so an
            # unexpected pid cannot raise ValueError into serve().
            if pid in self.children:
                self.children.remove(pid)
273
274