blob: 87031c13747ca9612d203889f6cd1b43b11bd652 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
"""Implementation of non-blocking server.

The main idea of the server is to receive and send requests
only from the main thread.

The thread pool should be sized for concurrent tasks, not
maximum connections.
"""
Konrad Grochowski3a724e32014-08-12 11:48:29 -040027
import functools
import logging
import select
import socket
import struct
import threading
David Reiss74421272008-11-07 23:09:31 +000033
Nobuaki Sukegawa760511f2015-11-06 21:24:16 +090034from six.moves import queue
35
David Reiss74421272008-11-07 23:09:31 +000036from thrift.transport import TTransport
37from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
38
# Public API of this module: only the server class is exported by
# ``from ... import *``.
__all__ = ['TNonblockingServer']

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
42
Bryan Duxbury69720412012-01-03 17:32:30 +000043
class Worker(threading.Thread):
    """Thread that pulls queued requests and runs them through a processor."""

    def __init__(self, queue):
        """Remember the task queue this worker consumes from."""
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Consume tasks until a task whose processor is None arrives.

        Every task is a 5-item sequence
        (processor, iprot, oprot, otrans, callback).  After a successful
        processor.process() call the callback receives (True, <bytes
        written to otrans>); if anything raises, the error is logged and
        the callback receives (False, b'').
        """
        while True:
            try:
                task = self.queue.get()
                processor, iprot, oprot, otrans, callback = task
                if processor is None:
                    # stop marker: leave the loop and let the thread finish
                    return
                processor.process(iprot, oprot)
                callback(True, otrans.getvalue())
            except Exception:
                logger.exception("Exception while processing request")
                callback(False, b'')
David Reiss74421272008-11-07 23:09:31 +000063
# Connection states, numbered 0..4 in the order a request normally passes
# through them (see the Connection class for the meaning of each state).
(WAIT_LEN,
 WAIT_MESSAGE,
 WAIT_PROCESS,
 SEND_ANSWER,
 CLOSED) = range(5)
69
Bryan Duxbury69720412012-01-03 17:32:30 +000070
def locked(func):
    """Decorator which runs the method while holding self.lock.

    The lock is acquired before ``func`` is called and is always released
    afterwards, whether ``func`` returns or raises.  The wrapped method's
    return value is passed through unchanged.
    """
    # functools.wraps keeps the decorated method's __name__ and __doc__
    # so tracebacks and introspection show the real method.
    @functools.wraps(func)
    def nested(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return nested
80
Bryan Duxbury69720412012-01-03 17:32:30 +000081
def socket_exception(func):
    """Decorator which closes the object when the method raises socket.error.

    The wrapped method's return value is passed through.  If it raises
    socket.error, self.close() is called, the error is swallowed and the
    wrapper returns None.  Any other exception propagates unchanged.
    """
    # functools.wraps keeps the decorated method's __name__ and __doc__;
    # without it every wrapped method (read and write alike) showed up
    # under the wrapper's own name.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error:
            self.close()
    return wrapper
90
Bryan Duxbury69720412012-01-03 17:32:30 +000091
class Connection(object):
    """State machine for one framed client connection.

    A connection is always in exactly one of these states:
        WAIT_LEN --- the 4-byte request length is being read.
        WAIT_MESSAGE --- the request body is being read.
        WAIT_PROCESS --- the whole request has been read and the
                         connection waits for ready() to be called.
        SEND_ANSWER --- the answer (including its length prefix) is
                        being sent.
        CLOSED --- the socket was closed and the connection should be
                   deleted.
    """

    def __init__(self, new_socket, wake_up):
        """Wrap new_socket (switched to non-blocking mode).

        wake_up is a zero-argument callable invoked by ready() to wake up
        the main thread.
        """
        new_socket.setblocking(False)
        self.socket = new_socket
        self.wake_up = wake_up
        # guards the state accessors and ready()
        self.lock = threading.Lock()
        self.status = WAIT_LEN
        self.len = 0
        self.message = b''

    def _read_len(self):
        """Reads the length of the request.

        The four length bytes may arrive in several pieces, so the bytes
        received so far are kept in self.message until all four are
        present.  It's a safer alternative to self.socket.recv(4).
        """
        chunk = self.socket.recv(4 - len(self.message))
        if not chunk:
            # zero bytes with an empty buffer means the client closed the
            # connection between requests; only a partial length is an
            # error worth logging
            if self.message:
                logger.error("can't read frame size from socket")
            self.close()
            return
        self.message += chunk
        if len(self.message) != 4:
            # length prefix still incomplete, wait for more data
            return
        self.len, = struct.unpack('!i', self.message)
        if self.len > 0:
            self.message = b''
            self.status = WAIT_MESSAGE
            return
        if self.len < 0:
            logger.error("negative frame size, it seems client "
                         "doesn't use FramedTransport")
        else:
            logger.error("empty frame, it's really strange")
        self.close()

    @socket_exception
    def read(self):
        """Reads data from the socket and switches state when appropriate."""
        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
        if self.status == WAIT_LEN:
            # go back to the main loop afterwards for simplicity instead
            # of falling through, even though there is a good chance that
            # the message is already available
            self._read_len()
            return
        if self.status != WAIT_MESSAGE:
            # only reachable when assertions are disabled
            return
        chunk = self.socket.recv(self.len - len(self.message))
        if not chunk:
            logger.error("can't read frame from socket (get %d of "
                         "%d bytes)" % (len(self.message), self.len))
            self.close()
            return
        self.message += chunk
        if len(self.message) == self.len:
            self.status = WAIT_PROCESS

    @socket_exception
    def write(self):
        """Writes pending answer data to the socket and switches state."""
        assert self.status == SEND_ANSWER
        sent = self.socket.send(self.message)
        if sent != len(self.message):
            # partial write: keep the unsent tail for the next call
            self.message = self.message[sent:]
            return
        self.status = WAIT_LEN
        self.message = b''
        self.len = 0

    @locked
    def ready(self, all_ok, message):
        """Callback for switching state and waking up the main thread.

        This is the only method which may be called asynchronously.

        ready() can switch the connection to three states:
            WAIT_LEN if the request was oneway.
            SEND_ANSWER if the request was processed in the normal way.
            CLOSED if the request threw an unexpected exception.

        In every case the main thread is woken up afterwards.
        """
        assert self.status == WAIT_PROCESS
        if not all_ok:
            self.close()
            self.wake_up()
            return
        self.len = 0
        if len(message) != 0:
            self.message = struct.pack('!i', len(message)) + message
            self.status = SEND_ANSWER
        else:
            # it was a oneway request, do not write an answer
            self.message = b''
            self.status = WAIT_LEN
        self.wake_up()

    @locked
    def is_writeable(self):
        """Return True if the connection belongs in select's write list."""
        return self.status == SEND_ANSWER

    # the lock is not strictly necessary here, but...
    @locked
    def is_readable(self):
        """Return True if the connection belongs in select's read list."""
        return self.status in (WAIT_LEN, WAIT_MESSAGE)

    @locked
    def is_closed(self):
        """Return True if the connection is closed."""
        return self.status == CLOSED

    def fileno(self):
        """Return the file descriptor of the associated socket."""
        return self.socket.fileno()

    def close(self):
        """Mark the connection as closed and close its socket."""
        self.status = CLOSED
        self.socket.close()
