blob: 94eb5a6b8ab816ac016c1b485d16f98100b40bd2 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Mark Sleeffcddd62006-09-06 20:37:03 +00007package com.facebook.thrift.server;
8
9import com.facebook.thrift.TException;
10import com.facebook.thrift.TProcessor;
Mark Slee448849d2007-05-31 01:30:22 +000011import com.facebook.thrift.TProcessorFactory;
Mark Slee456b7a82006-10-25 20:53:37 +000012import com.facebook.thrift.protocol.TProtocol;
13import com.facebook.thrift.protocol.TProtocolFactory;
Aditya Agarwal5a429582007-02-06 02:51:15 +000014import com.facebook.thrift.protocol.TBinaryProtocol;
Mark Sleeffcddd62006-09-06 20:37:03 +000015import com.facebook.thrift.transport.TServerTransport;
16import com.facebook.thrift.transport.TTransport;
17import com.facebook.thrift.transport.TTransportException;
Mark Sleed788b2e2006-09-07 01:26:35 +000018import com.facebook.thrift.transport.TTransportFactory;
Mark Sleeffcddd62006-09-06 20:37:03 +000019
20import java.util.concurrent.ExecutorService;
21import java.util.concurrent.LinkedBlockingQueue;
22import java.util.concurrent.ThreadPoolExecutor;
23import java.util.concurrent.TimeUnit;
24
25
26/**
27 * Server which uses Java's built in ThreadPool management to spawn off
28 * a worker pool that
29 *
30 * @author Mark Slee <mcslee@facebook.com>
31 */
32public class TThreadPoolServer extends TServer {
33
Mark Sleeffcddd62006-09-06 20:37:03 +000034 // Executor service for handling client connections
35 private ExecutorService executorService_;
36
Mark Slee0502e612007-11-03 05:30:32 +000037 // Flag for stopping the server
38 private volatile boolean stopped_;
39
40 // Server options
41 private Options options_;
42
Mark Sleeffcddd62006-09-06 20:37:03 +000043 // Customizable server options
Mark Slee456b7a82006-10-25 20:53:37 +000044 public static class Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000045 public int minWorkerThreads = 5;
46 public int maxWorkerThreads = Integer.MAX_VALUE;
Mark Slee0502e612007-11-03 05:30:32 +000047 public int stopTimeoutVal = 60;
48 public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
Mark Sleeffcddd62006-09-06 20:37:03 +000049 }
50
51 public TThreadPoolServer(TProcessor processor,
52 TServerTransport serverTransport) {
Aditya Agarwal5a429582007-02-06 02:51:15 +000053 this(processor, serverTransport,
54 new TTransportFactory(), new TTransportFactory(),
55 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
56 new Options());
57 }
58
Mark Slee448849d2007-05-31 01:30:22 +000059 public TThreadPoolServer(TProcessorFactory processorFactory,
60 TServerTransport serverTransport) {
61 this(processorFactory, serverTransport,
62 new TTransportFactory(), new TTransportFactory(),
63 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
64 new Options());
65 }
66
Aditya Agarwal5a429582007-02-06 02:51:15 +000067 public TThreadPoolServer(TProcessor processor,
68 TServerTransport serverTransport,
Mark Slee808454e2007-06-20 21:51:57 +000069 TProtocolFactory protocolFactory) {
70 this(processor, serverTransport,
71 new TTransportFactory(), new TTransportFactory(),
72 protocolFactory, protocolFactory,
73 new Options());
74 }
75
76 public TThreadPoolServer(TProcessor processor,
77 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +000078 TTransportFactory transportFactory,
79 TProtocolFactory protocolFactory) {
80 this(processor, serverTransport,
81 transportFactory, transportFactory,
82 protocolFactory, protocolFactory,
83 new Options());
Mark Sleeffcddd62006-09-06 20:37:03 +000084 }
Mark Slee448849d2007-05-31 01:30:22 +000085
86 public TThreadPoolServer(TProcessorFactory processorFactory,
87 TServerTransport serverTransport,
88 TTransportFactory transportFactory,
89 TProtocolFactory protocolFactory) {
90 this(processorFactory, serverTransport,
91 transportFactory, transportFactory,
92 protocolFactory, protocolFactory,
93 new Options());
94 }
95
Mark Sleed788b2e2006-09-07 01:26:35 +000096
Mark Sleeffcddd62006-09-06 20:37:03 +000097 public TThreadPoolServer(TProcessor processor,
Mark Slee448849d2007-05-31 01:30:22 +000098 TServerTransport serverTransport,
99 TTransportFactory inputTransportFactory,
100 TTransportFactory outputTransportFactory,
101 TProtocolFactory inputProtocolFactory,
102 TProtocolFactory outputProtocolFactory,
103 Options options) {
104 this(new TProcessorFactory(processor), serverTransport,
105 inputTransportFactory, outputTransportFactory,
106 inputProtocolFactory, outputProtocolFactory,
107 options);
108 }
109
110 public TThreadPoolServer(TProcessorFactory processorFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +0000111 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +0000112 TTransportFactory inputTransportFactory,
113 TTransportFactory outputTransportFactory,
114 TProtocolFactory inputProtocolFactory,
115 TProtocolFactory outputProtocolFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +0000116 Options options) {
Mark Slee448849d2007-05-31 01:30:22 +0000117 super(processorFactory, serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +0000118 inputTransportFactory, outputTransportFactory,
119 inputProtocolFactory, outputProtocolFactory);
Mark Slee456b7a82006-10-25 20:53:37 +0000120
Mark Sleeffcddd62006-09-06 20:37:03 +0000121 executorService_ = null;
122
123 LinkedBlockingQueue<Runnable> executorQueue =
124 new LinkedBlockingQueue<Runnable>();
125
126 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
127 options.maxWorkerThreads,
128 60,
129 TimeUnit.SECONDS,
130 executorQueue);
Mark Slee0502e612007-11-03 05:30:32 +0000131
132 options_ = options;
Mark Sleeffcddd62006-09-06 20:37:03 +0000133 }
134
135
Mark Slee4e755ca2006-09-12 00:46:08 +0000136 public void serve() {
Mark Sleeffcddd62006-09-06 20:37:03 +0000137 try {
138 serverTransport_.listen();
139 } catch (TTransportException ttx) {
140 ttx.printStackTrace();
141 return;
142 }
143
Mark Slee0502e612007-11-03 05:30:32 +0000144 stopped_ = false;
145 while (!stopped_) {
Mark Slee4e755ca2006-09-12 00:46:08 +0000146 int failureCount = 0;
Mark Sleeffcddd62006-09-06 20:37:03 +0000147 try {
148 TTransport client = serverTransport_.accept();
149 WorkerProcess wp = new WorkerProcess(client);
150 executorService_.execute(wp);
151 } catch (TTransportException ttx) {
Mark Slee0502e612007-11-03 05:30:32 +0000152 if (!stopped_) {
153 ++failureCount;
154 ttx.printStackTrace();
155 }
Mark Slee4e755ca2006-09-12 00:46:08 +0000156 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000157 }
Mark Slee0502e612007-11-03 05:30:32 +0000158
159 executorService_.shutdown();
160 try {
161 executorService_.awaitTermination(options_.stopTimeoutVal,
162 options_.stopTimeoutUnit);
163 } catch (InterruptedException ix) {
164 // Ignore and more on
165 }
166 }
167
168 public void stop() {
169 stopped_ = true;
170 serverTransport_.interrupt();
Mark Sleeffcddd62006-09-06 20:37:03 +0000171 }
172
173 private class WorkerProcess implements Runnable {
174
175 /**
176 * Client that this services.
177 */
178 private TTransport client_;
179
180 /**
181 * Default constructor.
182 *
183 * @param client Transport to process
184 */
185 private WorkerProcess(TTransport client) {
186 client_ = client;
187 }
188
189 /**
190 * Loops on processing a client forever
191 */
192 public void run() {
Mark Slee448849d2007-05-31 01:30:22 +0000193 TProcessor processor = null;
Aditya Agarwal5a429582007-02-06 02:51:15 +0000194 TTransport inputTransport = null;
195 TTransport outputTransport = null;
196 TProtocol inputProtocol = null;
197 TProtocol outputProtocol = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000198 try {
Mark Slee448849d2007-05-31 01:30:22 +0000199 processor = processorFactory_.getProcessor(client_);
Aditya Agarwal5a429582007-02-06 02:51:15 +0000200 inputTransport = inputTransportFactory_.getTransport(client_);
201 outputTransport = outputTransportFactory_.getTransport(client_);
202 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
203 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
Mark Slee448849d2007-05-31 01:30:22 +0000204 while (processor.process(inputProtocol, outputProtocol)) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000205 } catch (TTransportException ttx) {
206 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000207 } catch (TException tx) {
208 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000209 } catch (Exception x) {
210 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000211 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000212
Aditya Agarwal5a429582007-02-06 02:51:15 +0000213 if (inputTransport != null) {
214 inputTransport.close();
215 }
216
217 if (outputTransport != null) {
218 outputTransport.close();
Mark Sleed788b2e2006-09-07 01:26:35 +0000219 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000220 }
221 }
222}