blob: 22930d542ea9f27ded6317701be89aaa33c02809 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/

Mark Sleeffcddd62006-09-06 20:37:03 +00007package com.facebook.thrift.server;
8
import com.facebook.thrift.TException;
import com.facebook.thrift.TProcessor;
import com.facebook.thrift.TProcessorFactory;
import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.protocol.TProtocolFactory;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TTransportException;
import com.facebook.thrift.transport.TTransportFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

24
25
26/**
27 * Server which uses Java's built in ThreadPool management to spawn off
28 * a worker pool that
29 *
30 * @author Mark Slee <mcslee@facebook.com>
31 */
32public class TThreadPoolServer extends TServer {
33
Mark Sleeffcddd62006-09-06 20:37:03 +000034 // Executor service for handling client connections
35 private ExecutorService executorService_;
36
37 // Customizable server options
Mark Slee456b7a82006-10-25 20:53:37 +000038 public static class Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000039 public int minWorkerThreads = 5;
40 public int maxWorkerThreads = Integer.MAX_VALUE;
41 }
42
43 public TThreadPoolServer(TProcessor processor,
44 TServerTransport serverTransport) {
Aditya Agarwal5a429582007-02-06 02:51:15 +000045 this(processor, serverTransport,
46 new TTransportFactory(), new TTransportFactory(),
47 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
48 new Options());
49 }
50
Mark Slee448849d2007-05-31 01:30:22 +000051 public TThreadPoolServer(TProcessorFactory processorFactory,
52 TServerTransport serverTransport) {
53 this(processorFactory, serverTransport,
54 new TTransportFactory(), new TTransportFactory(),
55 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
56 new Options());
57 }
58
Aditya Agarwal5a429582007-02-06 02:51:15 +000059 public TThreadPoolServer(TProcessor processor,
60 TServerTransport serverTransport,
61 TTransportFactory transportFactory,
62 TProtocolFactory protocolFactory) {
63 this(processor, serverTransport,
64 transportFactory, transportFactory,
65 protocolFactory, protocolFactory,
66 new Options());
Mark Sleeffcddd62006-09-06 20:37:03 +000067 }
Mark Slee448849d2007-05-31 01:30:22 +000068
69 public TThreadPoolServer(TProcessorFactory processorFactory,
70 TServerTransport serverTransport,
71 TTransportFactory transportFactory,
72 TProtocolFactory protocolFactory) {
73 this(processorFactory, serverTransport,
74 transportFactory, transportFactory,
75 protocolFactory, protocolFactory,
76 new Options());
77 }
78
Mark Sleed788b2e2006-09-07 01:26:35 +000079
Mark Sleeffcddd62006-09-06 20:37:03 +000080 public TThreadPoolServer(TProcessor processor,
Mark Slee448849d2007-05-31 01:30:22 +000081 TServerTransport serverTransport,
82 TTransportFactory inputTransportFactory,
83 TTransportFactory outputTransportFactory,
84 TProtocolFactory inputProtocolFactory,
85 TProtocolFactory outputProtocolFactory,
86 Options options) {
87 this(new TProcessorFactory(processor), serverTransport,
88 inputTransportFactory, outputTransportFactory,
89 inputProtocolFactory, outputProtocolFactory,
90 options);
91 }
92
93 public TThreadPoolServer(TProcessorFactory processorFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +000094 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +000095 TTransportFactory inputTransportFactory,
96 TTransportFactory outputTransportFactory,
97 TProtocolFactory inputProtocolFactory,
98 TProtocolFactory outputProtocolFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +000099 Options options) {
Mark Slee448849d2007-05-31 01:30:22 +0000100 super(processorFactory, serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +0000101 inputTransportFactory, outputTransportFactory,
102 inputProtocolFactory, outputProtocolFactory);
Mark Slee456b7a82006-10-25 20:53:37 +0000103
Mark Sleeffcddd62006-09-06 20:37:03 +0000104 executorService_ = null;
105
106 LinkedBlockingQueue<Runnable> executorQueue =
107 new LinkedBlockingQueue<Runnable>();
108
109 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
110 options.maxWorkerThreads,
111 60,
112 TimeUnit.SECONDS,
113 executorQueue);
114 }
115
116
Mark Slee4e755ca2006-09-12 00:46:08 +0000117 public void serve() {
Mark Sleeffcddd62006-09-06 20:37:03 +0000118 try {
119 serverTransport_.listen();
120 } catch (TTransportException ttx) {
121 ttx.printStackTrace();
122 return;
123 }
124
125 while (true) {
Mark Slee4e755ca2006-09-12 00:46:08 +0000126 int failureCount = 0;
Mark Sleeffcddd62006-09-06 20:37:03 +0000127 try {
128 TTransport client = serverTransport_.accept();
129 WorkerProcess wp = new WorkerProcess(client);
130 executorService_.execute(wp);
131 } catch (TTransportException ttx) {
Mark Slee4e755ca2006-09-12 00:46:08 +0000132 ++failureCount;
Mark Sleeffcddd62006-09-06 20:37:03 +0000133 ttx.printStackTrace();
Mark Slee4e755ca2006-09-12 00:46:08 +0000134 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000135 }
136 }
137
138 private class WorkerProcess implements Runnable {
139
140 /**
141 * Client that this services.
142 */
143 private TTransport client_;
144
145 /**
146 * Default constructor.
147 *
148 * @param client Transport to process
149 */
150 private WorkerProcess(TTransport client) {
151 client_ = client;
152 }
153
154 /**
155 * Loops on processing a client forever
156 */
157 public void run() {
Mark Slee448849d2007-05-31 01:30:22 +0000158 TProcessor processor = null;
Aditya Agarwal5a429582007-02-06 02:51:15 +0000159 TTransport inputTransport = null;
160 TTransport outputTransport = null;
161 TProtocol inputProtocol = null;
162 TProtocol outputProtocol = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000163 try {
Mark Slee448849d2007-05-31 01:30:22 +0000164 processor = processorFactory_.getProcessor(client_);
Aditya Agarwal5a429582007-02-06 02:51:15 +0000165 inputTransport = inputTransportFactory_.getTransport(client_);
166 outputTransport = outputTransportFactory_.getTransport(client_);
167 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
168 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
Mark Slee448849d2007-05-31 01:30:22 +0000169 while (processor.process(inputProtocol, outputProtocol)) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000170 } catch (TTransportException ttx) {
171 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000172 } catch (TException tx) {
173 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000174 } catch (Exception x) {
175 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000176 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000177
Aditya Agarwal5a429582007-02-06 02:51:15 +0000178 if (inputTransport != null) {
179 inputTransport.close();
180 }
181
182 if (outputTransport != null) {
183 outputTransport.close();
Mark Sleed788b2e2006-09-07 01:26:35 +0000184 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000185 }
186 }
187}