blob: ac064965110d42be0c1e54be612338f39011cda2 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
"""Implementation of non-blocking server.

The main idea of the server is to receive and send requests
only from the main thread.

The thread pool should be sized for concurrent tasks, not
maximum connections.
"""
Konrad Grochowski3a724e32014-08-12 11:48:29 -040027
import errno
import functools
import logging
import select
import socket
import struct
import threading

from collections import deque
from six.moves import queue

from thrift.transport import TTransport
from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
39
# Only the server class is exported by ``from <module> import *``.
__all__ = ['TNonblockingServer']

# Module-level logger used by the worker, the connection and the server.
logger = logging.getLogger(__name__)
43
Bryan Duxbury69720412012-01-03 17:32:30 +000044
class Worker(threading.Thread):
    """Worker is a small helper thread that processes incoming requests.

    Tasks are taken from a shared queue.  Each task is a sequence of five
    items: ``(processor, iprot, oprot, otrans, callback)``.  A task whose
    processor is ``None`` tells the worker to stop.
    """

    def __init__(self, queue):
        """Create a worker that consumes tasks from ``queue``."""
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Process queries from task queue, stop if processor is None.

        For every task ``processor.process(iprot, oprot)`` is called and the
        result is reported through ``callback(True, otrans.getvalue())``.
        If anything raises, the error is logged and ``callback(False, b'')``
        is invoked instead.
        """
        while True:
            task = self.queue.get()
            # Unpack outside of the processing ``try`` so that a malformed
            # task can never trigger the callback of a *previous* task.
            try:
                processor, iprot, oprot, otrans, callback = task
            except (TypeError, ValueError):
                logger.exception("Ignoring malformed task in worker queue")
                continue
            if processor is None:
                break
            try:
                processor.process(iprot, oprot)
                callback(True, otrans.getvalue())
            except Exception:
                # logger.exception() already attaches the traceback.
                logger.exception("Exception while processing request")
                try:
                    callback(False, b'')
                except Exception:
                    # A failing callback must not kill the worker thread,
                    # otherwise the pool silently shrinks.
                    logger.exception("Exception while reporting failed request")
David Reiss74421272008-11-07 23:09:31 +000064
James E. King, III0ad20bd2017-09-30 15:44:16 -070065
# Connection states, numbered 0..4 in declaration order:
#   WAIT_LEN, WAIT_MESSAGE, WAIT_PROCESS, SEND_ANSWER, CLOSED
WAIT_LEN, WAIT_MESSAGE, WAIT_PROCESS, SEND_ANSWER, CLOSED = range(5)
71
Bryan Duxbury69720412012-01-03 17:32:30 +000072
def locked(func):
    """Decorator which locks self.lock.

    The wrapped method runs while ``self.lock`` is held; the lock is
    released again even if the method raises.  The wrapper keeps the name
    and docstring of the decorated method.
    """
    @functools.wraps(func)
    def nested(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return nested
82
Bryan Duxbury69720412012-01-03 17:32:30 +000083
def socket_exception(func):
    """Decorator close object on socket.error.

    If the wrapped method raises ``socket.error`` the error is logged at
    debug level, ``self.close()`` is called and ``None`` is returned.  Any
    other exception propagates unchanged.  The wrapper keeps the name and
    docstring of the decorated method.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error:
            logger.debug('ignoring socket exception', exc_info=True)
            self.close()
    return wrapper
93
Bryan Duxbury69720412012-01-03 17:32:30 +000094
class Message(object):
    """Describes one header or payload span inside a receive buffer.

    ``offset`` and ``len`` locate the span, ``is_header`` tells whether the
    span is a length header, and ``buffer`` starts out as ``None`` until
    the owner attaches the bytes the span refers to.
    """

    def __init__(self, offset, len_, header):
        self.offset, self.len = offset, len_
        self.is_header = header
        self.buffer = None

    @property
    def end(self):
        """Index one past the last byte of the span."""
        stop = self.offset + self.len
        return stop
