blob: 76947608f56e664675e47992a83b7e67a95fe604 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
David Reiss74421272008-11-07 23:09:31 +000019"""Implementation of non-blocking server.
20
Bryan Duxbury69720412012-01-03 17:32:30 +000021The main idea of the server is to receive and send requests
22only from the main thread.
David Reiss74421272008-11-07 23:09:31 +000023
PikachuHy98140072022-07-31 01:00:43 +080024The thread pool should be sized for concurrent tasks, not
Bryan Duxbury69720412012-01-03 17:32:30 +000025maximum connections
David Reiss74421272008-11-07 23:09:31 +000026"""
Konrad Grochowski3a724e32014-08-12 11:48:29 -040027
David Reiss74421272008-11-07 23:09:31 +000028import logging
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090029import select
30import socket
31import struct
32import threading
David Reiss74421272008-11-07 23:09:31 +000033
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090034from collections import deque
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090035from six.moves import queue
36
David Reiss74421272008-11-07 23:09:31 +000037from thrift.transport import TTransport
38from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
39
40__all__ = ['TNonblockingServer']
41
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090042logger = logging.getLogger(__name__)
43
Bryan Duxbury69720412012-01-03 17:32:30 +000044
David Reiss74421272008-11-07 23:09:31 +000045class Worker(threading.Thread):
46 """Worker is a small helper to process incoming connection."""
Bryan Duxbury69720412012-01-03 17:32:30 +000047
David Reiss74421272008-11-07 23:09:31 +000048 def __init__(self, queue):
49 threading.Thread.__init__(self)
50 self.queue = queue
51
52 def run(self):
53 """Process queries from task queue, stop if processor is None."""
54 while True:
55 try:
56 processor, iprot, oprot, otrans, callback = self.queue.get()
57 if processor is None:
58 break
59 processor.process(iprot, oprot)
60 callback(True, otrans.getvalue())
61 except Exception:
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090062 logger.exception("Exception while processing request", exc_info=True)
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090063 callback(False, b'')
David Reiss74421272008-11-07 23:09:31 +000064
James E. King, III0ad20bd2017-09-30 15:44:16 -070065
David Reiss74421272008-11-07 23:09:31 +000066WAIT_LEN = 0
67WAIT_MESSAGE = 1
68WAIT_PROCESS = 2
69SEND_ANSWER = 3
70CLOSED = 4
71
Bryan Duxbury69720412012-01-03 17:32:30 +000072
David Reiss74421272008-11-07 23:09:31 +000073def locked(func):
Bryan Duxbury69720412012-01-03 17:32:30 +000074 """Decorator which locks self.lock."""
David Reiss74421272008-11-07 23:09:31 +000075 def nested(self, *args, **kwargs):
76 self.lock.acquire()
77 try:
78 return func(self, *args, **kwargs)
79 finally:
80 self.lock.release()
81 return nested
82
Bryan Duxbury69720412012-01-03 17:32:30 +000083
David Reiss74421272008-11-07 23:09:31 +000084def socket_exception(func):
Bryan Duxbury69720412012-01-03 17:32:30 +000085 """Decorator close object on socket.error."""
David Reiss74421272008-11-07 23:09:31 +000086 def read(self, *args, **kwargs):
87 try:
88 return func(self, *args, **kwargs)
89 except socket.error:
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090090 logger.debug('ignoring socket exception', exc_info=True)
David Reiss74421272008-11-07 23:09:31 +000091 self.close()
92 return read
93
Bryan Duxbury69720412012-01-03 17:32:30 +000094
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090095class Message(object):
96 def __init__(self, offset, len_, header):
97 self.offset = offset
98 self.len = len_
99 self.buffer = None
100 self.is_header = header
101
102 @property
103 def end(self):
104 return self.offset + self.len
105
106
Nobuaki Sukegawab9c859a2015-12-21 01:10:25 +0900107class Connection(object):
David Reiss74421272008-11-07 23:09:31 +0000108 """Basic class is represented connection.
Bryan Duxbury69720412012-01-03 17:32:30 +0000109
David Reiss74421272008-11-07 23:09:31 +0000110 It can be in state:
111 WAIT_LEN --- connection is reading request len.
112 WAIT_MESSAGE --- connection is reading request.
Bryan Duxbury69720412012-01-03 17:32:30 +0000113 WAIT_PROCESS --- connection has just read whole request and
114 waits for call ready routine.
David Reiss74421272008-11-07 23:09:31 +0000115 SEND_ANSWER --- connection is sending answer string (including length
Bryan Duxbury69720412012-01-03 17:32:30 +0000116 of answer).
David Reiss74421272008-11-07 23:09:31 +0000117 CLOSED --- socket was closed and connection should be deleted.
118 """
119 def __init__(self, new_socket, wake_up):
120 self.socket = new_socket
121 self.socket.setblocking(False)
122 self.status = WAIT_LEN
123 self.len = 0
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900124 self.received = deque()
125 self._reading = Message(0, 4, True)
126 self._rbuf = b''
127 self._wbuf = b''
David Reiss74421272008-11-07 23:09:31 +0000128 self.lock = threading.Lock()
129 self.wake_up = wake_up
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900130 self.remaining = False
David Reiss74421272008-11-07 23:09:31 +0000131
132 @socket_exception
133 def read(self):
134 """Reads data from stream and switch state."""
135 assert self.status in (WAIT_LEN, WAIT_MESSAGE)
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900136 assert not self.received
137 buf_size = 8192
138 first = True
139 done = False
140 while not done:
141 read = self.socket.recv(buf_size)
142 rlen = len(read)
143 done = rlen < buf_size
144 self._rbuf += read
145 if first and rlen == 0:
146 if self.status != WAIT_LEN or self._rbuf:
147 logger.error('could not read frame from socket')
148 else:
149 logger.debug('read zero length. client might have disconnected')
David Reiss74421272008-11-07 23:09:31 +0000150 self.close()
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900151 while len(self._rbuf) >= self._reading.end:
152 if self._reading.is_header:
153 mlen, = struct.unpack('!i', self._rbuf[:4])
shangxub664cfe2020-11-13 18:03:01 +0800154 if mlen < 0:
155 logger.error('could not read the head from frame')
156 self.close()
157 break
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900158 self._reading = Message(self._reading.end, mlen, False)
159 self.status = WAIT_MESSAGE
160 else:
161 self._reading.buffer = self._rbuf
162 self.received.append(self._reading)
163 self._rbuf = self._rbuf[self._reading.end:]
164 self._reading = Message(0, 4, True)
165 first = False
166 if self.received:
David Reiss74421272008-11-07 23:09:31 +0000167 self.status = WAIT_PROCESS
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900168 break
169 self.remaining = not done
David Reiss74421272008-11-07 23:09:31 +0000170
171 @socket_exception
172 def write(self):
173 """Writes data from socket and switch state."""
174 assert self.status == SEND_ANSWER
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900175 sent = self.socket.send(self._wbuf)
176 if sent == len(self._wbuf):
David Reiss74421272008-11-07 23:09:31 +0000177 self.status = WAIT_LEN
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900178 self._wbuf = b''
David Reiss74421272008-11-07 23:09:31 +0000179 self.len = 0
180 else:
Yubing Dong (Tom)00646bb2018-01-18 23:55:24 -0800181 self._wbuf = self._wbuf[sent:]
David Reiss74421272008-11-07 23:09:31 +0000182
183 @locked
184 def ready(self, all_ok, message):
185 """Callback function for switching state and waking up main thread.
Bryan Duxbury69720412012-01-03 17:32:30 +0000186
David Reiss74421272008-11-07 23:09:31 +0000187 This function is the only function witch can be called asynchronous.
Bryan Duxbury69720412012-01-03 17:32:30 +0000188
David Reiss74421272008-11-07 23:09:31 +0000189 The ready can switch Connection to three states:
David Reiss6ce401d2009-03-24 20:01:58 +0000190 WAIT_LEN if request was oneway.
David Reiss74421272008-11-07 23:09:31 +0000191 SEND_ANSWER if request was processed in normal way.
192 CLOSED if request throws unexpected exception.
Bryan Duxbury69720412012-01-03 17:32:30 +0000193
David Reiss74421272008-11-07 23:09:31 +0000194 The one wakes up main thread.
195 """
196 assert self.status == WAIT_PROCESS
197 if not all_ok:
198 self.close()
199 self.wake_up()
200 return
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900201 self.len = 0
David Reiss74421272008-11-07 23:09:31 +0000202 if len(message) == 0:
David Reissc51986f2009-03-24 20:01:25 +0000203 # it was a oneway request, do not write answer
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900204 self._wbuf = b''
David Reiss74421272008-11-07 23:09:31 +0000205 self.status = WAIT_LEN
206 else:
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900207 self._wbuf = struct.pack('!i', len(message)) + message
David Reiss74421272008-11-07 23:09:31 +0000208 self.status = SEND_ANSWER
209 self.wake_up()
210
211 @locked
212 def is_writeable(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000213 """Return True if connection should be added to write list of select"""
David Reiss74421272008-11-07 23:09:31 +0000214 return self.status == SEND_ANSWER
215
216 # it's not necessary, but...
217 @locked
218 def is_readable(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000219 """Return True if connection should be added to read list of select"""
David Reiss74421272008-11-07 23:09:31 +0000220 return self.status in (WAIT_LEN, WAIT_MESSAGE)
221
222 @locked
223 def is_closed(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000224 """Returns True if connection is closed."""
David Reiss74421272008-11-07 23:09:31 +0000225 return self.status == CLOSED
226
227 def fileno(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000228 """Returns the file descriptor of the associated socket."""
David Reiss74421272008-11-07 23:09:31 +0000229 return self.socket.fileno()
230
231 def close(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000232 """Closes connection"""
David Reiss74421272008-11-07 23:09:31 +0000233 self.status = CLOSED
234 self.socket.close()
235
Bryan Duxbury69720412012-01-03 17:32:30 +0000236
Nobuaki Sukegawab9c859a2015-12-21 01:10:25 +0900237class TNonblockingServer(object):
David Reiss74421272008-11-07 23:09:31 +0000238 """Non-blocking server."""
Bryan Duxbury69720412012-01-03 17:32:30 +0000239
240 def __init__(self,
241 processor,
242 lsocket,
243 inputProtocolFactory=None,
244 outputProtocolFactory=None,
245 threads=10):
David Reiss74421272008-11-07 23:09:31 +0000246 self.processor = processor
247 self.socket = lsocket
248 self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
249 self.out_protocol = outputProtocolFactory or self.in_protocol
250 self.threads = int(threads)
251 self.clients = {}
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900252 self.tasks = queue.Queue()
David Reiss74421272008-11-07 23:09:31 +0000253 self._read, self._write = socket.socketpair()
254 self.prepared = False
Roger Meiercfff8562012-04-13 14:24:55 +0000255 self._stop = False
Yiyang Zhou88a45ac2021-08-04 21:55:04 +0800256 self.poll = select.poll() if hasattr(select, 'poll') else None
David Reiss74421272008-11-07 23:09:31 +0000257
258 def setNumThreads(self, num):
259 """Set the number of worker threads that should be created."""
260 # implement ThreadPool interface
Bryan Duxbury69720412012-01-03 17:32:30 +0000261 assert not self.prepared, "Can't change number of threads after start"
David Reiss74421272008-11-07 23:09:31 +0000262 self.threads = num
263
264 def prepare(self):
265 """Prepares server for serve requests."""
Roger Meiercfff8562012-04-13 14:24:55 +0000266 if self.prepared:
267 return
David Reiss74421272008-11-07 23:09:31 +0000268 self.socket.listen()
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900269 for _ in range(self.threads):
David Reiss74421272008-11-07 23:09:31 +0000270 thread = Worker(self.tasks)
Jiayu Liu6f339002023-04-20 07:39:35 +0800271 thread.daemon = True
David Reiss74421272008-11-07 23:09:31 +0000272 thread.start()
273 self.prepared = True
274
275 def wake_up(self):
276 """Wake up main thread.
Bryan Duxbury69720412012-01-03 17:32:30 +0000277
Konrad Grochowski3b5dacb2014-11-24 10:55:31 +0100278 The server usually waits in select call in we should terminate one.
David Reiss74421272008-11-07 23:09:31 +0000279 The simplest way is using socketpair.
Bryan Duxbury69720412012-01-03 17:32:30 +0000280
David Reiss74421272008-11-07 23:09:31 +0000281 Select always wait to read from the first socket of socketpair.
Bryan Duxbury69720412012-01-03 17:32:30 +0000282
David Reiss74421272008-11-07 23:09:31 +0000283 In this case, we can just write anything to the second socket from
Bryan Duxbury69720412012-01-03 17:32:30 +0000284 socketpair.
285 """
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900286 self._write.send(b'1')
David Reiss74421272008-11-07 23:09:31 +0000287
Roger Meiercfff8562012-04-13 14:24:55 +0000288 def stop(self):
289 """Stop the server.
290
291 This method causes the serve() method to return. stop() may be invoked
292 from within your handler, or from another thread.
293
294 After stop() is called, serve() will return but the server will still
295 be listening on the socket. serve() may then be called again to resume
296 processing requests. Alternatively, close() may be called after
297 serve() returns to close the server socket and shutdown all worker
298 threads.
299 """
300 self._stop = True
301 self.wake_up()
302
David Reiss74421272008-11-07 23:09:31 +0000303 def _select(self):
304 """Does select on open connections."""
305 readable = [self.socket.handle.fileno(), self._read.fileno()]
306 writable = []
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900307 remaining = []
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900308 for i, connection in list(self.clients.items()):
David Reiss74421272008-11-07 23:09:31 +0000309 if connection.is_readable():
310 readable.append(connection.fileno())
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900311 if connection.remaining or connection.received:
312 remaining.append(connection.fileno())
David Reiss74421272008-11-07 23:09:31 +0000313 if connection.is_writeable():
314 writable.append(connection.fileno())
315 if connection.is_closed():
316 del self.clients[i]
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900317 if remaining:
318 return remaining, [], [], False
319 else:
320 return select.select(readable, writable, readable) + (True,)
Bryan Duxbury69720412012-01-03 17:32:30 +0000321
Yiyang Zhou88a45ac2021-08-04 21:55:04 +0800322 def _poll_select(self):
323 """Does poll on open connections, if available."""
324 remaining = []
325
326 self.poll.register(self.socket.handle.fileno(), select.POLLIN | select.POLLRDNORM)
327 self.poll.register(self._read.fileno(), select.POLLIN | select.POLLRDNORM)
328
329 for i, connection in list(self.clients.items()):
330 if connection.is_readable():
331 self.poll.register(connection.fileno(), select.POLLIN | select.POLLRDNORM | select.POLLERR | select.POLLHUP | select.POLLNVAL)
332 if connection.remaining or connection.received:
333 remaining.append(connection.fileno())
334 if connection.is_writeable():
335 self.poll.register(connection.fileno(), select.POLLOUT | select.POLLWRNORM)
336 if connection.is_closed():
337 try:
338 self.poll.unregister(i)
339 except KeyError:
340 logger.debug("KeyError in unregistering connections...")
341 del self.clients[i]
342 if remaining:
343 return remaining, [], [], False
344
345 rlist = []
346 wlist = []
347 xlist = []
348 pollres = self.poll.poll()
349 for fd, event in pollres:
350 if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
351 xlist.append(fd)
352 elif event & (select.POLLOUT | select.POLLWRNORM):
353 wlist.append(fd)
354 elif event & (select.POLLIN | select.POLLRDNORM):
355 rlist.append(fd)
356 else: # should be impossible
357 logger.debug("reached an impossible state in _poll_select")
358 xlist.append(fd)
359
360 return rlist, wlist, xlist, True
361
David Reiss74421272008-11-07 23:09:31 +0000362 def handle(self):
363 """Handle requests.
Bryan Duxbury69720412012-01-03 17:32:30 +0000364
365 WARNING! You must call prepare() BEFORE calling handle()
David Reiss74421272008-11-07 23:09:31 +0000366 """
367 assert self.prepared, "You have to call prepare before handle"
Yiyang Zhou88a45ac2021-08-04 21:55:04 +0800368 rset, wset, xset, selected = self._select() if not self.poll else self._poll_select()
David Reiss74421272008-11-07 23:09:31 +0000369 for readable in rset:
370 if readable == self._read.fileno():
371 # don't care i just need to clean readable flag
Bryan Duxbury69720412012-01-03 17:32:30 +0000372 self._read.recv(1024)
David Reiss74421272008-11-07 23:09:31 +0000373 elif readable == self.socket.handle.fileno():
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900374 try:
375 client = self.socket.accept()
376 if client:
377 self.clients[client.handle.fileno()] = Connection(client.handle,
378 self.wake_up)
379 except socket.error:
380 logger.debug('error while accepting', exc_info=True)
David Reiss74421272008-11-07 23:09:31 +0000381 else:
382 connection = self.clients[readable]
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900383 if selected:
384 connection.read()
385 if connection.received:
386 connection.status = WAIT_PROCESS
Yiyang Zhou88a45ac2021-08-04 21:55:04 +0800387 if self.poll:
388 self.poll.unregister(connection.fileno())
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900389 msg = connection.received.popleft()
390 itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
David Reiss74421272008-11-07 23:09:31 +0000391 otransport = TTransport.TMemoryBuffer()
392 iprot = self.in_protocol.getProtocol(itransport)
393 oprot = self.out_protocol.getProtocol(otransport)
Bryan Duxbury69720412012-01-03 17:32:30 +0000394 self.tasks.put([self.processor, iprot, oprot,
David Reiss74421272008-11-07 23:09:31 +0000395 otransport, connection.ready])
396 for writeable in wset:
397 self.clients[writeable].write()
398 for oob in xset:
399 self.clients[oob].close()
David Reiss74421272008-11-07 23:09:31 +0000400
401 def close(self):
402 """Closes the server."""
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900403 for _ in range(self.threads):
David Reiss74421272008-11-07 23:09:31 +0000404 self.tasks.put([None, None, None, None, None])
405 self.socket.close()
406 self.prepared = False
Bryan Duxbury69720412012-01-03 17:32:30 +0000407
David Reiss74421272008-11-07 23:09:31 +0000408 def serve(self):
Roger Meiercfff8562012-04-13 14:24:55 +0000409 """Serve requests.
410
411 Serve requests forever, or until stop() is called.
412 """
413 self._stop = False
David Reiss74421272008-11-07 23:09:31 +0000414 self.prepare()
Roger Meiercfff8562012-04-13 14:24:55 +0000415 while not self._stop:
David Reiss74421272008-11-07 23:09:31 +0000416 self.handle()