blob: d5d9c98a93d923044e6476324bd2377738d83fcb [file] [log] [blame]
Mark Slee89e2bb82007-03-01 00:20:36 +00001#
David Reissea2cba82009-03-30 21:35:00 +00002# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
Mark Slee89e2bb82007-03-01 00:20:36 +000019
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090020from six.moves import queue
Konrad Grochowski3a724e32014-08-12 11:48:29 -040021import logging
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090022import os
23import threading
Konrad Grochowski3a724e32014-08-12 11:48:29 -040024
Mark Slee4ac459f2006-10-25 21:39:01 +000025from thrift.protocol import TBinaryProtocol
Bryan Duxbury69720412012-01-03 17:32:30 +000026from thrift.transport import TTransport
27
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090028logger = logging.getLogger(__name__)
29
Mark Sleec9676562006-09-05 17:34:52 +000030
class TServer(object):
    """Base interface for a server, which must have a serve() method.

    Three constructors for all servers:
    1) (processor, serverTransport)
    2) (processor, serverTransport, transportFactory, protocolFactory)
    3) (processor, serverTransport,
        inputTransportFactory, outputTransportFactory,
        inputProtocolFactory, outputProtocolFactory)

    With form 1 the transport factories default to
    TTransport.TTransportFactoryBase() and the protocol factories to
    TBinaryProtocol.TBinaryProtocolFactory().  With form 2 the single
    transport factory and the single protocol factory are each used for both
    the input and the output side.

    Raises:
        TypeError: if the number of positional arguments is not 2, 4 or 6.
    """
    def __init__(self, *args):
        count = len(args)
        if count == 2:
            processor, serverTransport = args
            self.__initArgs__(processor, serverTransport,
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif count == 4:
            processor, serverTransport, transportFactory, protocolFactory = args
            self.__initArgs__(processor, serverTransport,
                              transportFactory, transportFactory,
                              protocolFactory, protocolFactory)
        elif count == 6:
            self.__initArgs__(*args)
        else:
            # Fail at construction time instead of leaving an instance that
            # has none of its attributes set and only breaks inside serve().
            raise TypeError(
                '%s expects 2, 4 or 6 positional arguments, got %d'
                % (type(self).__name__, count))

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Store the fully expanded (six argument) configuration on self."""
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

    def serve(self):
        """Run the server.  The base implementation does nothing."""
        pass
Mark Sleec9676562006-09-05 17:34:52 +000065
Mark Sleec9676562006-09-05 17:34:52 +000066
class TSimpleServer(TServer):
    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, *args):
        TServer.__init__(self, *args)

    def serve(self):
        """Listen, then serve accepted connections one at a time, forever.

        Each connection is processed until the processor raises.  A
        TTransportException ends the connection silently; any other Exception
        is logged.  In both cases the loop goes on to accept the next client.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                continue
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            iprot = self.inputProtocolFactory.getProtocol(itrans)
            oprot = self.outputProtocolFactory.getProtocol(otrans)
            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                # Treated as the normal end of a connection
                # (presumably the peer disconnecting).
                pass
            except Exception as x:
                logger.exception(x)
            finally:
                # Close in ``finally`` so the transports are also released
                # when a non-Exception (e.g. KeyboardInterrupt) unwinds the
                # loop, and so a failing itrans.close() cannot skip otrans.
                try:
                    itrans.close()
                finally:
                    otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000093
Mark Slee4f0fed62006-10-02 17:50:08 +000094
class TThreadedServer(TServer):
    """Threaded server that spawns a new thread per each connection.

    Positional arguments are forwarded to TServer.  The optional keyword
    argument ``daemon`` (default False) is applied to every connection
    thread's ``daemon`` flag.
    """

    def __init__(self, *args, **kwargs):
        TServer.__init__(self, *args)
        self.daemon = kwargs.get("daemon", False)

    def serve(self):
        """Listen and hand every accepted client to a new thread, forever.

        KeyboardInterrupt is re-raised to the caller; any other Exception
        raised while accepting or starting a thread is logged and the loop
        continues.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                t = threading.Thread(target=self.handle, args=(client,))
                # Thread.setDaemon() is deprecated (it warns on Python 3.10+);
                # the ``daemon`` attribute is the supported spelling.
                t.daemon = self.daemon
                t.start()
            except KeyboardInterrupt:
                raise
            except Exception as x:
                logger.exception(x)

    def handle(self, client):
        """Process requests from one client until its connection ends.

        Runs on the per-connection thread.  A TTransportException ends the
        connection silently; any other Exception is logged.  The transports
        are always closed before returning.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            pass
        except Exception as x:
            logger.exception(x)
        finally:
            # Release both transports even if a non-Exception unwinds the
            # thread or the first close() fails.
            try:
                itrans.close()
            finally:
                otrans.close()
Mark Slee4ac459f2006-10-25 21:39:01 +0000132
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000133
class TThreadPoolServer(TServer):
    """Server with a fixed size pool of threads which service requests.

    Positional arguments are forwarded to TServer.  The optional keyword
    argument ``daemon`` (default False) is applied to every worker thread's
    ``daemon`` flag.  The pool size defaults to 10 and is read when serve()
    starts, so call setNumThreads() before serve().
    """

    def __init__(self, *args, **kwargs):
        TServer.__init__(self, *args)
        self.clients = queue.Queue()
        self.threads = 10
        self.daemon = kwargs.get("daemon", False)

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                # Log and keep the worker alive for the next client.
                logger.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible"""
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = self.outputTransportFactory.getTransport(client)
        iprot = self.inputProtocolFactory.getProtocol(itrans)
        oprot = self.outputProtocolFactory.getProtocol(otrans)
        try:
            while True:
                self.processor.process(iprot, oprot)
        except TTransport.TTransportException:
            pass
        except Exception as x:
            logger.exception(x)
        finally:
            # Release both transports even if a non-Exception unwinds the
            # worker or the first close() fails.
            try:
                itrans.close()
            finally:
                otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                t = threading.Thread(target=self.serveThread)
                # Thread.setDaemon() is deprecated (it warns on Python 3.10+);
                # the ``daemon`` attribute is the supported spelling.
                t.daemon = self.daemon
                t.start()
            except Exception as x:
                logger.exception(x)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                self.clients.put(client)
            except Exception as x:
                logger.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000193
194
class TForkingServer(TServer):
    """A Thrift server that forks a new process for each request

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """
    def __init__(self, *args):
        TServer.__init__(self, *args)
        # pids of forked children that have not been reaped yet
        self.children = []

    @staticmethod
    def _try_close(transport):
        """Close *transport*; an IOError is logged as a warning, not raised."""
        try:
            transport.close()
        except IOError as e:
            logger.warning(e, exc_info=True)

    def serve(self):
        """Listen and fork one child process per accepted client, forever.

        The parent records the child's pid, reaps any finished children and
        closes its own copy of the client transports.  The child serves the
        client and then terminates without returning to this loop.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                continue
            try:
                pid = os.fork()

                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()

                    # Parent must close socket or the connection may not get
                    # closed promptly
                    itrans = self.inputTransportFactory.getTransport(client)
                    otrans = self.outputTransportFactory.getTransport(client)
                    self._try_close(itrans)
                    self._try_close(otrans)
                else:  # child
                    self._serve_child(client)

            except TTransport.TTransportException:
                pass
            except Exception as x:
                logger.exception(x)

    def _serve_child(self, client):
        """Serve *client* inside the forked child, then exit the process.

        Never returns: the process always ends through os._exit(), with
        status 0 when the connection ended with a TTransportException and
        status 1 on any other failure.  Exiting unconditionally keeps a
        failing child from falling back into the parent's accept loop and
        acting as a second server.
        """
        ecode = 1
        try:
            itrans = self.inputTransportFactory.getTransport(client)
            otrans = self.outputTransportFactory.getTransport(client)
            try:
                iprot = self.inputProtocolFactory.getProtocol(itrans)
                oprot = self.outputProtocolFactory.getProtocol(otrans)
                try:
                    while True:
                        self.processor.process(iprot, oprot)
                except TTransport.TTransportException:
                    # Treated as the normal end of the connection.
                    ecode = 0
                except Exception as e:
                    logger.exception(e)
            finally:
                self._try_close(itrans)
                self._try_close(otrans)
        except Exception as e:
            # Setup or close failed with something _try_close() does not
            # handle; report it and exit with a failure status.
            logger.exception(e)
            ecode = 1
        finally:
            os._exit(ecode)

    def collect_children(self):
        """Reap finished child processes without blocking.

        Stops as soon as waitpid() reports that no further child has exited,
        or raises os.error.
        """
        while self.children:
            try:
                pid, status = os.waitpid(0, os.WNOHANG)
            except os.error:
                pid = None

            if not pid:
                break
            try:
                self.children.remove(pid)
            except ValueError:
                # waitpid(0, ...) can reap a child this server did not fork;
                # such a pid is not tracked, so there is nothing to remove.
                pass