blob: 7201cd3ba3775675a86429c3c70f9a13acd8233e [file] [log] [blame]
Mark Sleeffcddd62006-09-06 20:37:03 +00001package com.facebook.thrift.server;
2
3import com.facebook.thrift.TException;
4import com.facebook.thrift.TProcessor;
5import com.facebook.thrift.transport.TServerTransport;
6import com.facebook.thrift.transport.TTransport;
7import com.facebook.thrift.transport.TTransportException;
Mark Sleed788b2e2006-09-07 01:26:35 +00008import com.facebook.thrift.transport.TTransportFactory;
9import com.facebook.thrift.transport.TBaseTransportFactory;
Mark Sleeffcddd62006-09-06 20:37:03 +000010
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.LinkedBlockingQueue;
13import java.util.concurrent.ThreadPoolExecutor;
14import java.util.concurrent.TimeUnit;
15
16
17/**
18 * Server which uses Java's built in ThreadPool management to spawn off
19 * a worker pool that
20 *
21 * @author Mark Slee <mcslee@facebook.com>
22 */
23public class TThreadPoolServer extends TServer {
24
Mark Sleeffcddd62006-09-06 20:37:03 +000025 // Executor service for handling client connections
26 private ExecutorService executorService_;
27
28 // Customizable server options
29 public static class Options extends TServer.Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000030 public int minWorkerThreads = 5;
31 public int maxWorkerThreads = Integer.MAX_VALUE;
32 }
33
34 public TThreadPoolServer(TProcessor processor,
35 TServerTransport serverTransport) {
Mark Sleed788b2e2006-09-07 01:26:35 +000036 this(processor,
37 serverTransport,
38 new TBaseTransportFactory(),
39 new Options());
Mark Sleeffcddd62006-09-06 20:37:03 +000040 }
Mark Sleed788b2e2006-09-07 01:26:35 +000041
Mark Sleeffcddd62006-09-06 20:37:03 +000042 public TThreadPoolServer(TProcessor processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000043 TServerTransport serverTransport,
44 TTransportFactory transportFactory,
45 Options options) {
46 super(processor, serverTransport, transportFactory, options);
Mark Sleeffcddd62006-09-06 20:37:03 +000047 serverTransport_ = serverTransport;
48 executorService_ = null;
49
50 LinkedBlockingQueue<Runnable> executorQueue =
51 new LinkedBlockingQueue<Runnable>();
52
53 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
54 options.maxWorkerThreads,
55 60,
56 TimeUnit.SECONDS,
57 executorQueue);
58 }
59
60
61 public void run() {
62 try {
63 serverTransport_.listen();
64 } catch (TTransportException ttx) {
65 ttx.printStackTrace();
66 return;
67 }
68
69 while (true) {
70 try {
71 TTransport client = serverTransport_.accept();
72 WorkerProcess wp = new WorkerProcess(client);
73 executorService_.execute(wp);
74 } catch (TTransportException ttx) {
75 ttx.printStackTrace();
76 }
77 }
78 }
79
80 private class WorkerProcess implements Runnable {
81
82 /**
83 * Client that this services.
84 */
85 private TTransport client_;
86
87 /**
88 * Default constructor.
89 *
90 * @param client Transport to process
91 */
92 private WorkerProcess(TTransport client) {
93 client_ = client;
94 }
95
96 /**
97 * Loops on processing a client forever
98 */
99 public void run() {
Mark Sleed788b2e2006-09-07 01:26:35 +0000100 TTransport[] io = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000101 try {
Mark Sleed788b2e2006-09-07 01:26:35 +0000102 io = transportFactory_.getIOTransports(client_);
103 while (processor_.process(io[0], io[1])) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000104 } catch (TTransportException ttx) {
105 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000106 } catch (TException tx) {
107 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000108 } catch (Exception x) {
109 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000110 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000111
112 if (io != null) {
113 if (io[0] != null) {
114 io[0].close();
115 }
116 if (io[1] != null) {
117 io[1].close();
118 }
119 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000120 }
121 }
122}