blob: fa478d01f4d19a7843d4efa22dd0eceaa1200b8d [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
David Reiss74421272008-11-07 23:09:31 +000019"""Implementation of non-blocking server.
20
Bryan Duxbury69720412012-01-03 17:32:30 +000021The main idea of the server is to receive and send requests
22only from the main thread.
David Reiss74421272008-11-07 23:09:31 +000023
Bryan Duxbury69720412012-01-03 17:32:30 +000024The thread poool should be sized for concurrent tasks, not
25maximum connections
David Reiss74421272008-11-07 23:09:31 +000026"""
27import threading
28import socket
29import Queue
30import select
31import struct
32import logging
33
34from thrift.transport import TTransport
35from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
36
37__all__ = ['TNonblockingServer']
38
Bryan Duxbury69720412012-01-03 17:32:30 +000039
David Reiss74421272008-11-07 23:09:31 +000040class Worker(threading.Thread):
41 """Worker is a small helper to process incoming connection."""
Bryan Duxbury69720412012-01-03 17:32:30 +000042
David Reiss74421272008-11-07 23:09:31 +000043 def __init__(self, queue):
44 threading.Thread.__init__(self)
45 self.queue = queue
46
47 def run(self):
48 """Process queries from task queue, stop if processor is None."""
49 while True:
50 try:
51 processor, iprot, oprot, otrans, callback = self.queue.get()
52 if processor is None:
53 break
54 processor.process(iprot, oprot)
55 callback(True, otrans.getvalue())
56 except Exception:
57 logging.exception("Exception while processing request")
58 callback(False, '')
59
60WAIT_LEN = 0
61WAIT_MESSAGE = 1
62WAIT_PROCESS = 2
63SEND_ANSWER = 3
64CLOSED = 4
65
Bryan Duxbury69720412012-01-03 17:32:30 +000066
David Reiss74421272008-11-07 23:09:31 +000067def locked(func):
Bryan Duxbury69720412012-01-03 17:32:30 +000068 """Decorator which locks self.lock."""
David Reiss74421272008-11-07 23:09:31 +000069 def nested(self, *args, **kwargs):
70 self.lock.acquire()
71 try:
72 return func(self, *args, **kwargs)
73 finally:
74 self.lock.release()
75 return nested
76
Bryan Duxbury69720412012-01-03 17:32:30 +000077
David Reiss74421272008-11-07 23:09:31 +000078def socket_exception(func):
Bryan Duxbury69720412012-01-03 17:32:30 +000079 """Decorator close object on socket.error."""
David Reiss74421272008-11-07 23:09:31 +000080 def read(self, *args, **kwargs):
81 try:
82 return func(self, *args, **kwargs)
83 except socket.error:
84 self.close()
85 return read
86
Bryan Duxbury69720412012-01-03 17:32:30 +000087
David Reiss74421272008-11-07 23:09:31 +000088class Connection:
89 """Basic class is represented connection.
Bryan Duxbury69720412012-01-03 17:32:30 +000090
David Reiss74421272008-11-07 23:09:31 +000091 It can be in state:
92 WAIT_LEN --- connection is reading request len.
93 WAIT_MESSAGE --- connection is reading request.
Bryan Duxbury69720412012-01-03 17:32:30 +000094 WAIT_PROCESS --- connection has just read whole request and
95 waits for call ready routine.
David Reiss74421272008-11-07 23:09:31 +000096 SEND_ANSWER --- connection is sending answer string (including length
Bryan Duxbury69720412012-01-03 17:32:30 +000097 of answer).
David Reiss74421272008-11-07 23:09:31 +000098 CLOSED --- socket was closed and connection should be deleted.
99 """
100 def __init__(self, new_socket, wake_up):
101 self.socket = new_socket
102 self.socket.setblocking(False)
103 self.status = WAIT_LEN
104 self.len = 0
105 self.message = ''
106 self.lock = threading.Lock()
107 self.wake_up = wake_up
108
109 def _read_len(self):
110 """Reads length of request.
Bryan Duxbury69720412012-01-03 17:32:30 +0000111
112 It's a safer alternative to self.socket.recv(4)
113 """
David Reiss74421272008-11-07 23:09:31 +0000114 read = self.socket.recv(4 - len(self.message))
115 if len(read) == 0:
Bryan Duxbury69720412012-01-03 17:32:30 +0000116 # if we read 0 bytes and self.message is empty, then
117 # the client closed the connection
David Reiss74421272008-11-07 23:09:31 +0000118 if len(self.message) != 0:
119 logging.error("can't read frame size from socket")
120 self.close()
121 return
122 self.message += read
123 if len(self.message) == 4:
124 self.len, = struct.unpack('!i', self.message)
125 if self.len < 0:
Bryan Duxbury69720412012-01-03 17:32:30 +0000126 logging.error("negative frame size, it seems client "
127 "doesn't use FramedTransport")
David Reiss74421272008-11-07 23:09:31 +0000128 self.close()
129 elif self.len == 0:
130 logging.error("empty frame, it's really strange")
131 self.close()
132 else:
133 self.message = ''
134 self.status = WAIT_MESSAGE
135
136 @socket_exception
137 def read(self):
138 """Reads data from stream and switch state."""
139 assert self.status in (WAIT_LEN, WAIT_MESSAGE)
140 if self.status == WAIT_LEN:
141 self._read_len()
142 # go back to the main loop here for simplicity instead of
143 # falling through, even though there is a good chance that
144 # the message is already available
145 elif self.status == WAIT_MESSAGE:
146 read = self.socket.recv(self.len - len(self.message))
147 if len(read) == 0:
Bryan Duxbury69720412012-01-03 17:32:30 +0000148 logging.error("can't read frame from socket (get %d of "
149 "%d bytes)" % (len(self.message), self.len))
David Reiss74421272008-11-07 23:09:31 +0000150 self.close()
151 return
152 self.message += read
153 if len(self.message) == self.len:
154 self.status = WAIT_PROCESS
155
156 @socket_exception
157 def write(self):
158 """Writes data from socket and switch state."""
159 assert self.status == SEND_ANSWER
160 sent = self.socket.send(self.message)
161 if sent == len(self.message):
162 self.status = WAIT_LEN
163 self.message = ''
164 self.len = 0
165 else:
166 self.message = self.message[sent:]
167
168 @locked
169 def ready(self, all_ok, message):
170 """Callback function for switching state and waking up main thread.
Bryan Duxbury69720412012-01-03 17:32:30 +0000171
David Reiss74421272008-11-07 23:09:31 +0000172 This function is the only function witch can be called asynchronous.
Bryan Duxbury69720412012-01-03 17:32:30 +0000173
David Reiss74421272008-11-07 23:09:31 +0000174 The ready can switch Connection to three states:
David Reiss6ce401d2009-03-24 20:01:58 +0000175 WAIT_LEN if request was oneway.
David Reiss74421272008-11-07 23:09:31 +0000176 SEND_ANSWER if request was processed in normal way.
177 CLOSED if request throws unexpected exception.
Bryan Duxbury69720412012-01-03 17:32:30 +0000178
David Reiss74421272008-11-07 23:09:31 +0000179 The one wakes up main thread.
180 """
181 assert self.status == WAIT_PROCESS
182 if not all_ok:
183 self.close()
184 self.wake_up()
185 return
186 self.len = ''
David Reiss74421272008-11-07 23:09:31 +0000187 if len(message) == 0:
David Reissc51986f2009-03-24 20:01:25 +0000188 # it was a oneway request, do not write answer
Todd Lipconf5dea4c2009-12-03 01:18:44 +0000189 self.message = ''
David Reiss74421272008-11-07 23:09:31 +0000190 self.status = WAIT_LEN
191 else:
Todd Lipconf5dea4c2009-12-03 01:18:44 +0000192 self.message = struct.pack('!i', len(message)) + message
David Reiss74421272008-11-07 23:09:31 +0000193 self.status = SEND_ANSWER
194 self.wake_up()
195
196 @locked
197 def is_writeable(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000198 """Return True if connection should be added to write list of select"""
David Reiss74421272008-11-07 23:09:31 +0000199 return self.status == SEND_ANSWER
200
201 # it's not necessary, but...
202 @locked
203 def is_readable(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000204 """Return True if connection should be added to read list of select"""
David Reiss74421272008-11-07 23:09:31 +0000205 return self.status in (WAIT_LEN, WAIT_MESSAGE)
206
207 @locked
208 def is_closed(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000209 """Returns True if connection is closed."""
David Reiss74421272008-11-07 23:09:31 +0000210 return self.status == CLOSED
211
212 def fileno(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000213 """Returns the file descriptor of the associated socket."""
David Reiss74421272008-11-07 23:09:31 +0000214 return self.socket.fileno()
215
216 def close(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000217 """Closes connection"""
David Reiss74421272008-11-07 23:09:31 +0000218 self.status = CLOSED
219 self.socket.close()
220
Bryan Duxbury69720412012-01-03 17:32:30 +0000221
David Reiss74421272008-11-07 23:09:31 +0000222class TNonblockingServer:
223 """Non-blocking server."""
Bryan Duxbury69720412012-01-03 17:32:30 +0000224
225 def __init__(self,
226 processor,
227 lsocket,
228 inputProtocolFactory=None,
229 outputProtocolFactory=None,
230 threads=10):
David Reiss74421272008-11-07 23:09:31 +0000231 self.processor = processor
232 self.socket = lsocket
233 self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
234 self.out_protocol = outputProtocolFactory or self.in_protocol
235 self.threads = int(threads)
236 self.clients = {}
237 self.tasks = Queue.Queue()
238 self._read, self._write = socket.socketpair()
239 self.prepared = False
Roger Meiercfff8562012-04-13 14:24:55 +0000240 self._stop = False
David Reiss74421272008-11-07 23:09:31 +0000241
242 def setNumThreads(self, num):
243 """Set the number of worker threads that should be created."""
244 # implement ThreadPool interface
Bryan Duxbury69720412012-01-03 17:32:30 +0000245 assert not self.prepared, "Can't change number of threads after start"
David Reiss74421272008-11-07 23:09:31 +0000246 self.threads = num
247
248 def prepare(self):
249 """Prepares server for serve requests."""
Roger Meiercfff8562012-04-13 14:24:55 +0000250 if self.prepared:
251 return
David Reiss74421272008-11-07 23:09:31 +0000252 self.socket.listen()
253 for _ in xrange(self.threads):
254 thread = Worker(self.tasks)
255 thread.setDaemon(True)
256 thread.start()
257 self.prepared = True
258
259 def wake_up(self):
260 """Wake up main thread.
Bryan Duxbury69720412012-01-03 17:32:30 +0000261
David Reiss74421272008-11-07 23:09:31 +0000262 The server usualy waits in select call in we should terminate one.
263 The simplest way is using socketpair.
Bryan Duxbury69720412012-01-03 17:32:30 +0000264
David Reiss74421272008-11-07 23:09:31 +0000265 Select always wait to read from the first socket of socketpair.
Bryan Duxbury69720412012-01-03 17:32:30 +0000266
David Reiss74421272008-11-07 23:09:31 +0000267 In this case, we can just write anything to the second socket from
Bryan Duxbury69720412012-01-03 17:32:30 +0000268 socketpair.
269 """
David Reiss74421272008-11-07 23:09:31 +0000270 self._write.send('1')
271
Roger Meiercfff8562012-04-13 14:24:55 +0000272 def stop(self):
273 """Stop the server.
274
275 This method causes the serve() method to return. stop() may be invoked
276 from within your handler, or from another thread.
277
278 After stop() is called, serve() will return but the server will still
279 be listening on the socket. serve() may then be called again to resume
280 processing requests. Alternatively, close() may be called after
281 serve() returns to close the server socket and shutdown all worker
282 threads.
283 """
284 self._stop = True
285 self.wake_up()
286
David Reiss74421272008-11-07 23:09:31 +0000287 def _select(self):
288 """Does select on open connections."""
289 readable = [self.socket.handle.fileno(), self._read.fileno()]
290 writable = []
291 for i, connection in self.clients.items():
292 if connection.is_readable():
293 readable.append(connection.fileno())
294 if connection.is_writeable():
295 writable.append(connection.fileno())
296 if connection.is_closed():
297 del self.clients[i]
298 return select.select(readable, writable, readable)
Bryan Duxbury69720412012-01-03 17:32:30 +0000299
David Reiss74421272008-11-07 23:09:31 +0000300 def handle(self):
301 """Handle requests.
Bryan Duxbury69720412012-01-03 17:32:30 +0000302
303 WARNING! You must call prepare() BEFORE calling handle()
David Reiss74421272008-11-07 23:09:31 +0000304 """
305 assert self.prepared, "You have to call prepare before handle"
306 rset, wset, xset = self._select()
307 for readable in rset:
308 if readable == self._read.fileno():
309 # don't care i just need to clean readable flag
Bryan Duxbury69720412012-01-03 17:32:30 +0000310 self._read.recv(1024)
David Reiss74421272008-11-07 23:09:31 +0000311 elif readable == self.socket.handle.fileno():
312 client = self.socket.accept().handle
Bryan Duxbury69720412012-01-03 17:32:30 +0000313 self.clients[client.fileno()] = Connection(client,
314 self.wake_up)
David Reiss74421272008-11-07 23:09:31 +0000315 else:
316 connection = self.clients[readable]
317 connection.read()
318 if connection.status == WAIT_PROCESS:
319 itransport = TTransport.TMemoryBuffer(connection.message)
320 otransport = TTransport.TMemoryBuffer()
321 iprot = self.in_protocol.getProtocol(itransport)
322 oprot = self.out_protocol.getProtocol(otransport)
Bryan Duxbury69720412012-01-03 17:32:30 +0000323 self.tasks.put([self.processor, iprot, oprot,
David Reiss74421272008-11-07 23:09:31 +0000324 otransport, connection.ready])
325 for writeable in wset:
326 self.clients[writeable].write()
327 for oob in xset:
328 self.clients[oob].close()
329 del self.clients[oob]
330
331 def close(self):
332 """Closes the server."""
333 for _ in xrange(self.threads):
334 self.tasks.put([None, None, None, None, None])
335 self.socket.close()
336 self.prepared = False
Bryan Duxbury69720412012-01-03 17:32:30 +0000337
David Reiss74421272008-11-07 23:09:31 +0000338 def serve(self):
Roger Meiercfff8562012-04-13 14:24:55 +0000339 """Serve requests.
340
341 Serve requests forever, or until stop() is called.
342 """
343 self._stop = False
David Reiss74421272008-11-07 23:09:31 +0000344 self.prepare()
Roger Meiercfff8562012-04-13 14:24:55 +0000345 while not self._stop:
David Reiss74421272008-11-07 23:09:31 +0000346 self.handle()