blob: 67ee04ed5c1d3d3f1929f24018036a0c30ef55ef [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
David Reiss74421272008-11-07 23:09:31 +000019"""Implementation of non-blocking server.
20
Bryan Duxbury69720412012-01-03 17:32:30 +000021The main idea of the server is to receive and send requests
22only from the main thread.
David Reiss74421272008-11-07 23:09:31 +000023
The thread pool should be sized for concurrent tasks, not
maximum connections.
David Reiss74421272008-11-07 23:09:31 +000026"""
Konrad Grochowski3a724e32014-08-12 11:48:29 -040027
David Reiss74421272008-11-07 23:09:31 +000028import logging
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090029import select
30import socket
31import struct
32import threading
David Reiss74421272008-11-07 23:09:31 +000033
Nobuaki Sukegawa4626fd82017-02-12 21:11:36 +090034from collections import deque
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090035from six.moves import queue
36
David Reiss74421272008-11-07 23:09:31 +000037from thrift.transport import TTransport
38from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
39
40__all__ = ['TNonblockingServer']
41
Nobuaki Sukegawa10308cb2016-02-03 01:57:03 +090042logger = logging.getLogger(__name__)
43
Bryan Duxbury69720412012-01-03 17:32:30 +000044
class Worker(threading.Thread):
    """Thread that pulls request tasks off a shared queue and runs them."""

    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        """Run queued tasks until one arrives whose processor is None.

        Each task is a 5-item sequence:
        (processor, iprot, oprot, otrans, callback).  After a successful
        ``processor.process`` call the callback receives ``True`` and the
        bytes accumulated in ``otrans``; on any exception the failure is
        logged and the callback receives ``False`` and an empty payload.
        """
        while True:
            try:
                task = self.queue.get()
                processor, iprot, oprot, otrans, callback = task
                if processor is None:
                    # Shutdown sentinel: leave the loop and end the thread.
                    return
                processor.process(iprot, oprot)
                reply = otrans.getvalue()
                callback(True, reply)
            except Exception:
                logger.exception("Exception while processing request", exc_info=True)
                callback(False, b'')
David Reiss74421272008-11-07 23:09:31 +000064
# Connection states, numbered 0..4 in the order a request normally
# passes through them.
(
    WAIT_LEN,      # reading the 4-byte frame length
    WAIT_MESSAGE,  # reading the frame body
    WAIT_PROCESS,  # whole request read; waiting for ready() to be called
    SEND_ANSWER,   # sending the length-prefixed answer
    CLOSED,        # socket was closed; connection should be deleted
) = range(5)
70
Bryan Duxbury69720412012-01-03 17:32:30 +000071
def locked(func):
    """Decorator that runs a method while holding ``self.lock``.

    The wrapped method's first argument must be an object exposing a
    ``lock`` attribute usable as a context manager (for example a
    ``threading.Lock``).  The lock is released even if the method raises.
    The wrapper keeps the name and docstring of the decorated method.
    """
    # Local import so this decorator does not depend on edits to the
    # module-level import list.
    import functools

    @functools.wraps(func)
    def nested(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return nested
81
Bryan Duxbury69720412012-01-03 17:32:30 +000082
def socket_exception(func):
    """Decorator that closes the object when the method hits socket.error.

    The decorated method is called normally and its result returned.  If
    it raises ``socket.error`` the exception is logged at debug level,
    ``self.close()`` is called and the wrapper returns ``None``; any other
    exception propagates unchanged.  The wrapper keeps the name and
    docstring of the decorated method.
    """
    # Local import so this decorator does not depend on edits to the
    # module-level import list.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error:
            logger.debug('ignoring socket exception', exc_info=True)
            self.close()
    return wrapper
92
Bryan Duxbury69720412012-01-03 17:32:30 +000093
class Message(object):
    """Location of one frame part (length header or body) in a read buffer."""

    def __init__(self, offset, len_, header):
        # True for the length header, False for the frame body.
        self.is_header = header
        # Filled in by the reader once the whole part has arrived.
        self.buffer = None
        self.len = len_
        self.offset = offset

    @property
    def end(self):
        """Index just past this part, i.e. ``offset + len``."""
        start, size = self.offset, self.len
        return start + size
104
105
class Connection(object):
    """State machine for a single client connection.

    It can be in state:
        WAIT_LEN --- connection is reading request len.
        WAIT_MESSAGE --- connection is reading request.
        WAIT_PROCESS --- connection has just read whole request and
                         waits for call ready routine.
        SEND_ANSWER --- connection is sending answer string (including length
                        of answer).
        CLOSED --- socket was closed and connection should be deleted.
    """
    def __init__(self, new_socket, wake_up):
        """Wrap ``new_socket`` (switched to non-blocking mode).

        ``wake_up`` is a zero-argument callable invoked from ready() after
        the connection state has changed.
        """
        self.socket = new_socket
        self.socket.setblocking(False)
        self.status = WAIT_LEN
        self.len = 0
        # Complete frames that have been read but not yet handed out.
        self.received = deque()
        # Frame part currently being read; starts with the 4-byte header.
        self._reading = Message(0, 4, True)
        self._rbuf = b''
        self._wbuf = b''
        self.lock = threading.Lock()
        self.wake_up = wake_up
        # True when the last recv() filled the whole buffer, so more data
        # may still be pending on the socket.
        self.remaining = False

    @socket_exception
    def read(self):
        """Reads data from the socket and switches state.

        Received bytes are appended to the internal buffer and split into
        frames: a 4-byte big-endian signed length followed by that many
        bytes.  Every complete frame is appended to ``self.received``; as
        soon as at least one is available the status becomes WAIT_PROCESS
        and reading stops.

        The connection is closed when the first recv() returns no data or
        when a negative frame length is received.  socket.error is handled
        by the ``socket_exception`` decorator.
        """
        # Local import so this method does not depend on edits to the
        # module-level import list.
        import errno

        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
        assert not self.received
        buf_size = 8192
        first = True
        done = False
        while not done:
            try:
                read = self.socket.recv(buf_size)
            except socket.error as err:
                # A previous recv() in this call returned exactly buf_size
                # bytes and nothing more has arrived yet.  On a
                # non-blocking socket that is reported as EAGAIN /
                # EWOULDBLOCK; it is not a failure, so stop reading and
                # keep the connection open.
                if first or err.errno not in (errno.EAGAIN,
                                              errno.EWOULDBLOCK):
                    raise
                done = True
                break
            rlen = len(read)
            done = rlen < buf_size
            self._rbuf += read
            if first and rlen == 0:
                if self.status != WAIT_LEN or self._rbuf:
                    logger.error('could not read frame from socket')
                else:
                    logger.debug('read zero length. client might have disconnected')
                self.close()
                self.remaining = False
                return
            while len(self._rbuf) >= self._reading.end:
                if self._reading.is_header:
                    mlen, = struct.unpack('!i', self._rbuf[:4])
                    if mlen < 0:
                        # A negative length would put Message.end before
                        # the data already buffered and corrupt framing.
                        # Drop the connection and discard anything queued
                        # so a closed connection is never dispatched.
                        logger.error('negative frame length %d received',
                                     mlen)
                        self.received.clear()
                        self.close()
                        self.remaining = False
                        return
                    self._reading = Message(self._reading.end, mlen, False)
                    self.status = WAIT_MESSAGE
                else:
                    self._reading.buffer = self._rbuf
                    self.received.append(self._reading)
                    self._rbuf = self._rbuf[self._reading.end:]
                    self._reading = Message(0, 4, True)
            first = False
            if self.received:
                self.status = WAIT_PROCESS
                break
        self.remaining = not done

    @socket_exception
    def write(self):
        """Writes pending answer data to the socket and switches state.

        When the whole buffer was sent the status returns to WAIT_LEN;
        after a partial send the unsent tail is kept for the next call.
        """
        assert self.status == SEND_ANSWER
        sent = self.socket.send(self._wbuf)
        if sent == len(self._wbuf):
            self.status = WAIT_LEN
            self._wbuf = b''
            self.len = 0
        else:
            # Keep only the bytes that have not been sent yet.
            self._wbuf = self._wbuf[sent:]

    @locked
    def ready(self, all_ok, message):
        """Callback function for switching state and waking up main thread.

        This function is the only function which can be called
        asynchronously.

        The ready can switch Connection to three states:
            WAIT_LEN if request was oneway.
            SEND_ANSWER if request was processed in normal way.
            CLOSED if request throws unexpected exception.

        In every case ``wake_up`` is called afterwards.
        """
        assert self.status == WAIT_PROCESS
        if not all_ok:
            self.close()
            self.wake_up()
            return
        self.len = 0
        if len(message) == 0:
            # it was a oneway request, do not write answer
            self._wbuf = b''
            self.status = WAIT_LEN
        else:
            # Answer is framed like requests: 4-byte length, then payload.
            self._wbuf = struct.pack('!i', len(message)) + message
            self.status = SEND_ANSWER
        self.wake_up()

    @locked
    def is_writeable(self):
        """Return True if connection should be added to write list of select"""
        return self.status == SEND_ANSWER

    # Taking the lock here is not strictly necessary, but it keeps the
    # status checks uniform.
    @locked
    def is_readable(self):
        """Return True if connection should be added to read list of select"""
        return self.status in (WAIT_LEN, WAIT_MESSAGE)

    @locked
    def is_closed(self):
        """Returns True if connection is closed."""
        return self.status == CLOSED

    def fileno(self):
        """Returns the file descriptor of the associated socket."""
        return self.socket.fileno()

    def close(self):
        """Marks the connection CLOSED and closes the socket."""
        self.status = CLOSED
        self.socket.close()
230
Bryan Duxbury69720412012-01-03 17:32:30 +0000231
class TNonblockingServer(object):
    """Non-blocking server.

    All socket reads and writes happen in the thread that calls serve() /
    handle(); complete requests are queued for a pool of Worker threads.
    """

    def __init__(self,
                 processor,
                 lsocket,
                 inputProtocolFactory=None,
                 outputProtocolFactory=None,
                 threads=10):
        """Create the server.

        processor -- object whose ``process(iprot, oprot)`` handles a request.
        lsocket -- listening server socket (``listen``/``accept``/``close``).
        inputProtocolFactory -- defaults to ``TBinaryProtocolFactory()``.
        outputProtocolFactory -- defaults to the input protocol factory.
        threads -- number of worker threads started by prepare().
        """
        self.processor = processor
        self.socket = lsocket
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
        self.out_protocol = outputProtocolFactory or self.in_protocol
        self.threads = int(threads)
        # Open connections keyed by their socket file descriptor.
        self.clients = {}
        self.tasks = queue.Queue()
        # Socket pair used by wake_up() to interrupt select().
        self._read, self._write = socket.socketpair()
        self.prepared = False
        self._stop = False

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created."""
        # implement ThreadPool interface
        assert not self.prepared, "Can't change number of threads after start"
        # Coerce like __init__ does so prepare()/close() can use range().
        self.threads = int(num)

    def prepare(self):
        """Prepares server for serve requests.

        Starts listening and launches the worker threads.  Does nothing
        if the server is already prepared.
        """
        if self.prepared:
            return
        self.socket.listen()
        for _ in range(self.threads):
            thread = Worker(self.tasks)
            # Thread.setDaemon() is deprecated; the attribute is the
            # supported spelling.
            thread.daemon = True
            thread.start()
        self.prepared = True

    def wake_up(self):
        """Wake up main thread.

        The server usually waits in a select call, which we need to
        interrupt.  The simplest way is using socketpair.

        Select always waits to read from the first socket of socketpair.

        In this case, we can just write anything to the second socket from
        socketpair.
        """
        self._write.send(b'1')

    def stop(self):
        """Stop the server.

        This method causes the serve() method to return.  stop() may be invoked
        from within your handler, or from another thread.

        After stop() is called, serve() will return but the server will still
        be listening on the socket.  serve() may then be called again to resume
        processing requests.  Alternatively, close() may be called after
        serve() returns to close the server socket and shutdown all worker
        threads.
        """
        self._stop = True
        self.wake_up()

    def _select(self):
        """Does select on open connections.

        Returns ``(rset, wset, xset, selected)``.  ``selected`` is False
        when select() was skipped because a readable connection still has
        complete frames queued; ``rset`` then lists those connections.
        Closed connections are removed from ``self.clients``.
        """
        readable = [self.socket.handle.fileno(), self._read.fileno()]
        writable = []
        remaining = []
        for i, connection in list(self.clients.items()):
            if connection.is_readable():
                readable.append(connection.fileno())
                # Skip select() only when a frame is actually queued.
                # handle() does not read from the socket on this path, so
                # taking it merely because the last recv() filled its
                # buffer (connection.remaining) would loop forever without
                # making progress.  Unread socket data is reported by
                # select() itself.
                if connection.received:
                    remaining.append(connection.fileno())
            if connection.is_writeable():
                writable.append(connection.fileno())
            if connection.is_closed():
                del self.clients[i]
        if remaining:
            return remaining, [], [], False
        else:
            return select.select(readable, writable, readable) + (True,)

    def handle(self):
        """Handle requests.

        Runs one iteration of the event loop: accepts new clients, reads
        requests and queues them for the workers, writes pending answers
        and drops connections reported in select()'s exceptional set.

        WARNING! You must call prepare() BEFORE calling handle()
        """
        assert self.prepared, "You have to call prepare before handle"
        rset, wset, xset, selected = self._select()
        for readable in rset:
            if readable == self._read.fileno():
                # The data is irrelevant; reading just clears the
                # readable flag set by wake_up().
                self._read.recv(1024)
            elif readable == self.socket.handle.fileno():
                try:
                    client = self.socket.accept()
                    if client:
                        self.clients[client.handle.fileno()] = Connection(client.handle,
                                                                          self.wake_up)
                except socket.error:
                    logger.debug('error while accepting', exc_info=True)
            else:
                connection = self.clients[readable]
                if selected:
                    connection.read()
                if connection.received:
                    connection.status = WAIT_PROCESS
                    msg = connection.received.popleft()
                    itransport = TTransport.TMemoryBuffer(msg.buffer, msg.offset)
                    otransport = TTransport.TMemoryBuffer()
                    iprot = self.in_protocol.getProtocol(itransport)
                    oprot = self.out_protocol.getProtocol(otransport)
                    self.tasks.put([self.processor, iprot, oprot,
                                    otransport, connection.ready])
        for writeable in wset:
            self.clients[writeable].write()
        for oob in xset:
            # xset is drawn from the same list as rset, which also holds
            # the listening and wake-up sockets; those are not clients.
            connection = self.clients.pop(oob, None)
            if connection is not None:
                connection.close()

    def close(self):
        """Closes the server.

        Queues one stop task per worker thread, closes the listening
        socket and marks the server as not prepared.
        """
        for _ in range(self.threads):
            self.tasks.put([None, None, None, None, None])
        self.socket.close()
        self.prepared = False

    def serve(self):
        """Serve requests.

        Serve requests forever, or until stop() is called.
        """
        self._stop = False
        self.prepare()
        while not self._stop:
            self.handle()