blob: ea348a0b61593a3186a5fd5c1c7c53e012cc4d60 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
"""Implementation of a non-blocking server.

The main idea of the server is that requests are received and answers are
sent only from the main thread.

It also makes the thread pool work in terms of tasks, not connections.
"""
26import threading
27import socket
28import Queue
29import select
30import struct
31import logging
32
33from thrift.transport import TTransport
34from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
35
36__all__ = ['TNonblockingServer']
37
class Worker(threading.Thread):
    """Worker is a small helper thread that processes queued requests."""

    def __init__(self, queue):
        """Remember the task queue this worker consumes from.

        Each task is a 5-item sequence:
        (processor, iprot, oprot, otrans, callback).
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Process tasks from the queue, stop if processor is None.

        For every task processor.process(iprot, oprot) is called.  On
        success callback(True, otrans.getvalue()) is invoked; if processing
        raises, the exception is logged and callback(False, '') is invoked.

        The callback is invoked exactly once per task.  Processing and
        result delivery are guarded separately so that a callback which
        raises after a successful request is not called a second time with
        a failure result, and so that it cannot terminate the worker.
        """
        while True:
            processor, iprot, oprot, otrans, callback = self.queue.get()
            if processor is None:
                # Sentinel task: the worker is asked to shut down.
                break
            try:
                processor.process(iprot, oprot)
                all_ok, answer = True, otrans.getvalue()
            except Exception:
                logging.exception("Exception while processing request")
                all_ok, answer = False, ''
            try:
                callback(all_ok, answer)
            except Exception:
                # Keep the worker alive; the failure is only reported.
                logging.exception("Exception while reporting request result")
56
# Connection states, numbered 0..4 in this order; their meaning is described
# in the Connection class docstring.
WAIT_LEN, WAIT_MESSAGE, WAIT_PROCESS, SEND_ANSWER, CLOSED = range(5)
62
def locked(func):
    """Decorator which holds self.lock for the duration of the call.

    The decorated method's instance must expose a ``lock`` attribute with
    acquire()/release().  The lock is released even if the method raises.
    The wrapper keeps the name and docstring of the wrapped method.
    """
    # Local import: functools is not among the module-level imports.
    import functools

    @functools.wraps(func)
    def nested(self, *args, **kwargs):
        self.lock.acquire()
        try:
            return func(self, *args, **kwargs)
        finally:
            self.lock.release()
    return nested
72
def socket_exception(func):
    """Decorator which closes the object on socket.error.

    If the wrapped method raises socket.error, self.close() is called, the
    error is swallowed and the wrapper returns None.  Any other exception
    propagates unchanged.  The wrapper keeps the name and docstring of the
    wrapped method (it is applied to both read and write methods).
    """
    # Local import: functools is not among the module-level imports.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except socket.error:
            self.close()
    return wrapper
