#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

Mark Sleeb90aa7c2006-10-24 18:49:45 +000020import Queue
Bryan Duxbury69720412012-01-03 17:32:30 +000021import logging
22import os
23import sys
24import threading
25import traceback
Mark Sleec98d0502006-09-06 02:42:25 +000026
Mark Sleec9676562006-09-05 17:34:52 +000027from thrift.Thrift import TProcessor
Mark Slee4ac459f2006-10-25 21:39:01 +000028from thrift.protocol import TBinaryProtocol
Bryan Duxbury69720412012-01-03 17:32:30 +000029from thrift.transport import TTransport
30
Mark Sleec9676562006-09-05 17:34:52 +000031
class TServer(object):
    """Base interface for a server, which must have a serve() method.

    Three constructors for all servers:
    1) (processor, serverTransport)
    2) (processor, serverTransport, transportFactory, protocolFactory)
    3) (processor, serverTransport,
        inputTransportFactory, outputTransportFactory,
        inputProtocolFactory, outputProtocolFactory)

    Form 1 builds a TTransport.TTransportFactoryBase and a
    TBinaryProtocol.TBinaryProtocolFactory for each direction.  Form 2
    reuses the single transport factory and the single protocol factory
    for both input and output.
    """

    def __init__(self, *args):
        """Initialise the server from one of the three positional forms.

        Raises:
          TypeError: if the number of positional arguments is not 2, 4 or 6.
            Previously such a call returned silently and left the instance
            without any of its attributes, so the mistake only surfaced
            later as an AttributeError.
        """
        argc = len(args)
        if argc == 2:
            self.__initArgs__(args[0], args[1],
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif argc == 4:
            # One factory of each kind, shared by input and output.
            self.__initArgs__(args[0], args[1],
                              args[2], args[2],
                              args[3], args[3])
        elif argc == 6:
            self.__initArgs__(*args)
        else:
            raise TypeError(
                "TServer expects 2, 4 or 6 positional arguments, got %d"
                % argc)

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Store the processor, server transport and the four factories."""
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

    def serve(self):
        """Do nothing in the base class; subclasses override this."""
        pass
66
Mark Sleec9676562006-09-05 17:34:52 +000067
class TSimpleServer(TServer):
    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, *args):
        """Accept the same positional forms as TServer."""
        TServer.__init__(self, *args)

    def serve(self):
        """Listen, then serve accepted clients one at a time, forever.

        Each client is processed on the calling thread until
        processor.process() raises; only then is the next client accepted.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                # accept() produced nothing usable; try again.
                continue
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Ends this client's session quietly (presumably the peer
                # disconnected -- not verifiable from this file).
                pass
            except Exception as x:
                logging.exception(x)
            finally:
                # Close in `finally` so the transports are also released
                # when something outside Exception (e.g. KeyboardInterrupt)
                # propagates out of the processing loop.
                itrans.close()
                otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000094
Mark Slee4f0fed62006-10-02 17:50:08 +000095
class TThreadedServer(TServer):
    """Threaded server that spawns a new thread per each connection."""

    def __init__(self, *args, **kwargs):
        """Accept the TServer positional forms plus an optional keyword.

        Keyword arguments:
          daemon: value assigned to each handler thread's daemon flag
            (default False).
        """
        TServer.__init__(self, *args)
        self.daemon = kwargs.get("daemon", False)

    def serve(self):
        """Listen, then start one handler thread per accepted client."""
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    # accept() produced nothing usable; try again.
                    continue
                t = threading.Thread(target=self.handle, args=(client,))
                # The attribute replaces the deprecated setDaemon() call.
                t.daemon = self.daemon
                t.start()
            except KeyboardInterrupt:
                raise
            except Exception as x:
                # Log and keep accepting; one failure must not stop serving.
                logging.exception(x)

    def handle(self, client):
        """Process requests from one client until processing raises."""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            # Ends this client's session quietly (presumably the peer
            # disconnected -- not verifiable from this file).
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Always release the transports, whatever ended the loop.
            itrans.close()
            otrans.close()
133
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000134
class TThreadPoolServer(TServer):
    """Server with a fixed size pool of threads which service requests."""

    def __init__(self, *args, **kwargs):
        """Accept the TServer positional forms plus an optional keyword.

        Keyword arguments:
          daemon: value assigned to each worker thread's daemon flag
            (default False).
        """
        TServer.__init__(self, *args)
        # Accepted clients waiting for a free worker thread.
        self.clients = Queue.Queue()
        # Number of worker threads started by serve().
        self.threads = 10
        self.daemon = kwargs.get("daemon", False)

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                # Log and keep the worker alive for the next client.
                logging.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            # Ends this client's session quietly (presumably the peer
            # disconnected -- not verifiable from this file).
            pass
        except Exception as x:
            logging.exception(x)
        finally:
            # Always release the transports, whatever ended the loop.
            itrans.close()
            otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                t = threading.Thread(target=self.serveThread)
                # The attribute replaces the deprecated setDaemon() call.
                t.daemon = self.daemon
                t.start()
            except Exception as x:
                logging.exception(x)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    # accept() produced nothing usable; try again.
                    continue
                self.clients.put(client)
            except Exception as x:
                logging.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000194
195
class TForkingServer(TServer):
    """A Thrift server that forks a new process for each request

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """

    def __init__(self, *args):
        """Accept the same positional forms as TServer."""
        TServer.__init__(self, *args)
        # Pids of forked children that have not been reaped yet.
        self.children = []

    @staticmethod
    def _try_close(transport):
        """Close a transport, logging (not raising) an IOError."""
        try:
            transport.close()
        except IOError as e:
            logging.warning(e, exc_info=True)

    def _serve_child(self, client):
        """Serve one client inside a forked child; never returns.

        The process always ends through os._exit(): status 0 when the
        processing loop ended with a TTransportException, 1 otherwise.
        Exiting unconditionally matters because a child that returned to
        serve() would start accepting connections as a second server.
        """
        ecode = 1
        try:
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Treated as the normal end of a session.
                ecode = 0
            except Exception as e:
                logging.exception(e)
            finally:
                self._try_close(itrans)
                self._try_close(otrans)
        except Exception as e:
            # Failure while building the transports/protocols or closing.
            logging.exception(e)
        finally:
            os._exit(ecode)

    def serve(self):
        """Listen, then fork one child process per accepted client."""
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                # accept() produced nothing usable; try again.
                continue
            try:
                pid = os.fork()

                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()

                    # Parent must close socket or the connection may not get
                    # closed promptly
                    itrans = self.inputTransportFactory.getTransport(client)
                    otrans = self.outputTransportFactory.getTransport(client)
                    self._try_close(itrans)
                    self._try_close(otrans)
                else:  # child
                    self._serve_child(client)

            except TTransport.TTransportException:
                pass
            except Exception as x:
                logging.exception(x)

    def collect_children(self):
        """Reap exited children without blocking and forget their pids.

        Stops as soon as waitpid reports no exited child or raises
        os.error.
        """
        while self.children:
            try:
                pid, _ = os.waitpid(0, os.WNOHANG)
            except os.error:
                pid = None

            if not pid:
                break
            try:
                self.children.remove(pid)
            except ValueError:
                # waitpid(0, ...) can reap a child this server did not
                # record; ignoring it keeps the parent from aborting before
                # it closes its copy of the client socket.
                pass