blob: c63d1e185057ec01b75a53a0599c023dbd57e8e7 [file] [log] [blame]
Mark Sleeffcddd62006-09-06 20:37:03 +00001package com.facebook.thrift.server;
2
3import com.facebook.thrift.TException;
4import com.facebook.thrift.TProcessor;
5import com.facebook.thrift.transport.TServerTransport;
6import com.facebook.thrift.transport.TTransport;
7import com.facebook.thrift.transport.TTransportException;
Mark Sleed788b2e2006-09-07 01:26:35 +00008import com.facebook.thrift.transport.TTransportFactory;
9import com.facebook.thrift.transport.TBaseTransportFactory;
Mark Sleeffcddd62006-09-06 20:37:03 +000010
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.LinkedBlockingQueue;
13import java.util.concurrent.ThreadPoolExecutor;
14import java.util.concurrent.TimeUnit;
15
16
17/**
18 * Server which uses Java's built in ThreadPool management to spawn off
19 * a worker pool that
20 *
21 * @author Mark Slee <mcslee@facebook.com>
22 */
23public class TThreadPoolServer extends TServer {
24
Mark Sleeffcddd62006-09-06 20:37:03 +000025 // Executor service for handling client connections
26 private ExecutorService executorService_;
27
28 // Customizable server options
29 public static class Options extends TServer.Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000030 public int minWorkerThreads = 5;
31 public int maxWorkerThreads = Integer.MAX_VALUE;
32 }
33
34 public TThreadPoolServer(TProcessor processor,
35 TServerTransport serverTransport) {
Mark Sleed788b2e2006-09-07 01:26:35 +000036 this(processor,
37 serverTransport,
38 new TBaseTransportFactory(),
39 new Options());
Mark Sleeffcddd62006-09-06 20:37:03 +000040 }
Mark Sleed788b2e2006-09-07 01:26:35 +000041
Mark Sleeffcddd62006-09-06 20:37:03 +000042 public TThreadPoolServer(TProcessor processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000043 TServerTransport serverTransport,
44 TTransportFactory transportFactory,
45 Options options) {
46 super(processor, serverTransport, transportFactory, options);
Mark Sleeffcddd62006-09-06 20:37:03 +000047 serverTransport_ = serverTransport;
48 executorService_ = null;
49
50 LinkedBlockingQueue<Runnable> executorQueue =
51 new LinkedBlockingQueue<Runnable>();
52
53 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
54 options.maxWorkerThreads,
55 60,
56 TimeUnit.SECONDS,
57 executorQueue);
58 }
59
60
Mark Slee4e755ca2006-09-12 00:46:08 +000061 public void serve() {
Mark Sleeffcddd62006-09-06 20:37:03 +000062 try {
63 serverTransport_.listen();
64 } catch (TTransportException ttx) {
65 ttx.printStackTrace();
66 return;
67 }
68
69 while (true) {
Mark Slee4e755ca2006-09-12 00:46:08 +000070 int failureCount = 0;
Mark Sleeffcddd62006-09-06 20:37:03 +000071 try {
72 TTransport client = serverTransport_.accept();
73 WorkerProcess wp = new WorkerProcess(client);
74 executorService_.execute(wp);
75 } catch (TTransportException ttx) {
Mark Slee4e755ca2006-09-12 00:46:08 +000076 ++failureCount;
Mark Sleeffcddd62006-09-06 20:37:03 +000077 ttx.printStackTrace();
Mark Slee4e755ca2006-09-12 00:46:08 +000078 }
Mark Sleeffcddd62006-09-06 20:37:03 +000079 }
80 }
81
82 private class WorkerProcess implements Runnable {
83
84 /**
85 * Client that this services.
86 */
87 private TTransport client_;
88
89 /**
90 * Default constructor.
91 *
92 * @param client Transport to process
93 */
94 private WorkerProcess(TTransport client) {
95 client_ = client;
96 }
97
98 /**
99 * Loops on processing a client forever
100 */
101 public void run() {
Mark Sleed788b2e2006-09-07 01:26:35 +0000102 TTransport[] io = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000103 try {
Mark Sleed788b2e2006-09-07 01:26:35 +0000104 io = transportFactory_.getIOTransports(client_);
105 while (processor_.process(io[0], io[1])) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000106 } catch (TTransportException ttx) {
107 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000108 } catch (TException tx) {
109 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000110 } catch (Exception x) {
111 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000112 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000113
114 if (io != null) {
115 if (io[0] != null) {
116 io[0].close();
117 }
118 if (io[1] != null) {
119 io[1].close();
120 }
121 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000122 }
123 }
124}