blob: 8b2f938a62a33a40c34777aa014382185f2892ab [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Mark Slee89e2bb82007-03-01 00:20:36 +000019
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090020from six.moves import queue
Konrad Grochowski3a724e32014-08-12 11:48:29 -040021import logging
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090022import os
23import threading
Konrad Grochowski3a724e32014-08-12 11:48:29 -040024
Mark Slee4ac459f2006-10-25 21:39:01 +000025from thrift.protocol import TBinaryProtocol
Neil Williams66a44c52018-08-13 16:12:24 -070026from thrift.protocol.THeaderProtocol import THeaderProtocolFactory
Bryan Duxbury69720412012-01-03 17:32:30 +000027from thrift.transport import TTransport
28
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090029logger = logging.getLogger(__name__)
30
Mark Sleec9676562006-09-05 17:34:52 +000031
class TServer(object):
    """Base interface for a server, which must have a serve() method.

    Positional argument layouts accepted by every server constructor:
      1) (processor, serverTransport)
      2) (processor, serverTransport, transportFactory, protocolFactory)
      3) (processor, serverTransport,
          inputTransportFactory, outputTransportFactory,
          inputProtocolFactory, outputProtocolFactory)

    Any other number of arguments is accepted silently and leaves the
    instance without its processor/transport/factory attributes.
    """

    def __init__(self, *args):
        argc = len(args)
        if argc == 2:
            # Layout 1: fall back to the base transport factory and the
            # binary protocol factory for both directions.
            processor, server_transport = args
            self.__initArgs__(processor, server_transport,
                              TTransport.TTransportFactoryBase(),
                              TTransport.TTransportFactoryBase(),
                              TBinaryProtocol.TBinaryProtocolFactory(),
                              TBinaryProtocol.TBinaryProtocolFactory())
        elif argc == 4:
            # Layout 2: one factory of each kind, shared by input and output.
            (processor, server_transport,
             transport_factory, protocol_factory) = args
            self.__initArgs__(processor, server_transport,
                              transport_factory, transport_factory,
                              protocol_factory, protocol_factory)
        elif argc == 6:
            # Layout 3: everything spelled out by the caller.
            self.__initArgs__(*args)

    def __initArgs__(self, processor, serverTransport,
                     inputTransportFactory, outputTransportFactory,
                     inputProtocolFactory, outputProtocolFactory):
        """Store the collaborators and validate the protocol factory pair.

        Raises ValueError when exactly one of the two protocol factories is
        a THeaderProtocolFactory.
        """
        self.processor = processor
        self.serverTransport = serverTransport
        self.inputTransportFactory = inputTransportFactory
        self.outputTransportFactory = outputTransportFactory
        self.inputProtocolFactory = inputProtocolFactory
        self.outputProtocolFactory = outputProtocolFactory

        input_is_header = isinstance(inputProtocolFactory,
                                     THeaderProtocolFactory)
        output_is_header = isinstance(outputProtocolFactory,
                                      THeaderProtocolFactory)
        # The two flags may only differ when exactly one side is a header
        # protocol factory, which is the unsupported combination.
        if input_is_header != output_is_header:
            raise ValueError("THeaderProtocol servers require that both the input and "
                             "output protocols are THeaderProtocol.")

    def serve(self):
        """Run the server; the base implementation does nothing."""
        pass
Mark Sleec9676562006-09-05 17:34:52 +000072
Mark Sleec9676562006-09-05 17:34:52 +000073
class TSimpleServer(TServer):
    """Simple single-threaded server that just pumps around one transport."""

    def __init__(self, *args):
        TServer.__init__(self, *args)

    def serve(self):
        """Accept clients one at a time and process each until it fails.

        Each accepted client is served to completion before the next
        accept() call. A TTransportException from the processor ends the
        connection quietly; any other Exception is logged. The client's
        transports are closed in a ``finally`` clause so they are released
        even when protocol setup fails or a non-Exception error such as
        KeyboardInterrupt interrupts processing.
        """
        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                continue

            itrans = self.inputTransportFactory.getTransport(client)
            otrans = None
            try:
                iprot = self.inputProtocolFactory.getProtocol(itrans)

                # for THeaderProtocol, we must use the same protocol instance
                # for input and output so that the response is in the same
                # dialect that the server detected the request was in.
                if isinstance(self.inputProtocolFactory,
                              THeaderProtocolFactory):
                    oprot = iprot
                else:
                    otrans = self.outputTransportFactory.getTransport(client)
                    oprot = self.outputProtocolFactory.getProtocol(otrans)

                try:
                    while True:
                        self.processor.process(iprot, oprot)
                except TTransport.TTransportException:
                    pass
                except Exception as x:
                    logger.exception(x)
            finally:
                itrans.close()
                if otrans:
                    otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +0000111
Mark Slee4f0fed62006-10-02 17:50:08 +0000112
class TThreadedServer(TServer):
    """Threaded server that spawns a new thread per each connection.

    Keyword arguments:
      daemon -- value assigned to the ``daemon`` flag of every connection
                thread (default False).
    """

    def __init__(self, *args, **kwargs):
        TServer.__init__(self, *args)
        self.daemon = kwargs.get("daemon", False)

    def serve(self):
        """Accept clients forever, handing each one to a new thread.

        Exceptions raised while accepting or starting a thread are logged
        and the loop continues; KeyboardInterrupt is propagated.
        """
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                t = threading.Thread(target=self.handle, args=(client,))
                t.daemon = self.daemon
                t.start()
            except KeyboardInterrupt:
                raise
            except Exception as x:
                logger.exception(x)

    def handle(self, client):
        """Serve a single client until its transport fails.

        A TTransportException from the processor ends the connection
        quietly; any other Exception is logged. The transports are closed
        in a ``finally`` clause so the connection is released even when
        protocol setup raises or processing is interrupted by a
        non-Exception error.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = None
        try:
            iprot = self.inputProtocolFactory.getProtocol(itrans)

            # for THeaderProtocol, we must use the same protocol instance for
            # input and output so that the response is in the same dialect
            # that the server detected the request was in.
            if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
                oprot = iprot
            else:
                otrans = self.outputTransportFactory.getTransport(client)
                oprot = self.outputProtocolFactory.getProtocol(otrans)

            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                pass
            except Exception as x:
                logger.exception(x)
        finally:
            itrans.close()
            if otrans:
                otrans.close()
Mark Slee4ac459f2006-10-25 21:39:01 +0000160
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000161
class TThreadPoolServer(TServer):
    """Server with a fixed size pool of threads which service requests.

    Keyword arguments:
      daemon -- value assigned to the ``daemon`` flag of every worker
                thread (default False).
    """

    def __init__(self, *args, **kwargs):
        TServer.__init__(self, *args)
        self.clients = queue.Queue()
        self.threads = 10
        self.daemon = kwargs.get("daemon", False)

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created"""
        self.threads = num

    def serveThread(self):
        """Loop around getting clients from the shared queue and process them."""
        while True:
            try:
                client = self.clients.get()
                self.serveClient(client)
            except Exception as x:
                logger.exception(x)

    def serveClient(self, client):
        """Process input/output from a client for as long as possible

        A TTransportException from the processor ends the connection
        quietly; any other Exception is logged. The transports are closed
        in a ``finally`` clause so the connection is released even when
        protocol setup raises or processing is interrupted by a
        non-Exception error.
        """
        itrans = self.inputTransportFactory.getTransport(client)
        otrans = None
        try:
            iprot = self.inputProtocolFactory.getProtocol(itrans)

            # for THeaderProtocol, we must use the same protocol instance for
            # input and output so that the response is in the same dialect
            # that the server detected the request was in.
            if isinstance(self.inputProtocolFactory, THeaderProtocolFactory):
                oprot = iprot
            else:
                otrans = self.outputTransportFactory.getTransport(client)
                oprot = self.outputProtocolFactory.getProtocol(otrans)

            try:
                while True:
                    self.processor.process(iprot, oprot)
            except TTransport.TTransportException:
                pass
            except Exception as x:
                logger.exception(x)
        finally:
            itrans.close()
            if otrans:
                otrans.close()

    def serve(self):
        """Start a fixed number of worker threads and put client into a queue"""
        for _ in range(self.threads):
            try:
                t = threading.Thread(target=self.serveThread)
                t.daemon = self.daemon
                t.start()
            except Exception as x:
                logger.exception(x)

        # Pump the socket for clients
        self.serverTransport.listen()
        while True:
            try:
                client = self.serverTransport.accept()
                if not client:
                    continue
                self.clients.put(client)
            except Exception as x:
                logger.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000231
232
class TForkingServer(TServer):
    """A Thrift server that forks a new process for each request

    This is more scalable than the threaded server as it does not cause
    GIL contention.

    Note that this has different semantics from the threading server.
    Specifically, updates to shared variables will no longer be shared.
    It will also not work on windows.

    This code is heavily inspired by SocketServer.ForkingMixIn in the
    Python stdlib.
    """
    def __init__(self, *args):
        TServer.__init__(self, *args)
        # pids of forked children that have not been reaped yet
        self.children = []

    def serve(self):
        """Accept clients forever, forking one child process per client.

        The parent records the child's pid, reaps finished children and
        closes its own copy of the client connection. The child serves the
        client and always terminates through os._exit(): exit status 0
        when the connection ended with a TTransportException, 1 for any
        other error (including failures while building the transports and
        protocols). The child therefore never returns into this accept
        loop.
        """
        def try_close(file):
            try:
                file.close()
            except IOError as e:
                logger.warning(e, exc_info=True)

        self.serverTransport.listen()
        while True:
            client = self.serverTransport.accept()
            if not client:
                continue
            try:
                pid = os.fork()

                if pid:  # parent
                    # add before collect, otherwise you race w/ waitpid
                    self.children.append(pid)
                    self.collect_children()

                    # Parent must close socket or the connection may not get
                    # closed promptly
                    itrans = self.inputTransportFactory.getTransport(client)
                    otrans = self.outputTransportFactory.getTransport(client)
                    try_close(itrans)
                    try_close(otrans)
                else:  # child -- must never fall out of this branch
                    # Pessimistic default: only a transport-level disconnect
                    # counts as a clean exit.
                    ecode = 1
                    itrans = None
                    otrans = None
                    try:
                        itrans = self.inputTransportFactory.getTransport(client)
                        iprot = self.inputProtocolFactory.getProtocol(itrans)

                        # for THeaderProtocol, we must use the same protocol
                        # instance for input and output so that the response
                        # is in the same dialect that the server detected the
                        # request was in.
                        if isinstance(self.inputProtocolFactory,
                                      THeaderProtocolFactory):
                            oprot = iprot
                        else:
                            otrans = self.outputTransportFactory.getTransport(client)
                            oprot = self.outputProtocolFactory.getProtocol(otrans)

                        while True:
                            self.processor.process(iprot, oprot)
                    except TTransport.TTransportException:
                        ecode = 0
                    except Exception as e:
                        logger.exception(e)
                    finally:
                        # Exit from the finally clause so that neither a
                        # setup failure nor a BaseException can send the
                        # child back into the parent's accept loop.
                        try:
                            if itrans is not None:
                                try_close(itrans)
                            if otrans:
                                try_close(otrans)
                        finally:
                            os._exit(ecode)

            except TTransport.TTransportException:
                pass
            except Exception as x:
                logger.exception(x)

    def collect_children(self):
        """Reap exited children without blocking and forget their pids.

        Stops as soon as waitpid reports nothing to reap or fails. A
        reaped pid that is not in self.children is ignored instead of
        raising ValueError.
        """
        while self.children:
            try:
                pid, _ = os.waitpid(0, os.WNOHANG)
            except os.error:
                pid = None

            if not pid:
                break
            try:
                self.children.remove(pid)
            except ValueError:
                # waitpid reaped a process this server is not tracking
                pass
323 break