blob: 6152911142be60c2e8bb1a03194544b129631b72 [file] [log] [blame]
Mark Slee89e2bb82007-03-01 00:20:36 +00001#
David Reissea2cba82009-03-30 21:35:00 +00002# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
Mark Slee89e2bb82007-03-01 00:20:36 +000019
David Reissb04df762008-06-10 22:55:38 +000020import logging
Mark Sleec98d0502006-09-06 02:42:25 +000021import sys
David Reiss66536542008-06-10 22:54:49 +000022import os
Mark Sleec98d0502006-09-06 02:42:25 +000023import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +000024import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +000025import Queue
Mark Sleec98d0502006-09-06 02:42:25 +000026
Mark Sleec9676562006-09-05 17:34:52 +000027from thrift.Thrift import TProcessor
28from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +000029from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +000030
31class TServer:
32
Mark Slee794993d2006-09-20 01:56:10 +000033 """Base interface for a server, which must have a serve method."""
Mark Sleec9676562006-09-05 17:34:52 +000034
Aditya Agarwal5c468192007-02-06 01:14:33 +000035 """ 3 constructors for all servers:
36 1) (processor, serverTransport)
37 2) (processor, serverTransport, transportFactory, protocolFactory)
38 3) (processor, serverTransport,
39 inputTransportFactory, outputTransportFactory,
40 inputProtocolFactory, outputProtocolFactory)"""
41 def __init__(self, *args):
Aditya Agarwal5c468192007-02-06 01:14:33 +000042 if (len(args) == 2):
43 self.__initArgs__(args[0], args[1],
44 TTransport.TTransportFactoryBase(),
45 TTransport.TTransportFactoryBase(),
46 TBinaryProtocol.TBinaryProtocolFactory(),
47 TBinaryProtocol.TBinaryProtocolFactory())
48 elif (len(args) == 4):
49 self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
50 elif (len(args) == 6):
51 self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
52
53 def __initArgs__(self, processor, serverTransport,
54 inputTransportFactory, outputTransportFactory,
55 inputProtocolFactory, outputProtocolFactory):
Mark Sleed788b2e2006-09-07 01:26:35 +000056 self.processor = processor
57 self.serverTransport = serverTransport
Aditya Agarwal5c468192007-02-06 01:14:33 +000058 self.inputTransportFactory = inputTransportFactory
59 self.outputTransportFactory = outputTransportFactory
60 self.inputProtocolFactory = inputProtocolFactory
61 self.outputProtocolFactory = outputProtocolFactory
Mark Sleec9676562006-09-05 17:34:52 +000062
Mark Slee794993d2006-09-20 01:56:10 +000063 def serve(self):
Mark Sleec9676562006-09-05 17:34:52 +000064 pass
65
66class TSimpleServer(TServer):
67
68 """Simple single-threaded server that just pumps around one transport."""
69
Aditya Agarwal5c468192007-02-06 01:14:33 +000070 def __init__(self, *args):
71 TServer.__init__(self, *args)
Mark Sleec9676562006-09-05 17:34:52 +000072
Mark Slee794993d2006-09-20 01:56:10 +000073 def serve(self):
Mark Sleed788b2e2006-09-07 01:26:35 +000074 self.serverTransport.listen()
Mark Sleec9676562006-09-05 17:34:52 +000075 while True:
Mark Sleed788b2e2006-09-07 01:26:35 +000076 client = self.serverTransport.accept()
Aditya Agarwal5c468192007-02-06 01:14:33 +000077 itrans = self.inputTransportFactory.getTransport(client)
78 otrans = self.outputTransportFactory.getTransport(client)
79 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Sleefb84b2b2007-02-20 03:37:28 +000080 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Sleec9676562006-09-05 17:34:52 +000081 try:
82 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +000083 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +000084 except TTransport.TTransportException, tx:
85 pass
Mark Sleec9676562006-09-05 17:34:52 +000086 except Exception, x:
David Reissb04df762008-06-10 22:55:38 +000087 logging.exception(x)
Mark Sleed788b2e2006-09-07 01:26:35 +000088
Mark Slee4ac459f2006-10-25 21:39:01 +000089 itrans.close()
90 otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000091
92class TThreadedServer(TServer):
93
94 """Threaded server that spawns a new thread per each connection."""
95
Aditya Agarwal5c468192007-02-06 01:14:33 +000096 def __init__(self, *args):
97 TServer.__init__(self, *args)
Mark Slee4f0fed62006-10-02 17:50:08 +000098
99 def serve(self):
100 self.serverTransport.listen()
101 while True:
102 try:
103 client = self.serverTransport.accept()
104 t = threading.Thread(target = self.handle, args=(client,))
105 t.start()
Mark Slee5299a952007-10-05 00:13:24 +0000106 except KeyboardInterrupt:
107 raise
Mark Slee4f0fed62006-10-02 17:50:08 +0000108 except Exception, x:
David Reissb04df762008-06-10 22:55:38 +0000109 logging.exception(x)
Mark Slee4f0fed62006-10-02 17:50:08 +0000110
111 def handle(self, client):
Aditya Agarwal5c468192007-02-06 01:14:33 +0000112 itrans = self.inputTransportFactory.getTransport(client)
113 otrans = self.outputTransportFactory.getTransport(client)
114 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Sleefb84b2b2007-02-20 03:37:28 +0000115 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Slee4f0fed62006-10-02 17:50:08 +0000116 try:
117 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000118 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +0000119 except TTransport.TTransportException, tx:
120 pass
121 except Exception, x:
David Reissb04df762008-06-10 22:55:38 +0000122 logging.exception(x)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000123
Mark Slee4ac459f2006-10-25 21:39:01 +0000124 itrans.close()
125 otrans.close()
126
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000127class TThreadPoolServer(TServer):
128
129 """Server with a fixed size pool of threads which service requests."""
130
Aditya Agarwal5c468192007-02-06 01:14:33 +0000131 def __init__(self, *args):
132 TServer.__init__(self, *args)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000133 self.clients = Queue.Queue()
134 self.threads = 10
135
Mark Slee4ce787f2006-10-24 18:54:06 +0000136 def setNumThreads(self, num):
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000137 """Set the number of worker threads that should be created"""
138 self.threads = num
139
140 def serveThread(self):
141 """Loop around getting clients from the shared queue and process them."""
142 while True:
143 try:
Mark Slee9a695ba2006-10-24 18:55:36 +0000144 client = self.clients.get()
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000145 self.serveClient(client)
146 except Exception, x:
David Reissb04df762008-06-10 22:55:38 +0000147 logging.exception(x)
David Reiss0c90f6f2008-02-06 22:18:40 +0000148
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000149 def serveClient(self, client):
150 """Process input/output from a client for as long as possible"""
Aditya Agarwal5c468192007-02-06 01:14:33 +0000151 itrans = self.inputTransportFactory.getTransport(client)
152 otrans = self.outputTransportFactory.getTransport(client)
153 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Slee04342d82007-02-20 03:41:35 +0000154 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000155 try:
156 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000157 self.processor.process(iprot, oprot)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000158 except TTransport.TTransportException, tx:
159 pass
160 except Exception, x:
David Reissb04df762008-06-10 22:55:38 +0000161 logging.exception(x)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000162
Mark Slee4ac459f2006-10-25 21:39:01 +0000163 itrans.close()
164 otrans.close()
165
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000166 def serve(self):
167 """Start a fixed number of worker threads and put client into a queue"""
168 for i in range(self.threads):
169 try:
170 t = threading.Thread(target = self.serveThread)
171 t.start()
172 except Exception, x:
David Reissb04df762008-06-10 22:55:38 +0000173 logging.exception(x)
David Reiss0c90f6f2008-02-06 22:18:40 +0000174
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000175 # Pump the socket for clients
176 self.serverTransport.listen()
177 while True:
178 try:
179 client = self.serverTransport.accept()
180 self.clients.put(client)
181 except Exception, x:
David Reissb04df762008-06-10 22:55:38 +0000182 logging.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000183
184
David Reiss66536542008-06-10 22:54:49 +0000185class TForkingServer(TServer):
186
187 """A Thrift server that forks a new process for each request"""
188 """
189 This is more scalable than the threaded server as it does not cause
190 GIL contention.
191
192 Note that this has different semantics from the threading server.
193 Specifically, updates to shared variables will no longer be shared.
194 It will also not work on windows.
195
196 This code is heavily inspired by SocketServer.ForkingMixIn in the
197 Python stdlib.
198 """
199
200 def __init__(self, *args):
201 TServer.__init__(self, *args)
202 self.children = []
203
204 def serve(self):
David Reissbcaa2ad2008-06-10 22:55:26 +0000205 def try_close(file):
206 try:
207 file.close()
208 except IOError, e:
David Reissb04df762008-06-10 22:55:38 +0000209 logging.warning(e, exc_info=True)
David Reissbcaa2ad2008-06-10 22:55:26 +0000210
211
David Reiss66536542008-06-10 22:54:49 +0000212 self.serverTransport.listen()
213 while True:
214 client = self.serverTransport.accept()
215 try:
216 pid = os.fork()
217
218 if pid: # parent
219 # add before collect, otherwise you race w/ waitpid
220 self.children.append(pid)
221 self.collect_children()
222
David Reissbcaa2ad2008-06-10 22:55:26 +0000223 # Parent must close socket or the connection may not get
224 # closed promptly
225 itrans = self.inputTransportFactory.getTransport(client)
226 otrans = self.outputTransportFactory.getTransport(client)
227 try_close(itrans)
228 try_close(otrans)
David Reiss66536542008-06-10 22:54:49 +0000229 else:
230 itrans = self.inputTransportFactory.getTransport(client)
231 otrans = self.outputTransportFactory.getTransport(client)
232
233 iprot = self.inputProtocolFactory.getProtocol(itrans)
234 oprot = self.outputProtocolFactory.getProtocol(otrans)
235
David Reissbcaa2ad2008-06-10 22:55:26 +0000236 ecode = 0
David Reiss66536542008-06-10 22:54:49 +0000237 try:
Kevin Clark1e0744d2008-06-24 20:46:32 +0000238 try:
239 while True:
240 self.processor.process(iprot, oprot)
241 except TTransport.TTransportException, tx:
242 pass
243 except Exception, e:
244 logging.exception(e)
245 ecode = 1
David Reissbcaa2ad2008-06-10 22:55:26 +0000246 finally:
247 try_close(itrans)
248 try_close(otrans)
David Reiss66536542008-06-10 22:54:49 +0000249
David Reissbcaa2ad2008-06-10 22:55:26 +0000250 os._exit(ecode)
David Reiss66536542008-06-10 22:54:49 +0000251
252 except TTransport.TTransportException, tx:
253 pass
254 except Exception, x:
David Reissb04df762008-06-10 22:55:38 +0000255 logging.exception(x)
David Reiss66536542008-06-10 22:54:49 +0000256
257
David Reiss66536542008-06-10 22:54:49 +0000258 def collect_children(self):
259 while self.children:
260 try:
261 pid, status = os.waitpid(0, os.WNOHANG)
262 except os.error:
263 pid = None
264
265 if pid:
266 self.children.remove(pid)
267 else:
268 break
269
270