blob: 0945fbe3a8d6066262ceb804261187872e665861 [file] [log] [blame]
Mark Slee7eb0d632007-03-01 00:00:27 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Sleeffcddd62006-09-06 20:37:03 +00007package com.facebook.thrift.server;
8
9import com.facebook.thrift.TException;
10import com.facebook.thrift.TProcessor;
Mark Slee448849d2007-05-31 01:30:22 +000011import com.facebook.thrift.TProcessorFactory;
Mark Slee456b7a82006-10-25 20:53:37 +000012import com.facebook.thrift.protocol.TProtocol;
13import com.facebook.thrift.protocol.TProtocolFactory;
Aditya Agarwal5a429582007-02-06 02:51:15 +000014import com.facebook.thrift.protocol.TBinaryProtocol;
Mark Sleeffcddd62006-09-06 20:37:03 +000015import com.facebook.thrift.transport.TServerTransport;
16import com.facebook.thrift.transport.TTransport;
17import com.facebook.thrift.transport.TTransportException;
Mark Sleed788b2e2006-09-07 01:26:35 +000018import com.facebook.thrift.transport.TTransportFactory;
Mark Sleeffcddd62006-09-06 20:37:03 +000019
20import java.util.concurrent.ExecutorService;
21import java.util.concurrent.LinkedBlockingQueue;
22import java.util.concurrent.ThreadPoolExecutor;
23import java.util.concurrent.TimeUnit;
24
25
26/**
27 * Server which uses Java's built in ThreadPool management to spawn off
28 * a worker pool that
29 *
30 * @author Mark Slee <mcslee@facebook.com>
31 */
32public class TThreadPoolServer extends TServer {
33
Mark Sleeffcddd62006-09-06 20:37:03 +000034 // Executor service for handling client connections
35 private ExecutorService executorService_;
36
37 // Customizable server options
Mark Slee456b7a82006-10-25 20:53:37 +000038 public static class Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000039 public int minWorkerThreads = 5;
40 public int maxWorkerThreads = Integer.MAX_VALUE;
41 }
42
43 public TThreadPoolServer(TProcessor processor,
44 TServerTransport serverTransport) {
Aditya Agarwal5a429582007-02-06 02:51:15 +000045 this(processor, serverTransport,
46 new TTransportFactory(), new TTransportFactory(),
47 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
48 new Options());
49 }
50
Mark Slee448849d2007-05-31 01:30:22 +000051 public TThreadPoolServer(TProcessorFactory processorFactory,
52 TServerTransport serverTransport) {
53 this(processorFactory, serverTransport,
54 new TTransportFactory(), new TTransportFactory(),
55 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
56 new Options());
57 }
58
Aditya Agarwal5a429582007-02-06 02:51:15 +000059 public TThreadPoolServer(TProcessor processor,
60 TServerTransport serverTransport,
Mark Slee808454e2007-06-20 21:51:57 +000061 TProtocolFactory protocolFactory) {
62 this(processor, serverTransport,
63 new TTransportFactory(), new TTransportFactory(),
64 protocolFactory, protocolFactory,
65 new Options());
66 }
67
68 public TThreadPoolServer(TProcessor processor,
69 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +000070 TTransportFactory transportFactory,
71 TProtocolFactory protocolFactory) {
72 this(processor, serverTransport,
73 transportFactory, transportFactory,
74 protocolFactory, protocolFactory,
75 new Options());
Mark Sleeffcddd62006-09-06 20:37:03 +000076 }
Mark Slee448849d2007-05-31 01:30:22 +000077
78 public TThreadPoolServer(TProcessorFactory processorFactory,
79 TServerTransport serverTransport,
80 TTransportFactory transportFactory,
81 TProtocolFactory protocolFactory) {
82 this(processorFactory, serverTransport,
83 transportFactory, transportFactory,
84 protocolFactory, protocolFactory,
85 new Options());
86 }
87
Mark Sleed788b2e2006-09-07 01:26:35 +000088
Mark Sleeffcddd62006-09-06 20:37:03 +000089 public TThreadPoolServer(TProcessor processor,
Mark Slee448849d2007-05-31 01:30:22 +000090 TServerTransport serverTransport,
91 TTransportFactory inputTransportFactory,
92 TTransportFactory outputTransportFactory,
93 TProtocolFactory inputProtocolFactory,
94 TProtocolFactory outputProtocolFactory,
95 Options options) {
96 this(new TProcessorFactory(processor), serverTransport,
97 inputTransportFactory, outputTransportFactory,
98 inputProtocolFactory, outputProtocolFactory,
99 options);
100 }
101
102 public TThreadPoolServer(TProcessorFactory processorFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +0000103 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +0000104 TTransportFactory inputTransportFactory,
105 TTransportFactory outputTransportFactory,
106 TProtocolFactory inputProtocolFactory,
107 TProtocolFactory outputProtocolFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +0000108 Options options) {
Mark Slee448849d2007-05-31 01:30:22 +0000109 super(processorFactory, serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +0000110 inputTransportFactory, outputTransportFactory,
111 inputProtocolFactory, outputProtocolFactory);
Mark Slee456b7a82006-10-25 20:53:37 +0000112
Mark Sleeffcddd62006-09-06 20:37:03 +0000113 executorService_ = null;
114
115 LinkedBlockingQueue<Runnable> executorQueue =
116 new LinkedBlockingQueue<Runnable>();
117
118 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
119 options.maxWorkerThreads,
120 60,
121 TimeUnit.SECONDS,
122 executorQueue);
123 }
124
125
Mark Slee4e755ca2006-09-12 00:46:08 +0000126 public void serve() {
Mark Sleeffcddd62006-09-06 20:37:03 +0000127 try {
128 serverTransport_.listen();
129 } catch (TTransportException ttx) {
130 ttx.printStackTrace();
131 return;
132 }
133
134 while (true) {
Mark Slee4e755ca2006-09-12 00:46:08 +0000135 int failureCount = 0;
Mark Sleeffcddd62006-09-06 20:37:03 +0000136 try {
137 TTransport client = serverTransport_.accept();
138 WorkerProcess wp = new WorkerProcess(client);
139 executorService_.execute(wp);
140 } catch (TTransportException ttx) {
Mark Slee4e755ca2006-09-12 00:46:08 +0000141 ++failureCount;
Mark Sleeffcddd62006-09-06 20:37:03 +0000142 ttx.printStackTrace();
Mark Slee4e755ca2006-09-12 00:46:08 +0000143 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000144 }
145 }
146
147 private class WorkerProcess implements Runnable {
148
149 /**
150 * Client that this services.
151 */
152 private TTransport client_;
153
154 /**
155 * Default constructor.
156 *
157 * @param client Transport to process
158 */
159 private WorkerProcess(TTransport client) {
160 client_ = client;
161 }
162
163 /**
164 * Loops on processing a client forever
165 */
166 public void run() {
Mark Slee448849d2007-05-31 01:30:22 +0000167 TProcessor processor = null;
Aditya Agarwal5a429582007-02-06 02:51:15 +0000168 TTransport inputTransport = null;
169 TTransport outputTransport = null;
170 TProtocol inputProtocol = null;
171 TProtocol outputProtocol = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000172 try {
Mark Slee448849d2007-05-31 01:30:22 +0000173 processor = processorFactory_.getProcessor(client_);
Aditya Agarwal5a429582007-02-06 02:51:15 +0000174 inputTransport = inputTransportFactory_.getTransport(client_);
175 outputTransport = outputTransportFactory_.getTransport(client_);
176 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
177 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
Mark Slee448849d2007-05-31 01:30:22 +0000178 while (processor.process(inputProtocol, outputProtocol)) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000179 } catch (TTransportException ttx) {
180 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000181 } catch (TException tx) {
182 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000183 } catch (Exception x) {
184 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000185 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000186
Aditya Agarwal5a429582007-02-06 02:51:15 +0000187 if (inputTransport != null) {
188 inputTransport.close();
189 }
190
191 if (outputTransport != null) {
192 outputTransport.close();
Mark Sleed788b2e2006-09-07 01:26:35 +0000193 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000194 }
195 }
196}