224
Bryan Duxbury69720412012-01-03 17:32:30 +0000225
class TNonblockingServer(object):
    """Non-blocking server.

    handle() multiplexes the listening socket and all client connections
    with select(); every completely received request is queued for a pool
    of Worker threads and the answer is written back by a later handle()
    call.
    """

    def __init__(self,
                 processor,
                 lsocket,
                 inputProtocolFactory=None,
                 outputProtocolFactory=None,
                 threads=10):
        """Create the server; listening starts in prepare().

        processor -- object whose process(iprot, oprot) handles a request.
        lsocket -- listening transport; listen(), accept(), close() and a
            ``handle`` socket attribute are used by this class.
        inputProtocolFactory -- factory for input protocols, defaults to
            TBinaryProtocolFactory().
        outputProtocolFactory -- factory for output protocols, defaults to
            the input protocol factory.
        threads -- number of worker threads started by prepare().
        """
        self.processor = processor
        self.socket = lsocket
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
        self.out_protocol = outputProtocolFactory or self.in_protocol
        self.threads = int(threads)
        # maps a client socket's file descriptor to its Connection
        self.clients = {}
        self.tasks = queue.Queue()
        # socketpair used by wake_up() to interrupt select()
        self._read, self._write = socket.socketpair()
        self.prepared = False
        self._stop = False

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created."""
        # implement ThreadPool interface
        assert not self.prepared, "Can't change number of threads after start"
        # coerce like __init__ does so prepare() can always range() over it
        self.threads = int(num)

    def prepare(self):
        """Prepares the server to serve requests.

        Starts listening and launches the worker threads.  Does nothing if
        the server is already prepared.
        """
        if self.prepared:
            return
        self.socket.listen()
        for _ in range(self.threads):
            thread = Worker(self.tasks)
            # Thread.setDaemon() is deprecated; the attribute is equivalent
            thread.daemon = True
            thread.start()
        self.prepared = True

    def wake_up(self):
        """Wake up the main thread.

        The server usually waits in a select call, which we need to
        interrupt.  The simplest way is using a socketpair.

        Select always waits to read from the first socket of the
        socketpair.

        In this case, we can just write anything to the second socket of
        the socketpair.
        """
        self._write.send(b'1')

    def stop(self):
        """Stop the server.

        This method causes the serve() method to return.  stop() may be
        invoked from within your handler, or from another thread.

        After stop() is called, serve() will return but the server will
        still be listening on the socket.  serve() may then be called again
        to resume processing requests.  Alternatively, close() may be
        called after serve() returns to close the server socket and shut
        down all worker threads.
        """
        self._stop = True
        self.wake_up()

    def _select(self):
        """Does select on open connections.

        Connections found closed are removed from self.clients on the way.
        Returns the (readable, writable, exceptional) lists from select().
        """
        readable = [self.socket.handle.fileno(), self._read.fileno()]
        writable = []
        # iterate over a copy because closed connections are deleted below
        for i, connection in list(self.clients.items()):
            if connection.is_readable():
                readable.append(connection.fileno())
            if connection.is_writeable():
                writable.append(connection.fileno())
            if connection.is_closed():
                del self.clients[i]
        return select.select(readable, writable, readable)

    def handle(self):
        """Handle requests.

        Performs one select() round: accepts new clients, reads from and
        writes to ready connections and queues complete requests for the
        worker threads.

        WARNING! You must call prepare() BEFORE calling handle()
        """
        assert self.prepared, "You have to call prepare before handle"
        rset, wset, xset = self._select()
        for readable in rset:
            if readable == self._read.fileno():
                # don't care about the data, just clear the readable flag
                self._read.recv(1024)
            elif readable == self.socket.handle.fileno():
                client = self.socket.accept().handle
                self.clients[client.fileno()] = Connection(client,
                                                           self.wake_up)
            else:
                connection = self.clients[readable]
                connection.read()
                if connection.status == WAIT_PROCESS:
                    itransport = TTransport.TMemoryBuffer(connection.message)
                    otransport = TTransport.TMemoryBuffer()
                    iprot = self.in_protocol.getProtocol(itransport)
                    oprot = self.out_protocol.getProtocol(otransport)
                    self.tasks.put([self.processor, iprot, oprot,
                                    otransport, connection.ready])
        for writeable in wset:
            self.clients[writeable].write()
        for oob in xset:
            # The listening and wake-up sockets are watched for exceptional
            # conditions too but are not in self.clients, so a plain
            # self.clients[oob] would raise KeyError for them.
            connection = self.clients.pop(oob, None)
            if connection is None:
                logger.error("exceptional condition on server socket "
                             "(fd %d)", oob)
                continue
            connection.close()

    def close(self):
        """Closes the server.

        Queues one stop marker per worker thread, closes the listening
        socket and marks the server as not prepared.
        """
        for _ in range(self.threads):
            self.tasks.put([None, None, None, None, None])
        self.socket.close()
        self.prepared = False

    def serve(self):
        """Serve requests.

        Serve requests forever, or until stop() is called.
        """
        self._stop = False
        self.prepare()
        while not self._stop:
            self.handle()
David Reiss74421272008-11-07 23:09:31 +0000350 self.handle()