blob: 9eaf9b8d51a122db4eb571e89607561a6c4b01f1 [file] [log] [blame]
Mark Slee7eb0d632007-03-01 00:00:27 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Mark Sleeffcddd62006-09-06 20:37:03 +00007package com.facebook.thrift.server;
8
9import com.facebook.thrift.TException;
10import com.facebook.thrift.TProcessor;
Mark Slee456b7a82006-10-25 20:53:37 +000011import com.facebook.thrift.protocol.TProtocol;
12import com.facebook.thrift.protocol.TProtocolFactory;
Aditya Agarwal5a429582007-02-06 02:51:15 +000013import com.facebook.thrift.protocol.TBinaryProtocol;
Mark Sleeffcddd62006-09-06 20:37:03 +000014import com.facebook.thrift.transport.TServerTransport;
15import com.facebook.thrift.transport.TTransport;
16import com.facebook.thrift.transport.TTransportException;
Mark Sleed788b2e2006-09-07 01:26:35 +000017import com.facebook.thrift.transport.TTransportFactory;
Mark Sleeffcddd62006-09-06 20:37:03 +000018
19import java.util.concurrent.ExecutorService;
20import java.util.concurrent.LinkedBlockingQueue;
21import java.util.concurrent.ThreadPoolExecutor;
22import java.util.concurrent.TimeUnit;
23
24
25/**
26 * Server which uses Java's built in ThreadPool management to spawn off
27 * a worker pool that
28 *
29 * @author Mark Slee <mcslee@facebook.com>
30 */
31public class TThreadPoolServer extends TServer {
32
Mark Sleeffcddd62006-09-06 20:37:03 +000033 // Executor service for handling client connections
34 private ExecutorService executorService_;
35
36 // Customizable server options
Mark Slee456b7a82006-10-25 20:53:37 +000037 public static class Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000038 public int minWorkerThreads = 5;
39 public int maxWorkerThreads = Integer.MAX_VALUE;
40 }
41
42 public TThreadPoolServer(TProcessor processor,
43 TServerTransport serverTransport) {
Aditya Agarwal5a429582007-02-06 02:51:15 +000044 this(processor, serverTransport,
45 new TTransportFactory(), new TTransportFactory(),
46 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
47 new Options());
48 }
49
50 public TThreadPoolServer(TProcessor processor,
51 TServerTransport serverTransport,
52 TTransportFactory transportFactory,
53 TProtocolFactory protocolFactory) {
54 this(processor, serverTransport,
55 transportFactory, transportFactory,
56 protocolFactory, protocolFactory,
57 new Options());
Mark Sleeffcddd62006-09-06 20:37:03 +000058 }
Mark Sleed788b2e2006-09-07 01:26:35 +000059
Mark Sleeffcddd62006-09-06 20:37:03 +000060 public TThreadPoolServer(TProcessor processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000061 TServerTransport serverTransport,
Aditya Agarwal5a429582007-02-06 02:51:15 +000062 TTransportFactory inputTransportFactory,
63 TTransportFactory outputTransportFactory,
64 TProtocolFactory inputProtocolFactory,
65 TProtocolFactory outputProtocolFactory,
Mark Sleed788b2e2006-09-07 01:26:35 +000066 Options options) {
Aditya Agarwal5a429582007-02-06 02:51:15 +000067 super(processor, serverTransport,
68 inputTransportFactory, outputTransportFactory,
69 inputProtocolFactory, outputProtocolFactory);
Mark Slee456b7a82006-10-25 20:53:37 +000070
Mark Sleeffcddd62006-09-06 20:37:03 +000071 executorService_ = null;
72
73 LinkedBlockingQueue<Runnable> executorQueue =
74 new LinkedBlockingQueue<Runnable>();
75
76 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
77 options.maxWorkerThreads,
78 60,
79 TimeUnit.SECONDS,
80 executorQueue);
81 }
82
83
Mark Slee4e755ca2006-09-12 00:46:08 +000084 public void serve() {
Mark Sleeffcddd62006-09-06 20:37:03 +000085 try {
86 serverTransport_.listen();
87 } catch (TTransportException ttx) {
88 ttx.printStackTrace();
89 return;
90 }
91
92 while (true) {
Mark Slee4e755ca2006-09-12 00:46:08 +000093 int failureCount = 0;
Mark Sleeffcddd62006-09-06 20:37:03 +000094 try {
95 TTransport client = serverTransport_.accept();
96 WorkerProcess wp = new WorkerProcess(client);
97 executorService_.execute(wp);
98 } catch (TTransportException ttx) {
Mark Slee4e755ca2006-09-12 00:46:08 +000099 ++failureCount;
Mark Sleeffcddd62006-09-06 20:37:03 +0000100 ttx.printStackTrace();
Mark Slee4e755ca2006-09-12 00:46:08 +0000101 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000102 }
103 }
104
105 private class WorkerProcess implements Runnable {
106
107 /**
108 * Client that this services.
109 */
110 private TTransport client_;
111
112 /**
113 * Default constructor.
114 *
115 * @param client Transport to process
116 */
117 private WorkerProcess(TTransport client) {
118 client_ = client;
119 }
120
121 /**
122 * Loops on processing a client forever
123 */
124 public void run() {
Aditya Agarwal5a429582007-02-06 02:51:15 +0000125 TTransport inputTransport = null;
126 TTransport outputTransport = null;
127 TProtocol inputProtocol = null;
128 TProtocol outputProtocol = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000129 try {
Aditya Agarwal5a429582007-02-06 02:51:15 +0000130 inputTransport = inputTransportFactory_.getTransport(client_);
131 outputTransport = outputTransportFactory_.getTransport(client_);
132 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
133 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
134 while (processor_.process(inputProtocol, outputProtocol)) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000135 } catch (TTransportException ttx) {
136 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000137 } catch (TException tx) {
138 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000139 } catch (Exception x) {
140 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000141 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000142
Aditya Agarwal5a429582007-02-06 02:51:15 +0000143 if (inputTransport != null) {
144 inputTransport.close();
145 }
146
147 if (outputTransport != null) {
148 outputTransport.close();
Mark Sleed788b2e2006-09-07 01:26:35 +0000149 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000150 }
151 }
152}