105
106
class Connection(object):
    """Basic class that represents a client connection.

    It can be in state:
        WAIT_LEN --- connection is reading request len.
        WAIT_MESSAGE --- connection is reading request.
        WAIT_PROCESS --- connection has just read whole request and
                         waits for call ready routine.
        SEND_ANSWER --- connection is sending answer string (including length
                        of answer).
        CLOSED --- socket was closed and connection should be deleted.
    """

    # errno values a non-blocking socket reports when no data is available
    # right now; they are not connection failures.
    _WOULD_BLOCK = (errno.EAGAIN, errno.EWOULDBLOCK)

    def __init__(self, new_socket, wake_up):
        """Wrap ``new_socket`` (switched to non-blocking mode).

        ``wake_up`` is a callable invoked from ``ready()`` to interrupt the
        main thread.
        """
        self.socket = new_socket
        self.socket.setblocking(False)
        self.status = WAIT_LEN
        self.len = 0
        # Complete frames parsed by read() that were not dispatched yet.
        self.received = deque()
        # Span currently being read: first a 4 byte header, then the body.
        self._reading = Message(0, 4, True)
        self._rbuf = b''
        self._wbuf = b''
        self.lock = threading.Lock()
        self.wake_up = wake_up
        # True when the last read() stopped although the previous recv()
        # filled the whole buffer, i.e. the socket may hold more data.
        self.remaining = False

    @socket_exception
    def read(self):
        """Reads data from stream and switch state.

        Complete frames are appended to ``self.received``; as soon as at
        least one frame is available the state becomes WAIT_PROCESS.  A
        zero-length first read or a negative frame length closes the
        connection.
        """
        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
        assert not self.received
        buf_size = 8192
        first = True
        done = False
        while not done:
            try:
                read = self.socket.recv(buf_size)
            except socket.error as err:
                if getattr(err, 'errno', None) not in self._WOULD_BLOCK:
                    raise
                # Nothing more to read right now (e.g. the previous chunk
                # had exactly buf_size bytes).  This is not an error: keep
                # the connection and wait for the next select().
                done = True
                break
            rlen = len(read)
            done = rlen < buf_size
            self._rbuf += read
            if first and rlen == 0:
                if self.status != WAIT_LEN or self._rbuf:
                    logger.error('could not read frame from socket')
                else:
                    logger.debug('read zero length. client might have disconnected')
                self.remaining = False
                self.close()
                return
            while len(self._rbuf) >= self._reading.end:
                if self._reading.is_header:
                    mlen, = struct.unpack('!i', self._rbuf[:4])
                    if mlen < 0:
                        logger.error('could not read the head from frame')
                        # The stream is corrupt.  Drop frames queued by this
                        # call and stay CLOSED, so nothing is dispatched for
                        # (or selected on) a socket that is already closed.
                        self.received.clear()
                        self.remaining = False
                        self.close()
                        return
                    self._reading = Message(self._reading.end, mlen, False)
                    self.status = WAIT_MESSAGE
                else:
                    self._reading.buffer = self._rbuf
                    self.received.append(self._reading)
                    self._rbuf = self._rbuf[self._reading.end:]
                    self._reading = Message(0, 4, True)
            first = False
            if self.received:
                self.status = WAIT_PROCESS
                break
        self.remaining = not done

    @socket_exception
    def write(self):
        """Writes pending answer data to the socket and switches state.

        When the whole answer has been sent the state becomes WAIT_LEN,
        otherwise the unsent tail is kept for the next call.
        """
        assert self.status == SEND_ANSWER
        sent = self.socket.send(self._wbuf)
        if sent == len(self._wbuf):
            self.status = WAIT_LEN
            self._wbuf = b''
            self.len = 0
        else:
            self._wbuf = self._wbuf[sent:]

    @locked
    def ready(self, all_ok, message):
        """Callback function for switching state and waking up main thread.

        This function is the only function which can be called asynchronously.

        The ready can switch Connection to three states:
            WAIT_LEN if request was oneway.
            SEND_ANSWER if request was processed in normal way.
            CLOSED if request throws unexpected exception.

        The one wakes up main thread.
        """
        assert self.status == WAIT_PROCESS
        if not all_ok:
            self.close()
            self.wake_up()
            return
        self.len = 0
        if len(message) == 0:
            # it was a oneway request, do not write answer
            self._wbuf = b''
            self.status = WAIT_LEN
        else:
            # answer frame: 4 byte big-endian length followed by the payload
            self._wbuf = struct.pack('!i', len(message)) + message
            self.status = SEND_ANSWER
        self.wake_up()

    @locked
    def is_writeable(self):
        """Return True if connection should be added to write list of select"""
        return self.status == SEND_ANSWER

    # it's not necessary, but...
    @locked
    def is_readable(self):
        """Return True if connection should be added to read list of select"""
        return self.status in (WAIT_LEN, WAIT_MESSAGE)

    @locked
    def is_closed(self):
        """Returns True if connection is closed."""
        return self.status == CLOSED

    def fileno(self):
        """Returns the file descriptor of the associated socket."""
        return self.socket.fileno()

    def close(self):
        """Closes connection"""
        self.status = CLOSED
        self.socket.close()
