blob: 090859d479936dc715b8ac8edd5750d60edda4c0 [file] [log] [blame]
Mark Sleeffcddd62006-09-06 20:37:03 +00001package com.facebook.thrift.server;
2
3import com.facebook.thrift.TException;
4import com.facebook.thrift.TProcessor;
Mark Slee456b7a82006-10-25 20:53:37 +00005import com.facebook.thrift.protocol.TProtocol;
6import com.facebook.thrift.protocol.TProtocolFactory;
Mark Sleeffcddd62006-09-06 20:37:03 +00007import com.facebook.thrift.transport.TServerTransport;
8import com.facebook.thrift.transport.TTransport;
9import com.facebook.thrift.transport.TTransportException;
Mark Sleed788b2e2006-09-07 01:26:35 +000010import com.facebook.thrift.transport.TTransportFactory;
Mark Sleeffcddd62006-09-06 20:37:03 +000011
12import java.util.concurrent.ExecutorService;
13import java.util.concurrent.LinkedBlockingQueue;
14import java.util.concurrent.ThreadPoolExecutor;
15import java.util.concurrent.TimeUnit;
16
17
18/**
19 * Server which uses Java's built in ThreadPool management to spawn off
20 * a worker pool that
21 *
22 * @author Mark Slee <mcslee@facebook.com>
23 */
24public class TThreadPoolServer extends TServer {
25
Mark Sleeffcddd62006-09-06 20:37:03 +000026 // Executor service for handling client connections
27 private ExecutorService executorService_;
28
29 // Customizable server options
Mark Slee456b7a82006-10-25 20:53:37 +000030 public static class Options {
Mark Sleeffcddd62006-09-06 20:37:03 +000031 public int minWorkerThreads = 5;
32 public int maxWorkerThreads = Integer.MAX_VALUE;
33 }
34
35 public TThreadPoolServer(TProcessor processor,
36 TServerTransport serverTransport) {
Mark Slee456b7a82006-10-25 20:53:37 +000037 this(processor, serverTransport, new Options());
Mark Sleeffcddd62006-09-06 20:37:03 +000038 }
Mark Sleed788b2e2006-09-07 01:26:35 +000039
Mark Sleeffcddd62006-09-06 20:37:03 +000040 public TThreadPoolServer(TProcessor processor,
Mark Sleed788b2e2006-09-07 01:26:35 +000041 TServerTransport serverTransport,
Mark Sleed788b2e2006-09-07 01:26:35 +000042 Options options) {
Mark Slee456b7a82006-10-25 20:53:37 +000043 super(processor, serverTransport);
44
Mark Sleeffcddd62006-09-06 20:37:03 +000045 executorService_ = null;
46
47 LinkedBlockingQueue<Runnable> executorQueue =
48 new LinkedBlockingQueue<Runnable>();
49
50 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
51 options.maxWorkerThreads,
52 60,
53 TimeUnit.SECONDS,
54 executorQueue);
55 }
56
57
Mark Slee4e755ca2006-09-12 00:46:08 +000058 public void serve() {
Mark Sleeffcddd62006-09-06 20:37:03 +000059 try {
60 serverTransport_.listen();
61 } catch (TTransportException ttx) {
62 ttx.printStackTrace();
63 return;
64 }
65
66 while (true) {
Mark Slee4e755ca2006-09-12 00:46:08 +000067 int failureCount = 0;
Mark Sleeffcddd62006-09-06 20:37:03 +000068 try {
69 TTransport client = serverTransport_.accept();
70 WorkerProcess wp = new WorkerProcess(client);
71 executorService_.execute(wp);
72 } catch (TTransportException ttx) {
Mark Slee4e755ca2006-09-12 00:46:08 +000073 ++failureCount;
Mark Sleeffcddd62006-09-06 20:37:03 +000074 ttx.printStackTrace();
Mark Slee4e755ca2006-09-12 00:46:08 +000075 }
Mark Sleeffcddd62006-09-06 20:37:03 +000076 }
77 }
78
79 private class WorkerProcess implements Runnable {
80
81 /**
82 * Client that this services.
83 */
84 private TTransport client_;
85
86 /**
87 * Default constructor.
88 *
89 * @param client Transport to process
90 */
91 private WorkerProcess(TTransport client) {
92 client_ = client;
93 }
94
95 /**
96 * Loops on processing a client forever
97 */
98 public void run() {
Mark Slee456b7a82006-10-25 20:53:37 +000099 TTransport[] iot = null;
100 TProtocol[] iop = null;
Mark Sleeffcddd62006-09-06 20:37:03 +0000101 try {
Mark Slee456b7a82006-10-25 20:53:37 +0000102 iot = transportFactory_.getIOTransports(client_);
103 iop = protocolFactory_.getIOProtocols(iot[0], iot[1]);
104 while (processor_.process(iop[0], iop[1])) {}
Mark Sleeade2c832006-09-08 03:41:50 +0000105 } catch (TTransportException ttx) {
106 // Assume the client died and continue silently
Mark Sleeffcddd62006-09-06 20:37:03 +0000107 } catch (TException tx) {
108 tx.printStackTrace();
Mark Sleeade2c832006-09-08 03:41:50 +0000109 } catch (Exception x) {
110 x.printStackTrace();
Mark Sleeffcddd62006-09-06 20:37:03 +0000111 }
Mark Sleed788b2e2006-09-07 01:26:35 +0000112
Mark Slee456b7a82006-10-25 20:53:37 +0000113 if (iot != null) {
114 if (iot[0] != null) {
115 iot[0].close();
Mark Sleed788b2e2006-09-07 01:26:35 +0000116 }
Mark Slee456b7a82006-10-25 20:53:37 +0000117 if (iot[1] != null) {
118 iot[1].close();
Mark Sleed788b2e2006-09-07 01:26:35 +0000119 }
120 }
Mark Sleeffcddd62006-09-06 20:37:03 +0000121 }
122 }
123}