blob: 4a035b6f8090773d5a46d582da52fd4c42b746e0 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
David Reiss74421272008-11-07 23:09:31 +000019"""Implementation of non-blocking server.
20
Bryan Duxbury69720412012-01-03 17:32:30 +000021The main idea of the server is to receive and send requests
22only from the main thread.
David Reiss74421272008-11-07 23:09:31 +000023
The thread pool should be sized for concurrent tasks, not
maximum connections.
David Reiss74421272008-11-07 23:09:31 +000026"""
27import threading
28import socket
29import Queue
30import select
31import struct
Konrad Grochowski3a724e32014-08-12 11:48:29 -040032
David Reiss74421272008-11-07 23:09:31 +000033import logging
Konrad Grochowski3a724e32014-08-12 11:48:29 -040034logger = logging.getLogger(__name__)
David Reiss74421272008-11-07 23:09:31 +000035
36from thrift.transport import TTransport
37from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
38
39__all__ = ['TNonblockingServer']
40
Bryan Duxbury69720412012-01-03 17:32:30 +000041
class Worker(threading.Thread):
    """Thread that executes queued requests one at a time.

    Each queue item is a 5-element sequence
    ``(processor, iprot, oprot, otrans, callback)``. An item whose
    ``processor`` is None is the shutdown sentinel.
    """

    def __init__(self, queue):
        """Create a worker bound to *queue*.

        queue -- object with a blocking ``get()`` method that returns
                 the task sequences described on the class.
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Process tasks from the queue; stop when processor is None.

        On success ``callback(True, otrans.getvalue())`` is called, on any
        exception ``callback(False, '')``.
        """
        while True:
            # Dequeue outside the try block: if this fails there is no
            # valid callback for the current task, so the error handler
            # below must not run with an unbound or stale ``callback``.
            processor, iprot, oprot, otrans, callback = self.queue.get()
            if processor is None:
                break
            try:
                processor.process(iprot, oprot)
                callback(True, otrans.getvalue())
            except Exception:
                logger.exception("Exception while processing request")
                try:
                    callback(False, '')
                except Exception:
                    # A failing failure-callback must not kill the worker,
                    # otherwise the pool silently loses a thread.
                    logger.exception(
                        "Exception while reporting request failure")
61
# Connection states, numbered in the order a request moves through them.
(
    WAIT_LEN,      # reading the 4-byte frame length
    WAIT_MESSAGE,  # reading the frame body
    WAIT_PROCESS,  # whole request read, waiting for the result
    SEND_ANSWER,   # sending the answer back
    CLOSED,        # socket closed, connection should be dropped
) = (0, 1, 2, 3, 4)
67
Bryan Duxbury69720412012-01-03 17:32:30 +000068
def locked(func):
    """Decorator which runs a method while holding ``self.lock``.

    The decorated method's instance must provide a ``lock`` attribute with
    ``acquire()`` and ``release()``. The lock is released even when the
    method raises, and the method's return value is passed through.
    """
    # Local import so this block does not depend on the file's import list.
    from functools import wraps

    @wraps(func)  # keep the wrapped method's __name__ and __doc__
    def nested(self, *args, **kwargs):
        self.lock.acquire()
        try:
            return func(self, *args, **kwargs)
        finally:
            self.lock.release()
    return nested