235
Bryan Duxbury69720412012-01-03 17:32:30 +0000236
class TNonblockingServer(object):
    """Non-blocking server.

    All socket I/O happens in the thread that calls serve()/handle();
    requests are executed by a pool of Worker threads fed through a queue.
    """

    def __init__(self,
                 processor,
                 lsocket,
                 inputProtocolFactory=None,
                 outputProtocolFactory=None,
                 threads=10):
        """Create the server.

        ``processor`` handles decoded requests, ``lsocket`` is the listening
        server socket.  The input protocol defaults to the binary protocol;
        the output protocol defaults to the input protocol.  ``threads`` is
        the number of worker threads started by prepare().
        """
        self.processor = processor
        self.socket = lsocket
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
        self.out_protocol = outputProtocolFactory or self.in_protocol
        self.threads = int(threads)
        # Maps file descriptor -> Connection.
        self.clients = {}
        self.tasks = queue.Queue()
        # Socket pair used by wake_up() to interrupt select().
        self._read, self._write = socket.socketpair()
        self.prepared = False
        self._stop = False

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created."""
        # implement ThreadPool interface
        assert not self.prepared, "Can't change number of threads after start"
        # Coerce like __init__ does so range(self.threads) always works.
        self.threads = int(num)

    def prepare(self):
        """Prepares server for serve requests.

        Starts listening and launches the worker threads.  Does nothing if
        the server is already prepared.
        """
        if self.prepared:
            return
        self.socket.listen()
        for _ in range(self.threads):
            thread = Worker(self.tasks)
            # Thread.setDaemon() is deprecated; the attribute is equivalent.
            thread.daemon = True
            thread.start()
        self.prepared = True

    def wake_up(self):
        """Wake up main thread.

        The server usually waits in a select call and we should terminate it.
        The simplest way is using socketpair.

        Select always waits to read from the first socket of socketpair.

        In this case, we can just write anything to the second socket from
        socketpair.
        """
        self._write.send(b'1')

    def stop(self):
        """Stop the server.

        This method causes the serve() method to return.  stop() may be invoked
        from within your handler, or from another thread.

        After stop() is called, serve() will return but the server will still
        be listening on the socket.  serve() may then be called again to resume
        processing requests.  Alternatively, close() may be called after
        serve() returns to close the server socket and shutdown all worker
        threads.
        """
        self._stop = True
        self.wake_up()

    def _select(self):
        """Does select on open connections.

        Returns ``(rset, wset, xset, selected)``.  ``selected`` is False when
        select() was skipped because some readable connection still has
        already-parsed requests queued; in that case ``rset`` lists exactly
        those connections.
        """
        readable = [self.socket.handle.fileno(), self._read.fileno()]
        writable = []
        pending = []
        for i, connection in list(self.clients.items()):
            if connection.is_readable():
                readable.append(connection.fileno())
                # Skip select() only for connections with parsed requests
                # waiting in the queue.  A connection that merely has
                # ``remaining`` set needs a real read: handle() does not read
                # when select() was skipped, so short-circuiting on it would
                # spin forever.  Unread socket data makes select() return
                # immediately anyway.
                if connection.received:
                    pending.append(connection.fileno())
            if connection.is_writeable():
                writable.append(connection.fileno())
            if connection.is_closed():
                del self.clients[i]
        if pending:
            return pending, [], [], False
        return select.select(readable, writable, readable) + (True,)

    def handle(self):
        """Handle requests.

        WARNING! You must call prepare() BEFORE calling handle()
        """
        assert self.prepared, "You have to call prepare before handle"
        rset, wset, xset, selected = self._select()
        for readable in rset:
            if readable == self._read.fileno():
                # don't care i just need to clean readable flag
                self._read.recv(1024)
            elif readable == self.socket.handle.fileno():
                try:
                    client = self.socket.accept()
                    if client:
                        self.clients[client.handle.fileno()] = Connection(
                            client.handle, self.wake_up)
                except socket.error:
                    logger.debug('error while accepting', exc_info=True)
            else:
                connection = self.clients[readable]
                if selected:
                    connection.read()
                if connection.received:
                    # Dispatch one queued request to the worker pool.
                    connection.status = WAIT_PROCESS
                    msg = connection.received.popleft()
                    itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
                    otransport = TTransport.TMemoryBuffer()
                    iprot = self.in_protocol.getProtocol(itransport)
                    oprot = self.out_protocol.getProtocol(otransport)
                    self.tasks.put([self.processor, iprot, oprot,
                                    otransport, connection.ready])
        for writeable in wset:
            self.clients[writeable].write()
        for oob in xset:
            # The exceptional set is built from the read list, so it may
            # also name the listening or wake-up socket, which are not in
            # self.clients.
            connection = self.clients.pop(oob, None)
            if connection is not None:
                connection.close()

    def close(self):
        """Closes the server.

        Queues one stop task per worker thread and closes the listening
        socket.
        """
        for _ in range(self.threads):
            self.tasks.put([None, None, None, None, None])
        self.socket.close()
        self.prepared = False

    def serve(self):
        """Serve requests.

        Serve requests forever, or until stop() is called.
        """
        self._stop = False
        self.prepare()
        while not self._stop:
            self.handle()
David Reiss74421272008-11-07 23:09:31 +0000374 self.handle()