blob: f62d486ebc8b50bf4c79a0c21926eb5ee52edf07 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
David Reiss74421272008-11-07 23:09:31 +000019"""Implementation of non-blocking server.
20
Bryan Duxbury69720412012-01-03 17:32:30 +000021The main idea of the server is to receive and send requests
22only from the main thread.
David Reiss74421272008-11-07 23:09:31 +000023
Bryan Duxbury69720412012-01-03 17:32:30 +000024The thread poool should be sized for concurrent tasks, not
25maximum connections
David Reiss74421272008-11-07 23:09:31 +000026"""
Konrad Grochowski3a724e32014-08-12 11:48:29 -040027
David Reiss74421272008-11-07 23:09:31 +000028import logging
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090029import select
30import socket
31import struct
32import threading
David Reiss74421272008-11-07 23:09:31 +000033
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090034from collections import deque
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090035from six.moves import queue
36
David Reiss74421272008-11-07 23:09:31 +000037from thrift.transport import TTransport
38from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
39
40__all__ = ['TNonblockingServer']
41
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090042logger = logging.getLogger(__name__)
43
Bryan Duxbury69720412012-01-03 17:32:30 +000044
David Reiss74421272008-11-07 23:09:31 +000045class Worker(threading.Thread):
46 """Worker is a small helper to process incoming connection."""
Bryan Duxbury69720412012-01-03 17:32:30 +000047
David Reiss74421272008-11-07 23:09:31 +000048 def __init__(self, queue):
49 threading.Thread.__init__(self)
50 self.queue = queue
51
52 def run(self):
53 """Process queries from task queue, stop if processor is None."""
54 while True:
55 try:
56 processor, iprot, oprot, otrans, callback = self.queue.get()
57 if processor is None:
58 break
59 processor.process(iprot, oprot)
60 callback(True, otrans.getvalue())
61 except Exception:
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090062 logger.exception("Exception while processing request", exc_info=True)
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090063 callback(False, b'')
David Reiss74421272008-11-07 23:09:31 +000064
James E. King, III0ad20bd2017-09-30 15:44:16 -070065
David Reiss74421272008-11-07 23:09:31 +000066WAIT_LEN = 0
67WAIT_MESSAGE = 1
68WAIT_PROCESS = 2
69SEND_ANSWER = 3
70CLOSED = 4
71
Bryan Duxbury69720412012-01-03 17:32:30 +000072
David Reiss74421272008-11-07 23:09:31 +000073def locked(func):
Bryan Duxbury69720412012-01-03 17:32:30 +000074 """Decorator which locks self.lock."""
David Reiss74421272008-11-07 23:09:31 +000075 def nested(self, *args, **kwargs):
76 self.lock.acquire()
77 try:
78 return func(self, *args, **kwargs)
79 finally:
80 self.lock.release()
81 return nested
82
Bryan Duxbury69720412012-01-03 17:32:30 +000083
David Reiss74421272008-11-07 23:09:31 +000084def socket_exception(func):
Bryan Duxbury69720412012-01-03 17:32:30 +000085 """Decorator close object on socket.error."""
David Reiss74421272008-11-07 23:09:31 +000086 def read(self, *args, **kwargs):
87 try:
88 return func(self, *args, **kwargs)
89 except socket.error:
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090090 logger.debug('ignoring socket exception', exc_info=True)
David Reiss74421272008-11-07 23:09:31 +000091 self.close()
92 return read
93
Bryan Duxbury69720412012-01-03 17:32:30 +000094
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090095class Message(object):
96 def __init__(self, offset, len_, header):
97 self.offset = offset
98 self.len = len_
99 self.buffer = None
100 self.is_header = header
101
102 @property
103 def end(self):
104 return self.offset + self.len
105
106
Nobuaki Sukegawab9c859a2015-12-21 01:10:25 +0900107class Connection(object):
David Reiss74421272008-11-07 23:09:31 +0000108 """Basic class is represented connection.
Bryan Duxbury69720412012-01-03 17:32:30 +0000109
David Reiss74421272008-11-07 23:09:31 +0000110 It can be in state:
111 WAIT_LEN --- connection is reading request len.
112 WAIT_MESSAGE --- connection is reading request.
Bryan Duxbury69720412012-01-03 17:32:30 +0000113 WAIT_PROCESS --- connection has just read whole request and
114 waits for call ready routine.
David Reiss74421272008-11-07 23:09:31 +0000115 SEND_ANSWER --- connection is sending answer string (including length
Bryan Duxbury69720412012-01-03 17:32:30 +0000116 of answer).
David Reiss74421272008-11-07 23:09:31 +0000117 CLOSED --- socket was closed and connection should be deleted.
118 """
119 def __init__(self, new_socket, wake_up):
120 self.socket = new_socket
121 self.socket.setblocking(False)
122 self.status = WAIT_LEN
123 self.len = 0
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900124 self.received = deque()
125 self._reading = Message(0, 4, True)
126 self._rbuf = b''
127 self._wbuf = b''
David Reiss74421272008-11-07 23:09:31 +0000128 self.lock = threading.Lock()
129 self.wake_up = wake_up
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900130 self.remaining = False
David Reiss74421272008-11-07 23:09:31 +0000131
132 @socket_exception
133 def read(self):
134 """Reads data from stream and switch state."""
135 assert self.status in (WAIT_LEN, WAIT_MESSAGE)
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900136 assert not self.received
137 buf_size = 8192
138 first = True
139 done = False
140 while not done:
141 read = self.socket.recv(buf_size)
142 rlen = len(read)
143 done = rlen < buf_size
144 self._rbuf += read
145 if first and rlen == 0:
146 if self.status != WAIT_LEN or self._rbuf:
147 logger.error('could not read frame from socket')
148 else:
149 logger.debug('read zero length. client might have disconnected')
David Reiss74421272008-11-07 23:09:31 +0000150 self.close()
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900151 while len(self._rbuf) >= self._reading.end:
152 if self._reading.is_header:
153 mlen, = struct.unpack('!i', self._rbuf[:4])
154 self._reading = Message(self._reading.end, mlen, False)
155 self.status = WAIT_MESSAGE
156 else:
157 self._reading.buffer = self._rbuf
158 self.received.append(self._reading)
159 self._rbuf = self._rbuf[self._reading.end:]
160 self._reading = Message(0, 4, True)
161 first = False
162 if self.received:
David Reiss74421272008-11-07 23:09:31 +0000163 self.status = WAIT_PROCESS
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900164 break
165 self.remaining = not done
David Reiss74421272008-11-07 23:09:31 +0000166
167 @socket_exception
168 def write(self):
169 """Writes data from socket and switch state."""
170 assert self.status == SEND_ANSWER
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900171 sent = self.socket.send(self._wbuf)
172 if sent == len(self._wbuf):
David Reiss74421272008-11-07 23:09:31 +0000173 self.status = WAIT_LEN
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900174 self._wbuf = b''
David Reiss74421272008-11-07 23:09:31 +0000175 self.len = 0
176 else:
Yubing Dong (Tom)00646bb2018-01-18 23:55:24 -0800177 self._wbuf = self._wbuf[sent:]
David Reiss74421272008-11-07 23:09:31 +0000178
179 @locked
180 def ready(self, all_ok, message):
181 """Callback function for switching state and waking up main thread.
Bryan Duxbury69720412012-01-03 17:32:30 +0000182
David Reiss74421272008-11-07 23:09:31 +0000183 This function is the only function witch can be called asynchronous.
Bryan Duxbury69720412012-01-03 17:32:30 +0000184
David Reiss74421272008-11-07 23:09:31 +0000185 The ready can switch Connection to three states:
David Reiss6ce401d2009-03-24 20:01:58 +0000186 WAIT_LEN if request was oneway.
David Reiss74421272008-11-07 23:09:31 +0000187 SEND_ANSWER if request was processed in normal way.
188 CLOSED if request throws unexpected exception.
Bryan Duxbury69720412012-01-03 17:32:30 +0000189
David Reiss74421272008-11-07 23:09:31 +0000190 The one wakes up main thread.
191 """
192 assert self.status == WAIT_PROCESS
193 if not all_ok:
194 self.close()
195 self.wake_up()
196 return
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900197 self.len = 0
David Reiss74421272008-11-07 23:09:31 +0000198 if len(message) == 0:
David Reissc51986f2009-03-24 20:01:25 +0000199 # it was a oneway request, do not write answer
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900200 self._wbuf = b''
David Reiss74421272008-11-07 23:09:31 +0000201 self.status = WAIT_LEN
202 else:
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900203 self._wbuf = struct.pack('!i', len(message)) + message
David Reiss74421272008-11-07 23:09:31 +0000204 self.status = SEND_ANSWER
205 self.wake_up()
206
207 @locked
208 def is_writeable(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000209 """Return True if connection should be added to write list of select"""
David Reiss74421272008-11-07 23:09:31 +0000210 return self.status == SEND_ANSWER
211
212 # it's not necessary, but...
213 @locked
214 def is_readable(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000215 """Return True if connection should be added to read list of select"""
David Reiss74421272008-11-07 23:09:31 +0000216 return self.status in (WAIT_LEN, WAIT_MESSAGE)
217
218 @locked
219 def is_closed(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000220 """Returns True if connection is closed."""
David Reiss74421272008-11-07 23:09:31 +0000221 return self.status == CLOSED
222
223 def fileno(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000224 """Returns the file descriptor of the associated socket."""
David Reiss74421272008-11-07 23:09:31 +0000225 return self.socket.fileno()
226
227 def close(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000228 """Closes connection"""
David Reiss74421272008-11-07 23:09:31 +0000229 self.status = CLOSED
230 self.socket.close()
231
Bryan Duxbury69720412012-01-03 17:32:30 +0000232
Nobuaki Sukegawab9c859a2015-12-21 01:10:25 +0900233class TNonblockingServer(object):
David Reiss74421272008-11-07 23:09:31 +0000234 """Non-blocking server."""
Bryan Duxbury69720412012-01-03 17:32:30 +0000235
236 def __init__(self,
237 processor,
238 lsocket,
239 inputProtocolFactory=None,
240 outputProtocolFactory=None,
241 threads=10):
David Reiss74421272008-11-07 23:09:31 +0000242 self.processor = processor
243 self.socket = lsocket
244 self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
245 self.out_protocol = outputProtocolFactory or self.in_protocol
246 self.threads = int(threads)
247 self.clients = {}
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900248 self.tasks = queue.Queue()
David Reiss74421272008-11-07 23:09:31 +0000249 self._read, self._write = socket.socketpair()
250 self.prepared = False
Roger Meiercfff8562012-04-13 14:24:55 +0000251 self._stop = False
David Reiss74421272008-11-07 23:09:31 +0000252
253 def setNumThreads(self, num):
254 """Set the number of worker threads that should be created."""
255 # implement ThreadPool interface
Bryan Duxbury69720412012-01-03 17:32:30 +0000256 assert not self.prepared, "Can't change number of threads after start"
David Reiss74421272008-11-07 23:09:31 +0000257 self.threads = num
258
259 def prepare(self):
260 """Prepares server for serve requests."""
Roger Meiercfff8562012-04-13 14:24:55 +0000261 if self.prepared:
262 return
David Reiss74421272008-11-07 23:09:31 +0000263 self.socket.listen()
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900264 for _ in range(self.threads):
David Reiss74421272008-11-07 23:09:31 +0000265 thread = Worker(self.tasks)
266 thread.setDaemon(True)
267 thread.start()
268 self.prepared = True
269
270 def wake_up(self):
271 """Wake up main thread.
Bryan Duxbury69720412012-01-03 17:32:30 +0000272
Konrad Grochowski3b5dacb2014-11-24 10:55:31 +0100273 The server usually waits in select call in we should terminate one.
David Reiss74421272008-11-07 23:09:31 +0000274 The simplest way is using socketpair.
Bryan Duxbury69720412012-01-03 17:32:30 +0000275
David Reiss74421272008-11-07 23:09:31 +0000276 Select always wait to read from the first socket of socketpair.
Bryan Duxbury69720412012-01-03 17:32:30 +0000277
David Reiss74421272008-11-07 23:09:31 +0000278 In this case, we can just write anything to the second socket from
Bryan Duxbury69720412012-01-03 17:32:30 +0000279 socketpair.
280 """
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900281 self._write.send(b'1')
David Reiss74421272008-11-07 23:09:31 +0000282
Roger Meiercfff8562012-04-13 14:24:55 +0000283 def stop(self):
284 """Stop the server.
285
286 This method causes the serve() method to return. stop() may be invoked
287 from within your handler, or from another thread.
288
289 After stop() is called, serve() will return but the server will still
290 be listening on the socket. serve() may then be called again to resume
291 processing requests. Alternatively, close() may be called after
292 serve() returns to close the server socket and shutdown all worker
293 threads.
294 """
295 self._stop = True
296 self.wake_up()
297
David Reiss74421272008-11-07 23:09:31 +0000298 def _select(self):
299 """Does select on open connections."""
300 readable = [self.socket.handle.fileno(), self._read.fileno()]
301 writable = []
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900302 remaining = []
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900303 for i, connection in list(self.clients.items()):
David Reiss74421272008-11-07 23:09:31 +0000304 if connection.is_readable():
305 readable.append(connection.fileno())
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900306 if connection.remaining or connection.received:
307 remaining.append(connection.fileno())
David Reiss74421272008-11-07 23:09:31 +0000308 if connection.is_writeable():
309 writable.append(connection.fileno())
310 if connection.is_closed():
311 del self.clients[i]
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900312 if remaining:
313 return remaining, [], [], False
314 else:
315 return select.select(readable, writable, readable) + (True,)
Bryan Duxbury69720412012-01-03 17:32:30 +0000316
David Reiss74421272008-11-07 23:09:31 +0000317 def handle(self):
318 """Handle requests.
Bryan Duxbury69720412012-01-03 17:32:30 +0000319
320 WARNING! You must call prepare() BEFORE calling handle()
David Reiss74421272008-11-07 23:09:31 +0000321 """
322 assert self.prepared, "You have to call prepare before handle"
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900323 rset, wset, xset, selected = self._select()
David Reiss74421272008-11-07 23:09:31 +0000324 for readable in rset:
325 if readable == self._read.fileno():
326 # don't care i just need to clean readable flag
Bryan Duxbury69720412012-01-03 17:32:30 +0000327 self._read.recv(1024)
David Reiss74421272008-11-07 23:09:31 +0000328 elif readable == self.socket.handle.fileno():
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900329 try:
330 client = self.socket.accept()
331 if client:
332 self.clients[client.handle.fileno()] = Connection(client.handle,
333 self.wake_up)
334 except socket.error:
335 logger.debug('error while accepting', exc_info=True)
David Reiss74421272008-11-07 23:09:31 +0000336 else:
337 connection = self.clients[readable]
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +0900338 if selected:
339 connection.read()
340 if connection.received:
341 connection.status = WAIT_PROCESS
342 msg = connection.received.popleft()
343 itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
David Reiss74421272008-11-07 23:09:31 +0000344 otransport = TTransport.TMemoryBuffer()
345 iprot = self.in_protocol.getProtocol(itransport)
346 oprot = self.out_protocol.getProtocol(otransport)
Bryan Duxbury69720412012-01-03 17:32:30 +0000347 self.tasks.put([self.processor, iprot, oprot,
David Reiss74421272008-11-07 23:09:31 +0000348 otransport, connection.ready])
349 for writeable in wset:
350 self.clients[writeable].write()
351 for oob in xset:
352 self.clients[oob].close()
353 del self.clients[oob]
354
355 def close(self):
356 """Closes the server."""
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +0900357 for _ in range(self.threads):
David Reiss74421272008-11-07 23:09:31 +0000358 self.tasks.put([None, None, None, None, None])
359 self.socket.close()
360 self.prepared = False
Bryan Duxbury69720412012-01-03 17:32:30 +0000361
David Reiss74421272008-11-07 23:09:31 +0000362 def serve(self):
Roger Meiercfff8562012-04-13 14:24:55 +0000363 """Serve requests.
364
365 Serve requests forever, or until stop() is called.
366 """
367 self._stop = False
David Reiss74421272008-11-07 23:09:31 +0000368 self.prepare()
Roger Meiercfff8562012-04-13 14:24:55 +0000369 while not self._stop:
David Reiss74421272008-11-07 23:09:31 +0000370 self.handle()