81
class Connection:
    """Client connection driven as a small state machine.

    It can be in one of these states:
        WAIT_LEN --- connection is reading the 4-byte request length.
        WAIT_MESSAGE --- connection is reading the request body.
        WAIT_PROCESS --- connection has just read the whole request and
                         waits for ready() to be called.
        SEND_ANSWER --- connection is sending the answer string (including
                        the length of the answer).
        CLOSED --- socket was closed and connection should be deleted.
    """
    def __init__(self, new_socket, wake_up):
        """Take ownership of new_socket and switch it to non-blocking mode.

        wake_up is a callable without arguments; ready() invokes it after
        it has changed the connection state.
        """
        self.socket = new_socket
        self.socket.setblocking(False)
        self.status = WAIT_LEN
        # Expected length of the request body (valid in WAIT_MESSAGE).
        self.len = 0
        # Bytes read so far (while reading) or still to be sent (SEND_ANSWER).
        self.message = ''
        self.lock = threading.Lock()
        self.wake_up = wake_up

    def _read_len(self):
        """Reads the length of the request.

        The 4-byte big-endian length prefix may arrive in several pieces,
        so the bytes read so far are accumulated in self.message.  Once all
        four bytes are there, a positive length switches the connection to
        WAIT_MESSAGE; a negative or zero length closes it.
        """
        read = self.socket.recv(4 - len(self.message))
        if not read:
            # If we read 0 bytes and self.message is empty, the client has
            # simply closed the connection; otherwise the prefix was cut.
            if self.message:
                logging.error("can't read frame size from socket")
            self.close()
            return
        self.message += read
        if len(self.message) == 4:
            self.len, = struct.unpack('!i', self.message)
            if self.len < 0:
                logging.error("negative frame size, it seems client"
                              " doesn't use FramedTransport")
                self.close()
            elif self.len == 0:
                logging.error("empty frame, it's really strange")
                self.close()
            else:
                self.message = ''
                self.status = WAIT_MESSAGE

    @socket_exception
    def read(self):
        """Reads data from the socket and switches state.

        Must only be called in WAIT_LEN or WAIT_MESSAGE.  When the whole
        request body has been read the state becomes WAIT_PROCESS.
        """
        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
        if self.status == WAIT_LEN:
            self._read_len()
            # go back to the main loop here for simplicity instead of
            # falling through, even though there is a good chance that
            # the message is already available
        elif self.status == WAIT_MESSAGE:
            read = self.socket.recv(self.len - len(self.message))
            if not read:
                logging.error("can't read frame from socket (get %d of %d bytes)" %
                              (len(self.message), self.len))
                self.close()
                return
            self.message += read
            if len(self.message) == self.len:
                self.status = WAIT_PROCESS

    @socket_exception
    def write(self):
        """Writes pending data to the socket and switches state.

        Must only be called in SEND_ANSWER.  After a partial send the
        unsent tail is kept in self.message; once everything has been sent
        the connection goes back to WAIT_LEN.
        """
        assert self.status == SEND_ANSWER
        sent = self.socket.send(self.message)
        if sent == len(self.message):
            self.status = WAIT_LEN
            self.message = ''
            self.len = 0
        else:
            self.message = self.message[sent:]

    @locked
    def ready(self, all_ok, message):
        """Callback function for switching state and waking up main thread.

        This function is the only function which can be called
        asynchronously.

        The ready can switch Connection to three states:
            WAIT_LEN if request was oneway.
            SEND_ANSWER if request was processed in normal way.
            CLOSED if request throws unexpected exception.

        In every case wake_up() is called afterwards.
        """
        assert self.status == WAIT_PROCESS
        if not all_ok:
            self.close()
            self.wake_up()
            return
        # self.len is an integer everywhere else, so reset it to 0.
        self.len = 0
        if len(message) == 0:
            # it was a oneway request, do not write answer
            self.message = ''
            self.status = WAIT_LEN
        else:
            self.message = struct.pack('!i', len(message)) + message
            self.status = SEND_ANSWER
        self.wake_up()

    @locked
    def is_writeable(self):
        "Returns True if connection should be added to write list of select."
        return self.status == SEND_ANSWER

    # it's not necessary, but...
    @locked
    def is_readable(self):
        "Returns True if connection should be added to read list of select."
        return self.status in (WAIT_LEN, WAIT_MESSAGE)

    @locked
    def is_closed(self):
        "Returns True if connection is closed."
        return self.status == CLOSED

    def fileno(self):
        "Returns the file descriptor of the associated socket."
        return self.socket.fileno()

    def close(self):
        "Marks the connection as CLOSED and closes the socket."
        self.status = CLOSED
        self.socket.close()
