blob: 560a3cc2fc6c2f2e3e9f7f362ea80a7bf80fff9b [file] [log] [blame]
Mark Sleeffcddd62006-09-06 20:37:03 +00001package com.facebook.thrift.server;
2
3import com.facebook.thrift.TException;
4import com.facebook.thrift.TProcessor;
Mark Slee456b7a82006-10-25 20:53:37 +00005import com.facebook.thrift.protocol.TProtocol;
6import com.facebook.thrift.protocol.TProtocolFactory;
Aditya Agarwal5a429582007-02-06 02:51:15 +00007import com.facebook.thrift.protocol.TBinaryProtocol;
Mark Sleeffcddd62006-09-06 20:37:03 +00008import com.facebook.thrift.transport.TServerTransport;
9import com.facebook.thrift.transport.TTransport;
10import com.facebook.thrift.transport.TTransportException;
Mark Sleed788b2e2006-09-07 01:26:35 +000011import com.facebook.thrift.transport.TTransportFactory;
Mark Sleeffcddd62006-09-06 20:37:03 +000012
13import java.util.concurrent.ExecutorService;
14import java.util.concurrent.LinkedBlockingQueue;
15import java.util.concurrent.ThreadPoolExecutor;
16import java.util.concurrent.TimeUnit;
17
18
19/**
20 * Server which uses Java's built in ThreadPool management to spawn off
21 * a worker pool that
22 *
23 * @author Mark Slee <mcslee@facebook.com>
24 */
25public class TThreadPoolServer extends TServer {
26
Mark Sleeffcddd62006-09-06 20:37:03 +000027 // Executor service for handling client connections
28 private ExecutorService executorService_;
29
30 // Customizable server options
Mark Slee456b7a82006-10-25 20:53:37 +000031 public static class Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000032 public int minWorkerThreads = 5;
33 public int maxWorkerThreads = Integer.MAX_VALUE;
34 }
35
36 public TThreadPoolServer(TProcessor processor,
37 TServerTransport serverTransport) {
Aditya Agarwal5a429582007-02-06 02:51:15 +000038 this(processor, serverTransport,
39 new TTransportFactory(), new TTransportFactory(),
40 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
41 new Options());
42 }
43
44 public TThreadPoolServer(TProcessor processor,
45 TServerTransport serverTransport,
46 TTransportFactory transportFactory,
47 TProtocolFactory protocolFactory) {
48 this(processor, serverTransport,
49 transportFactory, transportFactory,
50 protocolFactory, protocolFactory,
51 new Options());
Mark Sleeffcddd62006-09-06 20:37:03 +000052 }
Mark Sleed788b2e2006-09-07 01:26:35 +000053
Mark Sleeffcddd62006-09-06 20:37:03 +000054 public TThreadPoolServer(TProcessor processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000055 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +000056 TTransportFactory inputTransportFactory,
57 TTransportFactory outputTransportFactory,
58 TProtocolFactory inputProtocolFactory,
59 TProtocolFactory outputProtocolFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +000060 Options options) {
Aditya Agarwal5a429582007-02-06 02:51:15 +000061 super(processor, serverTransport,
62 inputTransportFactory, outputTransportFactory,
63 inputProtocolFactory, outputProtocolFactory);
Mark Slee456b7a82006-10-25 20:53:37 +000064
Mark Sleeffcddd62006-09-06 20:37:03 +000065 executorService_ = null;
66
67 LinkedBlockingQueue<Runnable> executorQueue =
68 new LinkedBlockingQueue<Runnable>();
69
70 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
71 options.maxWorkerThreads,
72 60,
73 TimeUnit.SECONDS,
74 executorQueue);
75 }
76
77
Mark Slee4e755ca2006-09-12 00:46:08 +000078 public void serve() {
Mark Sleeffcddd62006-09-06 20:37:03 +000079 try {
80 serverTransport_.listen();
81 } catch (TTransportException ttx) {
82 ttx.printStackTrace();
83 return;
84 }
85
86 while (true) {
Mark Slee4e755ca2006-09-12 00:46:08 +000087 int failureCount = 0;
Mark Sleeffcddd62006-09-06 20:37:03 +000088 try {
89 TTransport client = serverTransport_.accept();
90 WorkerProcess wp = new WorkerProcess(client);
91 executorService_.execute(wp);
92 } catch (TTransportException ttx) {
Mark Slee4e755ca2006-09-12 00:46:08 +000093 ++failureCount;
Mark Sleeffcddd62006-09-06 20:37:03 +000094 ttx.printStackTrace();
Mark Slee4e755ca2006-09-12 00:46:08 +000095 }
Mark Sleeffcddd62006-09-06 20:37:03 +000096 }
97 }
98
99 private class WorkerProcess implements Runnable {
100
101 /**
102 * Client that this services.
103 */
104 private TTransport client_;
105
106 /**
107 * Default constructor.
108 *
109 * @param client Transport to process
110 */
111 private WorkerProcess(TTransport client) {
112 client_ = client;
113 }
114
115 /**
116 * Loops on processing a client forever
117 */
118 public void run() {
Aditya Agarwal5a429582007-02-06 02:51:15 +0000119 TTransport inputTransport = null;
120 TTransport outputTransport = null;
121 TProtocol inputProtocol = null;
122 TProtocol outputProtocol = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000123 try {
Aditya Agarwal5a429582007-02-06 02:51:15 +0000124 inputTransport = inputTransportFactory_.getTransport(client_);
125 outputTransport = outputTransportFactory_.getTransport(client_);
126 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
127 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
128 while (processor_.process(inputProtocol, outputProtocol)) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000129 } catch (TTransportException ttx) {
130 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000131 } catch (TException tx) {
132 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000133 } catch (Exception x) {
134 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000135 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000136
Aditya Agarwal5a429582007-02-06 02:51:15 +0000137 if (inputTransport != null) {
138 inputTransport.close();
139 }
140
141 if (outputTransport != null) {
142 outputTransport.close();
Mark Sleed788b2e2006-09-07 01:26:35 +0000143 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000144 }
145 }
146}