blob: 2a8d5f40be36b1e5f3d786312b2384beb4c81ae1 [file] [log] [blame]
Mark Slee89e2bb82007-03-01 00:20:36 +00001#!/usr/bin/env python
2#
3# Copyright (c) 2006- Facebook
4# Distributed under the Thrift Software License
5#
6# See accompanying file LICENSE or visit the Thrift site at:
7# http://developers.facebook.com/thrift/
8
Mark Sleec98d0502006-09-06 02:42:25 +00009import sys
10import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +000011import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +000012import Queue
Mark Sleec98d0502006-09-06 02:42:25 +000013
Mark Sleec9676562006-09-05 17:34:52 +000014from thrift.Thrift import TProcessor
15from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +000016from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +000017
18class TServer:
19
Mark Slee794993d2006-09-20 01:56:10 +000020 """Base interface for a server, which must have a serve method."""
Mark Sleec9676562006-09-05 17:34:52 +000021
Aditya Agarwal5c468192007-02-06 01:14:33 +000022 """ 3 constructors for all servers:
23 1) (processor, serverTransport)
24 2) (processor, serverTransport, transportFactory, protocolFactory)
25 3) (processor, serverTransport,
26 inputTransportFactory, outputTransportFactory,
27 inputProtocolFactory, outputProtocolFactory)"""
28 def __init__(self, *args):
Aditya Agarwal5c468192007-02-06 01:14:33 +000029 if (len(args) == 2):
30 self.__initArgs__(args[0], args[1],
31 TTransport.TTransportFactoryBase(),
32 TTransport.TTransportFactoryBase(),
33 TBinaryProtocol.TBinaryProtocolFactory(),
34 TBinaryProtocol.TBinaryProtocolFactory())
35 elif (len(args) == 4):
36 self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
37 elif (len(args) == 6):
38 self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
39
40 def __initArgs__(self, processor, serverTransport,
41 inputTransportFactory, outputTransportFactory,
42 inputProtocolFactory, outputProtocolFactory):
Mark Sleed788b2e2006-09-07 01:26:35 +000043 self.processor = processor
44 self.serverTransport = serverTransport
Aditya Agarwal5c468192007-02-06 01:14:33 +000045 self.inputTransportFactory = inputTransportFactory
46 self.outputTransportFactory = outputTransportFactory
47 self.inputProtocolFactory = inputProtocolFactory
48 self.outputProtocolFactory = outputProtocolFactory
Mark Sleec9676562006-09-05 17:34:52 +000049
Mark Slee794993d2006-09-20 01:56:10 +000050 def serve(self):
Mark Sleec9676562006-09-05 17:34:52 +000051 pass
52
53class TSimpleServer(TServer):
54
55 """Simple single-threaded server that just pumps around one transport."""
56
Aditya Agarwal5c468192007-02-06 01:14:33 +000057 def __init__(self, *args):
58 TServer.__init__(self, *args)
Mark Sleec9676562006-09-05 17:34:52 +000059
Mark Slee794993d2006-09-20 01:56:10 +000060 def serve(self):
Mark Sleed788b2e2006-09-07 01:26:35 +000061 self.serverTransport.listen()
Mark Sleec9676562006-09-05 17:34:52 +000062 while True:
Mark Sleed788b2e2006-09-07 01:26:35 +000063 client = self.serverTransport.accept()
Aditya Agarwal5c468192007-02-06 01:14:33 +000064 itrans = self.inputTransportFactory.getTransport(client)
65 otrans = self.outputTransportFactory.getTransport(client)
66 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Sleefb84b2b2007-02-20 03:37:28 +000067 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Sleec9676562006-09-05 17:34:52 +000068 try:
69 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +000070 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +000071 except TTransport.TTransportException, tx:
72 pass
Mark Sleec9676562006-09-05 17:34:52 +000073 except Exception, x:
Mark Sleec98d0502006-09-06 02:42:25 +000074 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
Mark Sleed788b2e2006-09-07 01:26:35 +000075
Mark Slee4ac459f2006-10-25 21:39:01 +000076 itrans.close()
77 otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000078
79class TThreadedServer(TServer):
80
81 """Threaded server that spawns a new thread per each connection."""
82
Aditya Agarwal5c468192007-02-06 01:14:33 +000083 def __init__(self, *args):
84 TServer.__init__(self, *args)
Mark Slee4f0fed62006-10-02 17:50:08 +000085
86 def serve(self):
87 self.serverTransport.listen()
88 while True:
89 try:
90 client = self.serverTransport.accept()
91 t = threading.Thread(target = self.handle, args=(client,))
92 t.start()
93 except Exception, x:
94 print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
95
96 def handle(self, client):
Aditya Agarwal5c468192007-02-06 01:14:33 +000097 itrans = self.inputTransportFactory.getTransport(client)
98 otrans = self.outputTransportFactory.getTransport(client)
99 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Sleefb84b2b2007-02-20 03:37:28 +0000100 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Slee4f0fed62006-10-02 17:50:08 +0000101 try:
102 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000103 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +0000104 except TTransport.TTransportException, tx:
105 pass
106 except Exception, x:
107 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000108
Mark Slee4ac459f2006-10-25 21:39:01 +0000109 itrans.close()
110 otrans.close()
111
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000112class TThreadPoolServer(TServer):
113
114 """Server with a fixed size pool of threads which service requests."""
115
Aditya Agarwal5c468192007-02-06 01:14:33 +0000116 def __init__(self, *args):
117 TServer.__init__(self, *args)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000118 self.clients = Queue.Queue()
119 self.threads = 10
120
Mark Slee4ce787f2006-10-24 18:54:06 +0000121 def setNumThreads(self, num):
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000122 """Set the number of worker threads that should be created"""
123 self.threads = num
124
125 def serveThread(self):
126 """Loop around getting clients from the shared queue and process them."""
127 while True:
128 try:
Mark Slee9a695ba2006-10-24 18:55:36 +0000129 client = self.clients.get()
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000130 self.serveClient(client)
131 except Exception, x:
132 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
133
134 def serveClient(self, client):
135 """Process input/output from a client for as long as possible"""
Aditya Agarwal5c468192007-02-06 01:14:33 +0000136 itrans = self.inputTransportFactory.getTransport(client)
137 otrans = self.outputTransportFactory.getTransport(client)
138 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Slee04342d82007-02-20 03:41:35 +0000139 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000140 try:
141 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000142 self.processor.process(iprot, oprot)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000143 except TTransport.TTransportException, tx:
144 pass
145 except Exception, x:
146 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
147
Mark Slee4ac459f2006-10-25 21:39:01 +0000148 itrans.close()
149 otrans.close()
150
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000151 def serve(self):
152 """Start a fixed number of worker threads and put client into a queue"""
153 for i in range(self.threads):
154 try:
155 t = threading.Thread(target = self.serveThread)
156 t.start()
157 except Exception, x:
158 print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
159
160 # Pump the socket for clients
161 self.serverTransport.listen()
162 while True:
163 try:
164 client = self.serverTransport.accept()
165 self.clients.put(client)
166 except Exception, x:
167 print '%s, %s, %s' % (type(x), x, traceback.format_exc())