214
class TNonblockingServer:
    """Non-blocking server.

    All socket I/O happens in the thread that calls handle()/serve();
    complete requests are handed to a pool of Worker threads through a
    task queue.
    """
    def __init__(self, processor, lsocket, inputProtocolFactory=None,
                 outputProtocolFactory=None, threads=10):
        """Create a server.

        processor --- object whose process(iprot, oprot) handles a request.
        lsocket --- listening server socket.
        inputProtocolFactory --- defaults to TBinaryProtocolFactory().
        outputProtocolFactory --- defaults to the input protocol factory.
        threads --- number of worker threads started by prepare().
        """
        self.processor = processor
        self.socket = lsocket
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
        self.out_protocol = outputProtocolFactory or self.in_protocol
        self.threads = int(threads)
        # Maps a client socket file descriptor to its Connection.
        self.clients = {}
        self.tasks = Queue.Queue()
        # Socket pair used by wake_up() to interrupt select().
        self._read, self._write = socket.socketpair()
        self.prepared = False

    def setNumThreads(self, num):
        """Set the number of worker threads that should be created."""
        # implement ThreadPool interface
        assert not self.prepared, "You can't change number of threads for working server"
        # Normalise the same way __init__ does.
        self.threads = int(num)

    def prepare(self):
        """Prepares server for serving requests.

        Starts listening and starts the worker threads.  Calling it again
        while the server is already prepared does nothing, so serve() can
        be used after an explicit prepare() without listening twice or
        starting a second set of workers.
        """
        if self.prepared:
            return
        self.socket.listen()
        for _ in xrange(self.threads):
            thread = Worker(self.tasks)
            thread.setDaemon(True)
            thread.start()
        self.prepared = True

    def wake_up(self):
        """Wake up main thread.

        The server usually waits in a select call, and we need to
        interrupt it.  The simplest way is using a socketpair.

        Select always waits to read from the first socket of the
        socketpair, so it is enough to write anything to the second one.
        """
        self._write.send('1')

    def _select(self):
        """Does select on open connections.

        Closed connections are removed from self.clients on the way.
        Returns the (readable, writable, exceptional) lists of select.
        """
        readable = [self.socket.handle.fileno(), self._read.fileno()]
        writable = []
        # Iterate over a snapshot: entries are deleted inside the loop.
        for i, connection in list(self.clients.items()):
            if connection.is_readable():
                readable.append(connection.fileno())
            if connection.is_writeable():
                writable.append(connection.fileno())
            if connection.is_closed():
                del self.clients[i]
        return select.select(readable, writable, readable)

    def handle(self):
        """Handle requests.

        Performs one select() round: accepts new clients, reads from and
        writes to the ready connections, and queues every completely read
        request for the worker threads.

        WARNING! You must call prepare BEFORE calling handle.
        """
        assert self.prepared, "You have to call prepare before handle"
        rset, wset, xset = self._select()
        for readable in rset:
            if readable == self._read.fileno():
                # don't care i just need to clean readable flag
                self._read.recv(1024)
            elif readable == self.socket.handle.fileno():
                client = self.socket.accept().handle
                self.clients[client.fileno()] = Connection(client, self.wake_up)
            else:
                connection = self.clients[readable]
                connection.read()
                if connection.status == WAIT_PROCESS:
                    itransport = TTransport.TMemoryBuffer(connection.message)
                    otransport = TTransport.TMemoryBuffer()
                    iprot = self.in_protocol.getProtocol(itransport)
                    oprot = self.out_protocol.getProtocol(otransport)
                    self.tasks.put([self.processor, iprot, oprot,
                                    otransport, connection.ready])
        for writeable in wset:
            self.clients[writeable].write()
        for oob in xset:
            self.clients[oob].close()
            del self.clients[oob]

    def close(self):
        """Closes the server.

        Queues one stop task (processor None) per worker thread and closes
        the listening socket.
        """
        for _ in xrange(self.threads):
            self.tasks.put([None, None, None, None, None])
        self.socket.close()
        self.prepared = False

    def serve(self):
        """Serve forever."""
        self.prepare()
        while True:
            self.handle()
310 self.handle()