blob: d1b5e0907581a6ab3d116877793753797b4fbb4a [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Mark Sleeffcddd62006-09-06 20:37:03 +00007package com.facebook.thrift.server;
8
import com.facebook.thrift.TException;
import com.facebook.thrift.TProcessor;
import com.facebook.thrift.TProcessorFactory;
import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.protocol.TProtocolFactory;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TTransportException;
import com.facebook.thrift.transport.TTransportFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
25
26
27/**
28 * Server which uses Java's built in ThreadPool management to spawn off
Mark Slee19c97772008-01-10 19:57:47 +000029 * a worker pool that
Mark Sleeffcddd62006-09-06 20:37:03 +000030 *
31 * @author Mark Slee <mcslee@facebook.com>
32 */
33public class TThreadPoolServer extends TServer {
34
Mark Sleeffcddd62006-09-06 20:37:03 +000035 // Executor service for handling client connections
36 private ExecutorService executorService_;
37
Mark Slee0502e612007-11-03 05:30:32 +000038 // Flag for stopping the server
39 private volatile boolean stopped_;
40
41 // Server options
42 private Options options_;
43
Mark Sleeffcddd62006-09-06 20:37:03 +000044 // Customizable server options
Mark Slee456b7a82006-10-25 20:53:37 +000045 public static class Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000046 public int minWorkerThreads = 5;
47 public int maxWorkerThreads = Integer.MAX_VALUE;
Mark Slee0502e612007-11-03 05:30:32 +000048 public int stopTimeoutVal = 60;
49 public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
Mark Sleeffcddd62006-09-06 20:37:03 +000050 }
51
52 public TThreadPoolServer(TProcessor processor,
53 TServerTransport serverTransport) {
Mark Slee19c97772008-01-10 19:57:47 +000054 this(processor, serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +000055 new TTransportFactory(), new TTransportFactory(),
Mark Slee737ce022008-01-15 02:59:12 +000056 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
Aditya Agarwal5a429582007-02-06 02:51:15 +000057 }
58
Mark Slee448849d2007-05-31 01:30:22 +000059 public TThreadPoolServer(TProcessorFactory processorFactory,
Mark Slee737ce022008-01-15 02:59:12 +000060 TServerTransport serverTransport) {
Mark Slee19c97772008-01-10 19:57:47 +000061 this(processorFactory, serverTransport,
Mark Slee448849d2007-05-31 01:30:22 +000062 new TTransportFactory(), new TTransportFactory(),
Mark Slee737ce022008-01-15 02:59:12 +000063 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
Mark Slee448849d2007-05-31 01:30:22 +000064 }
65
Aditya Agarwal5a429582007-02-06 02:51:15 +000066 public TThreadPoolServer(TProcessor processor,
67 TServerTransport serverTransport,
Mark Slee808454e2007-06-20 21:51:57 +000068 TProtocolFactory protocolFactory) {
69 this(processor, serverTransport,
70 new TTransportFactory(), new TTransportFactory(),
Mark Slee737ce022008-01-15 02:59:12 +000071 protocolFactory, protocolFactory);
Mark Slee808454e2007-06-20 21:51:57 +000072 }
73
74 public TThreadPoolServer(TProcessor processor,
75 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +000076 TTransportFactory transportFactory,
77 TProtocolFactory protocolFactory) {
Mark Slee19c97772008-01-10 19:57:47 +000078 this(processor, serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +000079 transportFactory, transportFactory,
Mark Slee737ce022008-01-15 02:59:12 +000080 protocolFactory, protocolFactory);
Mark Sleeffcddd62006-09-06 20:37:03 +000081 }
Mark Slee448849d2007-05-31 01:30:22 +000082
83 public TThreadPoolServer(TProcessorFactory processorFactory,
Mark Slee737ce022008-01-15 02:59:12 +000084 TServerTransport serverTransport,
85 TTransportFactory transportFactory,
86 TProtocolFactory protocolFactory) {
Mark Slee19c97772008-01-10 19:57:47 +000087 this(processorFactory, serverTransport,
Mark Slee448849d2007-05-31 01:30:22 +000088 transportFactory, transportFactory,
Mark Slee737ce022008-01-15 02:59:12 +000089 protocolFactory, protocolFactory);
Mark Slee448849d2007-05-31 01:30:22 +000090 }
91
Mark Slee737ce022008-01-15 02:59:12 +000092 public TThreadPoolServer(TProcessor processor,
93 TServerTransport serverTransport,
94 TTransportFactory inputTransportFactory,
95 TTransportFactory outputTransportFactory,
96 TProtocolFactory inputProtocolFactory,
97 TProtocolFactory outputProtocolFactory) {
98 this(new TProcessorFactory(processor), serverTransport,
99 inputTransportFactory, outputTransportFactory,
100 inputProtocolFactory, outputProtocolFactory);
101 }
102
103 public TThreadPoolServer(TProcessorFactory processorFactory,
104 TServerTransport serverTransport,
105 TTransportFactory inputTransportFactory,
106 TTransportFactory outputTransportFactory,
107 TProtocolFactory inputProtocolFactory,
108 TProtocolFactory outputProtocolFactory) {
109 super(processorFactory, serverTransport,
110 inputTransportFactory, outputTransportFactory,
111 inputProtocolFactory, outputProtocolFactory);
112 options_ = new Options();
113 executorService_ = Executors.newCachedThreadPool();
114 }
Mark Slee19c97772008-01-10 19:57:47 +0000115
Mark Sleeffcddd62006-09-06 20:37:03 +0000116 public TThreadPoolServer(TProcessor processor,
Mark Slee448849d2007-05-31 01:30:22 +0000117 TServerTransport serverTransport,
118 TTransportFactory inputTransportFactory,
119 TTransportFactory outputTransportFactory,
120 TProtocolFactory inputProtocolFactory,
121 TProtocolFactory outputProtocolFactory,
122 Options options) {
123 this(new TProcessorFactory(processor), serverTransport,
124 inputTransportFactory, outputTransportFactory,
125 inputProtocolFactory, outputProtocolFactory,
126 options);
127 }
Mark Slee19c97772008-01-10 19:57:47 +0000128
Mark Slee448849d2007-05-31 01:30:22 +0000129 public TThreadPoolServer(TProcessorFactory processorFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +0000130 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +0000131 TTransportFactory inputTransportFactory,
132 TTransportFactory outputTransportFactory,
133 TProtocolFactory inputProtocolFactory,
134 TProtocolFactory outputProtocolFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +0000135 Options options) {
Mark Slee19c97772008-01-10 19:57:47 +0000136 super(processorFactory, serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +0000137 inputTransportFactory, outputTransportFactory,
138 inputProtocolFactory, outputProtocolFactory);
Mark Slee456b7a82006-10-25 20:53:37 +0000139
Mark Sleeffcddd62006-09-06 20:37:03 +0000140 executorService_ = null;
141
Mark Slee19c97772008-01-10 19:57:47 +0000142 SynchronousQueue<Runnable> executorQueue =
143 new SynchronousQueue<Runnable>();
Mark Sleeffcddd62006-09-06 20:37:03 +0000144
145 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
146 options.maxWorkerThreads,
147 60,
148 TimeUnit.SECONDS,
149 executorQueue);
Mark Slee0502e612007-11-03 05:30:32 +0000150
151 options_ = options;
Mark Sleeffcddd62006-09-06 20:37:03 +0000152 }
153
154
Mark Slee4e755ca2006-09-12 00:46:08 +0000155 public void serve() {
Mark Sleeffcddd62006-09-06 20:37:03 +0000156 try {
157 serverTransport_.listen();
158 } catch (TTransportException ttx) {
159 ttx.printStackTrace();
160 return;
161 }
162
Mark Slee0502e612007-11-03 05:30:32 +0000163 stopped_ = false;
164 while (!stopped_) {
Mark Slee4e755ca2006-09-12 00:46:08 +0000165 int failureCount = 0;
Mark Sleeffcddd62006-09-06 20:37:03 +0000166 try {
167 TTransport client = serverTransport_.accept();
168 WorkerProcess wp = new WorkerProcess(client);
169 executorService_.execute(wp);
170 } catch (TTransportException ttx) {
Mark Slee0502e612007-11-03 05:30:32 +0000171 if (!stopped_) {
172 ++failureCount;
173 ttx.printStackTrace();
174 }
Mark Slee19c97772008-01-10 19:57:47 +0000175 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000176 }
Mark Slee0502e612007-11-03 05:30:32 +0000177
178 executorService_.shutdown();
179 try {
180 executorService_.awaitTermination(options_.stopTimeoutVal,
181 options_.stopTimeoutUnit);
182 } catch (InterruptedException ix) {
183 // Ignore and more on
184 }
185 }
186
187 public void stop() {
188 stopped_ = true;
189 serverTransport_.interrupt();
Mark Sleeffcddd62006-09-06 20:37:03 +0000190 }
191
192 private class WorkerProcess implements Runnable {
193
194 /**
195 * Client that this services.
196 */
197 private TTransport client_;
198
199 /**
200 * Default constructor.
201 *
202 * @param client Transport to process
203 */
204 private WorkerProcess(TTransport client) {
205 client_ = client;
206 }
207
208 /**
209 * Loops on processing a client forever
210 */
211 public void run() {
Mark Slee448849d2007-05-31 01:30:22 +0000212 TProcessor processor = null;
Aditya Agarwal5a429582007-02-06 02:51:15 +0000213 TTransport inputTransport = null;
214 TTransport outputTransport = null;
215 TProtocol inputProtocol = null;
216 TProtocol outputProtocol = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000217 try {
Mark Slee448849d2007-05-31 01:30:22 +0000218 processor = processorFactory_.getProcessor(client_);
Aditya Agarwal5a429582007-02-06 02:51:15 +0000219 inputTransport = inputTransportFactory_.getTransport(client_);
220 outputTransport = outputTransportFactory_.getTransport(client_);
221 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
222 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
Mark Slee448849d2007-05-31 01:30:22 +0000223 while (processor.process(inputProtocol, outputProtocol)) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000224 } catch (TTransportException ttx) {
225 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000226 } catch (TException tx) {
227 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000228 } catch (Exception x) {
229 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000230 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000231
Aditya Agarwal5a429582007-02-06 02:51:15 +0000232 if (inputTransport != null) {
233 inputTransport.close();
234 }
235
236 if (outputTransport != null) {
237 outputTransport.close();
Mark Sleed788b2e2006-09-07 01:26:35 +0000238 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000239 }
240 }
241}