blob: cd90b4fc040e233de595aa2950447f72864970d2 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
David Reiss74421272008-11-07 23:09:31 +000019"""Implementation of non-blocking server.
20
Bryan Duxbury69720412012-01-03 17:32:30 +000021The main idea of the server is to receive and send requests
22only from the main thread.
David Reiss74421272008-11-07 23:09:31 +000023
Bryan Duxbury69720412012-01-03 17:32:30 +000024The thread poool should be sized for concurrent tasks, not
25maximum connections
David Reiss74421272008-11-07 23:09:31 +000026"""
27import threading
28import socket
29import Queue
30import select
31import struct
32import logging
33
34from thrift.transport import TTransport
35from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
36
37__all__ = ['TNonblockingServer']
38
Bryan Duxbury69720412012-01-03 17:32:30 +000039
David Reiss74421272008-11-07 23:09:31 +000040class Worker(threading.Thread):
41 """Worker is a small helper to process incoming connection."""
Bryan Duxbury69720412012-01-03 17:32:30 +000042
David Reiss74421272008-11-07 23:09:31 +000043 def __init__(self, queue):
44 threading.Thread.__init__(self)
45 self.queue = queue
46
47 def run(self):
48 """Process queries from task queue, stop if processor is None."""
49 while True:
50 try:
51 processor, iprot, oprot, otrans, callback = self.queue.get()
52 if processor is None:
53 break
54 processor.process(iprot, oprot)
55 callback(True, otrans.getvalue())
56 except Exception:
57 logging.exception("Exception while processing request")
58 callback(False, '')
59
60WAIT_LEN = 0
61WAIT_MESSAGE = 1
62WAIT_PROCESS = 2
63SEND_ANSWER = 3
64CLOSED = 4
65
Bryan Duxbury69720412012-01-03 17:32:30 +000066
David Reiss74421272008-11-07 23:09:31 +000067def locked(func):
Bryan Duxbury69720412012-01-03 17:32:30 +000068 """Decorator which locks self.lock."""
David Reiss74421272008-11-07 23:09:31 +000069 def nested(self, *args, **kwargs):
70 self.lock.acquire()
71 try:
72 return func(self, *args, **kwargs)
73 finally:
74 self.lock.release()
75 return nested
76
Bryan Duxbury69720412012-01-03 17:32:30 +000077
David Reiss74421272008-11-07 23:09:31 +000078def socket_exception(func):
Bryan Duxbury69720412012-01-03 17:32:30 +000079 """Decorator close object on socket.error."""
David Reiss74421272008-11-07 23:09:31 +000080 def read(self, *args, **kwargs):
81 try:
82 return func(self, *args, **kwargs)
83 except socket.error:
84 self.close()
85 return read
86
Bryan Duxbury69720412012-01-03 17:32:30 +000087
David Reiss74421272008-11-07 23:09:31 +000088class Connection:
89 """Basic class is represented connection.
Bryan Duxbury69720412012-01-03 17:32:30 +000090
David Reiss74421272008-11-07 23:09:31 +000091 It can be in state:
92 WAIT_LEN --- connection is reading request len.
93 WAIT_MESSAGE --- connection is reading request.
Bryan Duxbury69720412012-01-03 17:32:30 +000094 WAIT_PROCESS --- connection has just read whole request and
95 waits for call ready routine.
David Reiss74421272008-11-07 23:09:31 +000096 SEND_ANSWER --- connection is sending answer string (including length
Bryan Duxbury69720412012-01-03 17:32:30 +000097 of answer).
David Reiss74421272008-11-07 23:09:31 +000098 CLOSED --- socket was closed and connection should be deleted.
99 """
100 def __init__(self, new_socket, wake_up):
101 self.socket = new_socket
102 self.socket.setblocking(False)
103 self.status = WAIT_LEN
104 self.len = 0
105 self.message = ''
106 self.lock = threading.Lock()
107 self.wake_up = wake_up
108
109 def _read_len(self):
110 """Reads length of request.
Bryan Duxbury69720412012-01-03 17:32:30 +0000111
112 It's a safer alternative to self.socket.recv(4)
113 """
David Reiss74421272008-11-07 23:09:31 +0000114 read = self.socket.recv(4 - len(self.message))
115 if len(read) == 0:
Bryan Duxbury69720412012-01-03 17:32:30 +0000116 # if we read 0 bytes and self.message is empty, then
117 # the client closed the connection
David Reiss74421272008-11-07 23:09:31 +0000118 if len(self.message) != 0:
119 logging.error("can't read frame size from socket")
120 self.close()
121 return
122 self.message += read
123 if len(self.message) == 4:
124 self.len, = struct.unpack('!i', self.message)
125 if self.len < 0:
Bryan Duxbury69720412012-01-03 17:32:30 +0000126 logging.error("negative frame size, it seems client "
127 "doesn't use FramedTransport")
David Reiss74421272008-11-07 23:09:31 +0000128 self.close()
129 elif self.len == 0:
130 logging.error("empty frame, it's really strange")
131 self.close()
132 else:
133 self.message = ''
134 self.status = WAIT_MESSAGE
135
136 @socket_exception
137 def read(self):
138 """Reads data from stream and switch state."""
139 assert self.status in (WAIT_LEN, WAIT_MESSAGE)
140 if self.status == WAIT_LEN:
141 self._read_len()
142 # go back to the main loop here for simplicity instead of
143 # falling through, even though there is a good chance that
144 # the message is already available
145 elif self.status == WAIT_MESSAGE:
146 read = self.socket.recv(self.len - len(self.message))
147 if len(read) == 0:
Bryan Duxbury69720412012-01-03 17:32:30 +0000148 logging.error("can't read frame from socket (get %d of "
149 "%d bytes)" % (len(self.message), self.len))
David Reiss74421272008-11-07 23:09:31 +0000150 self.close()
151 return
152 self.message += read
153 if len(self.message) == self.len:
154 self.status = WAIT_PROCESS
155
156 @socket_exception
157 def write(self):
158 """Writes data from socket and switch state."""
159 assert self.status == SEND_ANSWER
160 sent = self.socket.send(self.message)
161 if sent == len(self.message):
162 self.status = WAIT_LEN
163 self.message = ''
164 self.len = 0
165 else:
166 self.message = self.message[sent:]
167
168 @locked
169 def ready(self, all_ok, message):
170 """Callback function for switching state and waking up main thread.
Bryan Duxbury69720412012-01-03 17:32:30 +0000171
David Reiss74421272008-11-07 23:09:31 +0000172 This function is the only function witch can be called asynchronous.
Bryan Duxbury69720412012-01-03 17:32:30 +0000173
David Reiss74421272008-11-07 23:09:31 +0000174 The ready can switch Connection to three states:
David Reiss6ce401d2009-03-24 20:01:58 +0000175 WAIT_LEN if request was oneway.
David Reiss74421272008-11-07 23:09:31 +0000176 SEND_ANSWER if request was processed in normal way.
177 CLOSED if request throws unexpected exception.
Bryan Duxbury69720412012-01-03 17:32:30 +0000178
David Reiss74421272008-11-07 23:09:31 +0000179 The one wakes up main thread.
180 """
181 assert self.status == WAIT_PROCESS
182 if not all_ok:
183 self.close()
184 self.wake_up()
185 return
186 self.len = ''
David Reiss74421272008-11-07 23:09:31 +0000187 if len(message) == 0:
David Reissc51986f2009-03-24 20:01:25 +0000188 # it was a oneway request, do not write answer
Todd Lipconf5dea4c2009-12-03 01:18:44 +0000189 self.message = ''
David Reiss74421272008-11-07 23:09:31 +0000190 self.status = WAIT_LEN
191 else:
Todd Lipconf5dea4c2009-12-03 01:18:44 +0000192 self.message = struct.pack('!i', len(message)) + message
David Reiss74421272008-11-07 23:09:31 +0000193 self.status = SEND_ANSWER
194 self.wake_up()
195
196 @locked
197 def is_writeable(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000198 """Return True if connection should be added to write list of select"""
David Reiss74421272008-11-07 23:09:31 +0000199 return self.status == SEND_ANSWER
200
201 # it's not necessary, but...
202 @locked
203 def is_readable(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000204 """Return True if connection should be added to read list of select"""
David Reiss74421272008-11-07 23:09:31 +0000205 return self.status in (WAIT_LEN, WAIT_MESSAGE)
206
207 @locked
208 def is_closed(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000209 """Returns True if connection is closed."""
David Reiss74421272008-11-07 23:09:31 +0000210 return self.status == CLOSED
211
212 def fileno(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000213 """Returns the file descriptor of the associated socket."""
David Reiss74421272008-11-07 23:09:31 +0000214 return self.socket.fileno()
215
216 def close(self):
Bryan Duxbury69720412012-01-03 17:32:30 +0000217 """Closes connection"""
David Reiss74421272008-11-07 23:09:31 +0000218 self.status = CLOSED
219 self.socket.close()
220
Bryan Duxbury69720412012-01-03 17:32:30 +0000221
David Reiss74421272008-11-07 23:09:31 +0000222class TNonblockingServer:
223 """Non-blocking server."""
Bryan Duxbury69720412012-01-03 17:32:30 +0000224
225 def __init__(self,
226 processor,
227 lsocket,
228 inputProtocolFactory=None,
229 outputProtocolFactory=None,
230 threads=10):
David Reiss74421272008-11-07 23:09:31 +0000231 self.processor = processor
232 self.socket = lsocket
233 self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
234 self.out_protocol = outputProtocolFactory or self.in_protocol
235 self.threads = int(threads)
236 self.clients = {}
237 self.tasks = Queue.Queue()
238 self._read, self._write = socket.socketpair()
239 self.prepared = False
240
241 def setNumThreads(self, num):
242 """Set the number of worker threads that should be created."""
243 # implement ThreadPool interface
Bryan Duxbury69720412012-01-03 17:32:30 +0000244 assert not self.prepared, "Can't change number of threads after start"
David Reiss74421272008-11-07 23:09:31 +0000245 self.threads = num
246
247 def prepare(self):
248 """Prepares server for serve requests."""
249 self.socket.listen()
250 for _ in xrange(self.threads):
251 thread = Worker(self.tasks)
252 thread.setDaemon(True)
253 thread.start()
254 self.prepared = True
255
256 def wake_up(self):
257 """Wake up main thread.
Bryan Duxbury69720412012-01-03 17:32:30 +0000258
David Reiss74421272008-11-07 23:09:31 +0000259 The server usualy waits in select call in we should terminate one.
260 The simplest way is using socketpair.
Bryan Duxbury69720412012-01-03 17:32:30 +0000261
David Reiss74421272008-11-07 23:09:31 +0000262 Select always wait to read from the first socket of socketpair.
Bryan Duxbury69720412012-01-03 17:32:30 +0000263
David Reiss74421272008-11-07 23:09:31 +0000264 In this case, we can just write anything to the second socket from
Bryan Duxbury69720412012-01-03 17:32:30 +0000265 socketpair.
266 """
David Reiss74421272008-11-07 23:09:31 +0000267 self._write.send('1')
268
269 def _select(self):
270 """Does select on open connections."""
271 readable = [self.socket.handle.fileno(), self._read.fileno()]
272 writable = []
273 for i, connection in self.clients.items():
274 if connection.is_readable():
275 readable.append(connection.fileno())
276 if connection.is_writeable():
277 writable.append(connection.fileno())
278 if connection.is_closed():
279 del self.clients[i]
280 return select.select(readable, writable, readable)
Bryan Duxbury69720412012-01-03 17:32:30 +0000281
David Reiss74421272008-11-07 23:09:31 +0000282 def handle(self):
283 """Handle requests.
Bryan Duxbury69720412012-01-03 17:32:30 +0000284
285 WARNING! You must call prepare() BEFORE calling handle()
David Reiss74421272008-11-07 23:09:31 +0000286 """
287 assert self.prepared, "You have to call prepare before handle"
288 rset, wset, xset = self._select()
289 for readable in rset:
290 if readable == self._read.fileno():
291 # don't care i just need to clean readable flag
Bryan Duxbury69720412012-01-03 17:32:30 +0000292 self._read.recv(1024)
David Reiss74421272008-11-07 23:09:31 +0000293 elif readable == self.socket.handle.fileno():
294 client = self.socket.accept().handle
Bryan Duxbury69720412012-01-03 17:32:30 +0000295 self.clients[client.fileno()] = Connection(client,
296 self.wake_up)
David Reiss74421272008-11-07 23:09:31 +0000297 else:
298 connection = self.clients[readable]
299 connection.read()
300 if connection.status == WAIT_PROCESS:
301 itransport = TTransport.TMemoryBuffer(connection.message)
302 otransport = TTransport.TMemoryBuffer()
303 iprot = self.in_protocol.getProtocol(itransport)
304 oprot = self.out_protocol.getProtocol(otransport)
Bryan Duxbury69720412012-01-03 17:32:30 +0000305 self.tasks.put([self.processor, iprot, oprot,
David Reiss74421272008-11-07 23:09:31 +0000306 otransport, connection.ready])
307 for writeable in wset:
308 self.clients[writeable].write()
309 for oob in xset:
310 self.clients[oob].close()
311 del self.clients[oob]
312
313 def close(self):
314 """Closes the server."""
315 for _ in xrange(self.threads):
316 self.tasks.put([None, None, None, None, None])
317 self.socket.close()
318 self.prepared = False
Bryan Duxbury69720412012-01-03 17:32:30 +0000319
David Reiss74421272008-11-07 23:09:31 +0000320 def serve(self):
321 """Serve forever."""
322 self.prepare()
323 while True:
324 self.handle()