blob: d19275ee591ef12e01bc05f8810100dec745b9b0 [file] [log] [blame]
Mark Sleeffcddd62006-09-06 20:37:03 +00001package com.facebook.thrift.server;
2
3import com.facebook.thrift.TException;
4import com.facebook.thrift.TProcessor;
5import com.facebook.thrift.transport.TServerTransport;
6import com.facebook.thrift.transport.TTransport;
7import com.facebook.thrift.transport.TTransportException;
8
9import java.util.concurrent.ExecutorService;
10import java.util.concurrent.LinkedBlockingQueue;
11import java.util.concurrent.ThreadPoolExecutor;
12import java.util.concurrent.TimeUnit;
13
14
15/**
16 * Server which uses Java's built in ThreadPool management to spawn off
17 * a worker pool that
18 *
19 * @author Mark Slee <mcslee@facebook.com>
20 */
21public class TThreadPoolServer extends TServer {
22
23 // Server transport
24 private TServerTransport serverTransport_;
25
26 // Executor service for handling client connections
27 private ExecutorService executorService_;
28
29 // Customizable server options
30 public static class Options extends TServer.Options {
31 public int port = 9190;
32 public int minWorkerThreads = 5;
33 public int maxWorkerThreads = Integer.MAX_VALUE;
34 }
35
36 public TThreadPoolServer(TProcessor processor,
37 TServerTransport serverTransport) {
38 this(processor, new Options(), serverTransport);
39 }
40
41 public TThreadPoolServer(TProcessor processor,
42 Options options,
43 TServerTransport serverTransport) {
44 super(processor, options);
45 serverTransport_ = serverTransport;
46 executorService_ = null;
47
48 LinkedBlockingQueue<Runnable> executorQueue =
49 new LinkedBlockingQueue<Runnable>();
50
51 executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
52 options.maxWorkerThreads,
53 60,
54 TimeUnit.SECONDS,
55 executorQueue);
56 }
57
58
59 public void run() {
60 try {
61 serverTransport_.listen();
62 } catch (TTransportException ttx) {
63 ttx.printStackTrace();
64 return;
65 }
66
67 while (true) {
68 try {
69 TTransport client = serverTransport_.accept();
70 WorkerProcess wp = new WorkerProcess(client);
71 executorService_.execute(wp);
72 } catch (TTransportException ttx) {
73 ttx.printStackTrace();
74 }
75 }
76 }
77
78 private class WorkerProcess implements Runnable {
79
80 /**
81 * Client that this services.
82 */
83 private TTransport client_;
84
85 /**
86 * Default constructor.
87 *
88 * @param client Transport to process
89 */
90 private WorkerProcess(TTransport client) {
91 client_ = client;
92 }
93
94 /**
95 * Loops on processing a client forever
96 */
97 public void run() {
98 try {
99 while (processor_.process(client_, client_)) {}
100 } catch (TException tx) {
101 tx.printStackTrace();
102 }
103 client_.close();
104 }
105 }
106}