78
Bryan Duxbury69720412012-01-03 17:32:30 +000079
def socket_exception(func):
    """Decorator which closes the object on ``socket.error``.

    If the decorated method raises ``socket.error``, the error is swallowed,
    ``self.close()`` is called and the wrapper returns None. Any other
    exception propagates unchanged.
    """
    # Local import so this block does not depend on the file's import list.
    from functools import wraps

    @wraps(func)  # keep the real method name instead of a generic wrapper
    def guarded(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error:
            self.close()
    return guarded
88
Bryan Duxbury69720412012-01-03 17:32:30 +000089
class Connection(object):
    """State machine for one framed client connection.

    A connection is always in one of these states:
        WAIT_LEN      --- connection is reading the request length.
        WAIT_MESSAGE  --- connection is reading the request body.
        WAIT_PROCESS  --- connection has just read a whole request and
                          waits for ready() to be called.
        SEND_ANSWER   --- connection is sending the answer string
                          (including the length of the answer).
        CLOSED        --- socket was closed and the connection should be
                          deleted.
    """

    def __init__(self, new_socket, wake_up):
        """Wrap *new_socket* and switch it to non-blocking mode.

        new_socket -- connected socket object; must support setblocking(),
                      recv(), send(), fileno() and close().
        wake_up    -- zero-argument callable invoked by ready() after the
                      state was changed.
        """
        self.socket = new_socket
        self.socket.setblocking(False)
        self.status = WAIT_LEN
        self.len = 0        # body length of the frame being read
        self.message = ''   # read buffer, or the remaining answer to send
        self.lock = threading.Lock()
        self.wake_up = wake_up

    def _read_len(self):
        """Reads length of request.

        It's a safer alternative to self.socket.recv(4): the 4-byte header
        may arrive in several pieces, so partial data is accumulated in
        self.message until all four bytes are present.
        """
        read = self.socket.recv(4 - len(self.message))
        if len(read) == 0:
            # if we read 0 bytes and self.message is empty, then
            # the client closed the connection; otherwise the header
            # was cut off in the middle
            if len(self.message) != 0:
                logger.error("can't read frame size from socket")
            self.close()
            return
        self.message += read
        if len(self.message) == 4:
            # '!i' = network byte order, signed 32-bit integer
            self.len, = struct.unpack('!i', self.message)
            if self.len < 0:
                logger.error("negative frame size, it seems client "
                             "doesn't use FramedTransport")
                self.close()
            elif self.len == 0:
                logger.error("empty frame, it's really strange")
                self.close()
            else:
                self.message = ''
                self.status = WAIT_MESSAGE

    @socket_exception
    def read(self):
        """Reads data from the socket and switches state.

        Must only be called in state WAIT_LEN or WAIT_MESSAGE. Performs a
        single recv() per call. When the whole body has been received the
        state becomes WAIT_PROCESS.
        """
        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
        if self.status == WAIT_LEN:
            self._read_len()
            # go back to the main loop here for simplicity instead of
            # falling through, even though there is a good chance that
            # the message is already available
        elif self.status == WAIT_MESSAGE:
            read = self.socket.recv(self.len - len(self.message))
            if len(read) == 0:
                # let the logging module format the message lazily
                logger.error("can't read frame from socket (get %d of "
                             "%d bytes)", len(self.message), self.len)
                self.close()
                return
            self.message += read
            if len(self.message) == self.len:
                self.status = WAIT_PROCESS

    @socket_exception
    def write(self):
        """Writes data to the socket and switches state.

        Must only be called in state SEND_ANSWER. Performs a single send()
        per call; unsent data stays in self.message for the next call. Once
        everything was sent the connection returns to WAIT_LEN.
        """
        assert self.status == SEND_ANSWER
        sent = self.socket.send(self.message)
        if sent == len(self.message):
            self.status = WAIT_LEN
            self.message = ''
            self.len = 0
        else:
            self.message = self.message[sent:]

    @locked
    def ready(self, all_ok, message):
        """Callback function for switching state and waking up main thread.

        This function is the only function which can be called
        asynchronously.

        all_ok  -- False if processing failed; the connection is closed.
        message -- serialized answer; an empty message means the request
                   was oneway and nothing is sent back.

        The ready can switch Connection to three states:
            WAIT_LEN if request was oneway.
            SEND_ANSWER if request was processed in normal way.
            CLOSED if request throws unexpected exception.

        In every case wake_up() is called afterwards.
        """
        assert self.status == WAIT_PROCESS
        if not all_ok:
            self.close()
            self.wake_up()
            return
        # self.len is an integer everywhere else, so reset it to 0
        self.len = 0
        if len(message) == 0:
            # it was a oneway request, do not write answer
            self.message = ''
            self.status = WAIT_LEN
        else:
            # prefix the answer with its length as a 4-byte frame header
            self.message = struct.pack('!i', len(message)) + message
            self.status = SEND_ANSWER
        self.wake_up()

    @locked
    def is_writeable(self):
        """Return True if connection should be added to write list of select"""
        return self.status == SEND_ANSWER

    # it's not necessary, but...
    @locked
    def is_readable(self):
        """Return True if connection should be added to read list of select"""
        return self.status in (WAIT_LEN, WAIT_MESSAGE)

    @locked
    def is_closed(self):
        """Returns True if connection is closed."""
        return self.status == CLOSED

    def fileno(self):
        """Returns the file descriptor of the associated socket."""
        return self.socket.fileno()

    def close(self):
        """Closes connection: marks it CLOSED and closes the socket."""
        self.status = CLOSED
        self.socket.close()
222
Bryan Duxbury69720412012-01-03 17:32:30 +0000223
class TNonblockingServer(object):
    """Non-blocking server.

    All socket reads and writes happen in the thread that calls handle()
    or serve(). Complete requests are put on a task queue that is consumed
    by Worker threads.
    """

    def __init__(self,
                 processor,
                 lsocket,
                 inputProtocolFactory=None,
                 outputProtocolFactory=None,
                 threads=10):
        """Create the server; nothing is started until prepare()/serve().

        processor             -- object passed to the workers for every
                                 request.
        lsocket               -- listening transport; this class calls its
                                 listen(), accept() and close() methods and
                                 reads its ``handle`` attribute.
        inputProtocolFactory  -- factory with getProtocol(transport);
                                 defaults to TBinaryProtocolFactory().
        outputProtocolFactory -- same for answers; defaults to the input
                                 factory.
        threads               -- number of worker threads to create.
        """
        self.processor = processor
        self.socket = lsocket
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
        self.out_protocol = outputProtocolFactory or self.in_protocol
        self.threads = int(threads)
        self.clients = {}   # file descriptor -> Connection
        self.tasks = Queue.Queue()
        # socketpair used to interrupt select(), see wake_up()
        self._read, self._write = socket.socketpair()
        self.prepared = False
        self._stop = False

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created."""
        # implement ThreadPool interface
        assert not self.prepared, "Can't change number of threads after start"
        # coerce like __init__ does so both code paths store an int
        self.threads = int(num)

    def prepare(self):
        """Prepares server for serve requests.

        Starts listening and launches the worker threads. Does nothing if
        the server is already prepared.
        """
        if self.prepared:
            return
        self.socket.listen()
        for _ in xrange(self.threads):
            thread = Worker(self.tasks)
            thread.setDaemon(True)
            thread.start()
        self.prepared = True

    def wake_up(self):
        """Wake up main thread.

        The server usually waits in a select call, which we need to
        interrupt. The simplest way is using a socketpair.

        Select always waits to read from the first socket of the
        socketpair, so writing anything to the second socket makes the
        select call return.
        """
        self._write.send('1')

    def stop(self):
        """Stop the server.

        This method causes the serve() method to return. stop() may be invoked
        from within your handler, or from another thread.

        After stop() is called, serve() will return but the server will still
        be listening on the socket. serve() may then be called again to resume
        processing requests. Alternatively, close() may be called after
        serve() returns to close the server socket and shutdown all worker
        threads.
        """
        self._stop = True
        self.wake_up()

    def _select(self):
        """Does select on open connections.

        Closed connections are dropped from self.clients on the way.
        Returns the (readable, writable, exceptional) lists produced by
        select.select().
        """
        readable = [self.socket.handle.fileno(), self._read.fileno()]
        writable = []
        # Iterate over a snapshot: entries are deleted inside the loop and
        # a live dict view must not change size during iteration.
        for fd, connection in list(self.clients.items()):
            if connection.is_readable():
                readable.append(connection.fileno())
            if connection.is_writeable():
                writable.append(connection.fileno())
            if connection.is_closed():
                del self.clients[fd]
        return select.select(readable, writable, readable)

    def handle(self):
        """Handle requests.

        Performs one select() round: drains wake-up bytes, accepts new
        clients, reads from and writes to ready connections and queues
        every completely received request for the workers.

        WARNING! You must call prepare() BEFORE calling handle()
        """
        assert self.prepared, "You have to call prepare before handle"
        rset, wset, xset = self._select()
        for readable in rset:
            if readable == self._read.fileno():
                # don't care i just need to clean readable flag
                self._read.recv(1024)
            elif readable == self.socket.handle.fileno():
                client = self.socket.accept().handle
                self.clients[client.fileno()] = Connection(client,
                                                           self.wake_up)
            else:
                connection = self.clients[readable]
                connection.read()
                if connection.status == WAIT_PROCESS:
                    itransport = TTransport.TMemoryBuffer(connection.message)
                    otransport = TTransport.TMemoryBuffer()
                    iprot = self.in_protocol.getProtocol(itransport)
                    oprot = self.out_protocol.getProtocol(otransport)
                    self.tasks.put([self.processor, iprot, oprot,
                                    otransport, connection.ready])
        for writeable in wset:
            self.clients[writeable].write()
        for oob in xset:
            # The exceptional set is built from the read list, which also
            # holds the listening and wake-up sockets; those are not in
            # self.clients and must not raise KeyError here.
            connection = self.clients.pop(oob, None)
            if connection is not None:
                connection.close()

    def close(self):
        """Closes the server.

        Queues one shutdown sentinel per worker thread and closes the
        listening socket.
        """
        for _ in xrange(self.threads):
            self.tasks.put([None, None, None, None, None])
        self.socket.close()
        self.prepared = False

    def serve(self):
        """Serve requests.

        Serve requests forever, or until stop() is called.
        """
        self._stop = False
        self.prepare()
        while not self._stop:
            self.handle()