blob: a930a80914d724569bcb3dcc3697a39064d80afe [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Implementation of non-blocking server.

The main idea of the server is to receive and send requests
only from the main thread.

The thread pool should be sized for concurrent tasks, not
maximum connections.
"""
import errno
import functools
import logging
import select
import socket
import struct
import threading

from six.moves import queue

from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
from thrift.transport import TTransport

logger = logging.getLogger(__name__)

__all__ = ['TNonblockingServer']
41
Bryan Duxbury69720412012-01-03 17:32:30 +000042
class Worker(threading.Thread):
    """Worker thread that processes requests taken from a shared task queue.

    Every task is a five-item sequence
    ``(processor, iprot, oprot, otrans, callback)``.  A task whose
    ``processor`` is ``None`` is a shutdown sentinel.
    """

    def __init__(self, queue):
        """Create a worker reading tasks from ``queue`` (not started yet)."""
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Process tasks from the queue, stop if processor is None.

        For each task ``processor.process(iprot, oprot)`` is called and the
        outcome is reported through ``callback``: ``(True, reply_bytes)`` on
        success, ``(False, b'')`` if an exception was raised.
        """
        while True:
            # Fetch and unpack outside the ``try`` block: the handler below
            # must only ever see the callback of the *current* task, never
            # an unbound name or the one left over from a previous task.
            processor, iprot, oprot, otrans, callback = self.queue.get()
            if processor is None:
                break
            try:
                processor.process(iprot, oprot)
                callback(True, otrans.getvalue())
            except Exception:
                logger.exception("Exception while processing request")
                callback(False, b'')
David Reiss74421272008-11-07 23:09:31 +000062
# Connection states (stored in Connection.status).
WAIT_LEN = 0      # reading the 4-byte frame length
WAIT_MESSAGE = 1  # reading the frame body
WAIT_PROCESS = 2  # whole request read, waiting for the ready() callback
SEND_ANSWER = 3   # sending the answer (length prefix included)
CLOSED = 4        # socket was closed, connection should be deleted
68
Bryan Duxbury69720412012-01-03 17:32:30 +000069
def locked(func):
    """Decorator which runs the method while holding ``self.lock``.

    The lock is released when the wrapped method returns or raises; its
    return value (or exception) is passed through unchanged.
    """
    @functools.wraps(func)  # keep the wrapped method's name and docstring
    def nested(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return nested
79
Bryan Duxbury69720412012-01-03 17:32:30 +000080
# errno values that only mean "try again later" on a non-blocking socket.
_TRANSIENT_ERRNOS = frozenset((errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR))


def socket_exception(func):
    """Decorator which closes the object on a fatal socket.error.

    The wrapped method's return value is passed through.  When it raises
    ``socket.error`` the exception is swallowed and ``None`` is returned:

    * for a transient errno (EAGAIN / EWOULDBLOCK / EINTR) the object is
      left untouched so the operation can simply be retried later;
    * for any other socket error ``self.close()`` is called.

    Exceptions other than ``socket.error`` propagate unchanged.
    """
    @functools.wraps(func)  # keep the wrapped method's name and docstring
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error as exc:
            # A spurious wake-up on a non-blocking socket is not a reason
            # to drop an otherwise healthy connection.
            if getattr(exc, 'errno', None) not in _TRANSIENT_ERRNOS:
                self.close()
            return None
    return wrapper
89
Bryan Duxbury69720412012-01-03 17:32:30 +000090
class Connection(object):
    """State machine for a single framed client connection.

    It can be in state:
        WAIT_LEN --- connection is reading request len.
        WAIT_MESSAGE --- connection is reading request.
        WAIT_PROCESS --- connection has just read whole request and
                         waits for call ready routine.
        SEND_ANSWER --- connection is sending answer string (including
                        length of answer).
        CLOSED --- socket was closed and connection should be deleted.
    """

    def __init__(self, new_socket, wake_up):
        """Wrap ``new_socket`` and switch it to non-blocking mode.

        ``wake_up`` is a no-argument callable invoked by ready() after the
        state has been switched.
        """
        self.socket = new_socket
        self.socket.setblocking(False)
        self.status = WAIT_LEN
        self.len = 0          # size of the frame currently being read
        self.message = b''    # bytes read so far / bytes still to be sent
        self.lock = threading.Lock()
        self.wake_up = wake_up

    def _read_len(self):
        """Reads length of request.

        It's a safer alternative to self.socket.recv(4): the 4-byte prefix
        is accumulated in self.message across calls, so a short read is
        simply completed by the next call.
        """
        chunk = self.socket.recv(4 - len(self.message))
        if not chunk:
            # if we read 0 bytes and self.message is empty, then
            # the client closed the connection
            if self.message:
                logger.error("can't read frame size from socket")
            self.close()
            return
        self.message += chunk
        if len(self.message) == 4:
            self.len, = struct.unpack('!i', self.message)
            if self.len < 0:
                logger.error("negative frame size, it seems client "
                             "doesn't use FramedTransport")
                self.close()
            elif self.len == 0:
                logger.error("empty frame, it's really strange")
                self.close()
            else:
                # prefix complete: start collecting the frame body
                self.message = b''
                self.status = WAIT_MESSAGE

    @socket_exception
    def read(self):
        """Reads data from stream and switch state."""
        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
        if self.status == WAIT_LEN:
            self._read_len()
            # go back to the main loop here for simplicity instead of
            # falling through, even though there is a good chance that
            # the message is already available
        elif self.status == WAIT_MESSAGE:
            chunk = self.socket.recv(self.len - len(self.message))
            if not chunk:
                logger.error("can't read frame from socket (get %d of "
                             "%d bytes)", len(self.message), self.len)
                self.close()
                return
            self.message += chunk
            if len(self.message) == self.len:
                self.status = WAIT_PROCESS

    @socket_exception
    def write(self):
        """Writes pending data to the socket and switch state.

        After a partial send the unsent tail is kept in self.message; once
        everything has been sent the connection goes back to WAIT_LEN.
        """
        assert self.status == SEND_ANSWER
        sent = self.socket.send(self.message)
        if sent == len(self.message):
            self.status = WAIT_LEN
            self.message = b''
            self.len = 0
        else:
            self.message = self.message[sent:]

    @locked
    def ready(self, all_ok, message):
        """Callback function for switching state and waking up main thread.

        This function is the only function which can be called
        asynchronously.

        The ready can switch Connection to three states:
            WAIT_LEN if request was oneway.
            SEND_ANSWER if request was processed in normal way.
            CLOSED if request throws unexpected exception.

        In every case it calls wake_up() afterwards.
        """
        assert self.status == WAIT_PROCESS
        if not all_ok:
            self.close()
            self.wake_up()
            return
        self.len = 0
        if not message:
            # it was a oneway request, do not write answer
            self.message = b''
            self.status = WAIT_LEN
        else:
            self.message = struct.pack('!i', len(message)) + message
            self.status = SEND_ANSWER
        self.wake_up()

    @locked
    def is_writeable(self):
        """Return True if connection should be added to write list of select"""
        return self.status == SEND_ANSWER

    # it's not necessary, but...
    @locked
    def is_readable(self):
        """Return True if connection should be added to read list of select"""
        return self.status in (WAIT_LEN, WAIT_MESSAGE)

    @locked
    def is_closed(self):
        """Returns True if connection is closed."""
        return self.status == CLOSED

    def fileno(self):
        """Returns the file descriptor of the associated socket."""
        return self.socket.fileno()

    def close(self):
        """Closes connection"""
        self.status = CLOSED
        self.socket.close()
223
Bryan Duxbury69720412012-01-03 17:32:30 +0000224
class TNonblockingServer(object):
    """Non-blocking server.

    All socket I/O is done by the thread calling serve()/handle(); complete
    requests are handed to a pool of Worker threads through a task queue.
    """

    def __init__(self,
                 processor,
                 lsocket,
                 inputProtocolFactory=None,
                 outputProtocolFactory=None,
                 threads=10):
        """Create the server.

        processor -- object whose process(iprot, oprot) handles a request.
        lsocket -- listening transport; this class uses its listen(),
            accept() and close() methods and its ``handle.fileno()``.
        inputProtocolFactory -- defaults to TBinaryProtocolFactory().
        outputProtocolFactory -- defaults to the input protocol factory.
        threads -- number of worker threads started by prepare().
        """
        self.processor = processor
        self.socket = lsocket
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
        self.out_protocol = outputProtocolFactory or self.in_protocol
        self.threads = int(threads)
        self.clients = {}  # file descriptor -> Connection
        self.tasks = queue.Queue()
        # socketpair used by wake_up() to interrupt a pending select()
        self._read, self._write = socket.socketpair()
        self.prepared = False
        self._stop = False

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created."""
        # implement ThreadPool interface
        assert not self.prepared, "Can't change number of threads after start"
        # coerce like __init__ does so prepare() can rely on an int
        self.threads = int(num)

    def prepare(self):
        """Prepares server for serve requests.

        Starts listening and launches the worker threads.  Does nothing if
        the server is already prepared.
        """
        if self.prepared:
            return
        self.socket.listen()
        for _ in range(self.threads):
            thread = Worker(self.tasks)
            # Thread.setDaemon() is deprecated; the attribute is equivalent
            thread.daemon = True
            thread.start()
        self.prepared = True

    def wake_up(self):
        """Wake up main thread.

        The server usually waits in a select call, which has to be
        interrupted.  The simplest way is using a socketpair.

        Select always waits to read from the first socket of the
        socketpair, so it is enough to write anything to the second socket
        of the socketpair.
        """
        self._write.send(b'1')

    def stop(self):
        """Stop the server.

        This method causes the serve() method to return.  stop() may be
        invoked from within your handler, or from another thread.

        After stop() is called, serve() will return but the server will still
        be listening on the socket.  serve() may then be called again to resume
        processing requests.  Alternatively, close() may be called after
        serve() returns to close the server socket and shutdown all worker
        threads.
        """
        self._stop = True
        self.wake_up()

    def _select(self):
        """Does select on open connections.

        Closed connections are dropped from self.clients on the way.
        Returns the (readable, writable, exceptional) lists of descriptors
        produced by select.select().
        """
        readable = [self.socket.handle.fileno(), self._read.fileno()]
        writable = []
        # iterate over a copy: closed connections are deleted in the loop
        for fd, connection in list(self.clients.items()):
            if connection.is_readable():
                readable.append(connection.fileno())
            if connection.is_writeable():
                writable.append(connection.fileno())
            if connection.is_closed():
                del self.clients[fd]
        return select.select(readable, writable, readable)

    def handle(self):
        """Handle requests.

        Runs one select() round: drains wake-up bytes, accepts new clients,
        reads from / writes to ready connections and queues every request
        that has been read completely.

        WARNING! You must call prepare() BEFORE calling handle()
        """
        assert self.prepared, "You have to call prepare before handle"
        rset, wset, xset = self._select()
        for readable in rset:
            if readable == self._read.fileno():
                # don't care i just need to clean readable flag
                self._read.recv(1024)
            elif readable == self.socket.handle.fileno():
                client = self.socket.accept().handle
                self.clients[client.fileno()] = Connection(client,
                                                           self.wake_up)
            else:
                connection = self.clients[readable]
                connection.read()
                if connection.status == WAIT_PROCESS:
                    itransport = TTransport.TMemoryBuffer(connection.message)
                    otransport = TTransport.TMemoryBuffer()
                    iprot = self.in_protocol.getProtocol(itransport)
                    oprot = self.out_protocol.getProtocol(otransport)
                    self.tasks.put([self.processor, iprot, oprot,
                                    otransport, connection.ready])
        for writeable in wset:
            self.clients[writeable].write()
        for oob in xset:
            # The exception list also contains the listening and wake-up
            # descriptors, which have no entry in self.clients.
            connection = self.clients.pop(oob, None)
            if connection is None:
                logger.warning("exceptional condition on server "
                               "descriptor %d", oob)
                continue
            connection.close()

    def close(self):
        """Closes the server.

        Queues one shutdown sentinel per worker thread, closes the
        listening socket and marks the server as not prepared.
        """
        for _ in range(self.threads):
            self.tasks.put([None, None, None, None, None])
        self.socket.close()
        self.prepared = False

    def serve(self):
        """Serve requests.

        Serve requests forever, or until stop() is called.
        """
        self._stop = False
        self.prepare()
        while not self._stop:
            self.handle()
David Reiss74421272008-11-07 23:09:31 +0000349 self.handle()