blob: a4d33a5ed65b8769d0043289a2436ab670dc4514 [file] [log] [blame]
David Reiss63191332009-01-06 19:49:22 +00001//
2// TThreadedServer.cs
3//
4// Begin: Apr 21, 2008
5// Authors:
6// Will Palmeri <wpalmeri@imeem.com>
7//
8// Distributed under the Thrift Software License
9//
10// See accompanying file LICENSE or visit the Thrift site at:
11// http://developers.facebook.com/thrift/using
12using System;
13using System.Collections.Generic;
14using System.Text;
15using System.Threading;
16using Thrift.Protocol;
17using Thrift.Transport;
18
19namespace Thrift.Server
20{
/// <summary>
/// Server that uses dedicated C# threads (as opposed to the ThreadPool) when handling requests.
/// </summary>
/// <remarks>
/// The thread that calls <see cref="Serve"/> accepts connections and puts them on a queue.
/// A single dispatcher thread takes connections off that queue and starts one new thread
/// per client, never keeping more than <c>maxThreads</c> client threads registered at once.
/// NOTE(review): shutdown relies on Thread.Abort, which is only available on the
/// .NET Framework runtime - confirm the target runtime before reusing this class elsewhere.
/// </remarks>
public class TThreadedServer : TServer
{
    private const int DEFAULT_MAX_THREADS = 100;

    // Set by Stop() to end the accept and dispatch loops; reset by Serve() on its way out.
    private volatile bool stop = false;

    // Upper bound on the number of client threads registered in clientThreads.
    private readonly int maxThreads;

    // Accepted connections waiting for a free client thread. Guarded by clientLock.
    private readonly Queue<TTransport> clientQueue;

    // Threads currently serving a client. Guarded by clientLock.
    private readonly HashSet<Thread> clientThreads;

    // Monitor used both as the mutex for the two collections above and as the
    // condition variable the dispatcher thread waits on.
    private readonly object clientLock;

    // Dispatcher thread running Execute(); null until Serve() has started listening.
    private Thread workerThread;

    /// <summary>
    /// Creates a server with default transport factories, the binary protocol,
    /// the default thread limit and the default log delegate.
    /// </summary>
    public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
        : this(processor, serverTransport,
               new TTransportFactory(), new TTransportFactory(),
               new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
               DEFAULT_MAX_THREADS, DefaultLogDelegate)
    {
    }

    /// <summary>
    /// Creates a server with default transport factories, the binary protocol,
    /// the default thread limit and a caller supplied log delegate.
    /// </summary>
    public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
        : this(processor, serverTransport,
               new TTransportFactory(), new TTransportFactory(),
               new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
               DEFAULT_MAX_THREADS, logDelegate)
    {
    }

    /// <summary>
    /// Creates a server that uses the same transport factory and the same protocol
    /// factory for both the input and the output side of every connection.
    /// </summary>
    public TThreadedServer(TProcessor processor,
                           TServerTransport serverTransport,
                           TTransportFactory transportFactory,
                           TProtocolFactory protocolFactory)
        : this(processor, serverTransport,
               transportFactory, transportFactory,
               protocolFactory, protocolFactory,
               DEFAULT_MAX_THREADS, DefaultLogDelegate)
    {
    }

    /// <summary>
    /// Creates a fully configured server.
    /// </summary>
    /// <param name="processor">Processor invoked for every request.</param>
    /// <param name="serverTransport">Transport used to listen for and accept clients.</param>
    /// <param name="inputTransportFactory">Wraps the client transport for reading.</param>
    /// <param name="outputTransportFactory">Wraps the client transport for writing.</param>
    /// <param name="inputProtocolFactory">Creates the protocol used for reading.</param>
    /// <param name="outputProtocolFactory">Creates the protocol used for writing.</param>
    /// <param name="maxThreads">
    /// Maximum number of concurrently registered client threads. Must be positive:
    /// the dispatcher only dequeues a client while fewer than this many threads are registered.
    /// </param>
    /// <param name="logDel">Delegate that receives log messages.</param>
    public TThreadedServer(TProcessor processor,
                           TServerTransport serverTransport,
                           TTransportFactory inputTransportFactory,
                           TTransportFactory outputTransportFactory,
                           TProtocolFactory inputProtocolFactory,
                           TProtocolFactory outputProtocolFactory,
                           int maxThreads, LogDelegate logDel)
        : base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
               inputProtocolFactory, outputProtocolFactory, logDel)
    {
        this.maxThreads = maxThreads;
        clientQueue = new Queue<TTransport>();
        clientLock = new object();
        clientThreads = new HashSet<Thread>();
    }

    /// <summary>
    /// Listens on the server transport and queues every accepted connection for the
    /// dispatcher thread, which starts a new thread per client once the number of
    /// client threads is below the configured maximum. Returns after <see cref="Stop"/>
    /// has been called, or immediately if listening fails.
    /// </summary>
    public override void Serve()
    {
        try
        {
            serverTransport.Listen();
        }
        catch (TTransportException ttx)
        {
            logDelegate("Error, could not listen on ServerTransport: " + ttx);
            return;
        }

        // Start the dispatcher only after Listen() succeeded, so that a failed Listen()
        // does not leave an orphaned thread blocked on the empty queue.
        workerThread = new Thread(new ThreadStart(Execute));
        workerThread.Start();

        while (!stop)
        {
            try
            {
                TTransport client = serverTransport.Accept();
                lock (clientLock)
                {
                    clientQueue.Enqueue(client);
                    // Wake the dispatcher in case it is waiting for work.
                    Monitor.Pulse(clientLock);
                }
            }
            catch (TTransportException ttx)
            {
                if (stop)
                {
                    logDelegate("TThreadedServer was shutting down, caught " + ttx);
                }
                else
                {
                    logDelegate(ttx.ToString());
                }
            }
        }

        if (stop)
        {
            try
            {
                serverTransport.Close();
            }
            catch (TTransportException ttx)
            {
                logDelegate("TServeTransport failed on close: " + ttx.Message);
            }
            // Reset the flag so that the loops above are not skipped on a later Serve().
            stop = false;
        }
    }

    /// <summary>
    /// Body of the dispatcher thread: repeatedly waits for a queued client and a free
    /// thread slot, then starts a new thread running <see cref="ClientWorker"/> for it.
    /// </summary>
    private void Execute()
    {
        while (!stop)
        {
            TTransport client;
            Thread clientThread;
            lock (clientLock)
            {
                // Block until there is a free thread slot AND a queued connection.
                // Both conditions are re-checked after every wake-up.
                while (clientThreads.Count >= maxThreads || clientQueue.Count == 0)
                {
                    Monitor.Wait(clientLock);
                }

                client = clientQueue.Dequeue();
                clientThread = new Thread(new ParameterizedThreadStart(ClientWorker));
                // Register before starting so the slot is counted immediately.
                clientThreads.Add(clientThread);
            }

            // Start processing requests from the client on its own thread,
            // outside the lock so the accept loop is not held up.
            clientThread.Start(client);
        }
    }

    /// <summary>
    /// Serves a single client on the current thread until the processor reports that
    /// no further request should be processed or an exception ends the session. The
    /// transports are closed and the thread is unregistered on every exit path.
    /// </summary>
    /// <param name="context">The accepted client connection; must be a TTransport.</param>
    private void ClientWorker(Object context)
    {
        TTransport client = (TTransport)context;
        TTransport inputTransport = null;
        TTransport outputTransport = null;
        try
        {
            inputTransport = inputTransportFactory.GetTransport(client);
            outputTransport = outputTransportFactory.GetTransport(client);
            TProtocol inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
            TProtocol outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
            while (processor.Process(inputProtocol, outputProtocol))
            {
                // keep processing requests until Process returns false
            }
        }
        catch (TTransportException)
        {
            // Transport failures end the session without logging
            // (presumably the client disconnected - confirm with transport authors).
        }
        catch (Exception x)
        {
            logDelegate("Error: " + x);
        }
        finally
        {
            // Nested finally blocks: a failing Close() (or a thread abort) must not
            // prevent the remaining cleanup, otherwise the thread slot leaks and the
            // dispatcher eventually stalls at maxThreads.
            try
            {
                if (inputTransport != null)
                {
                    inputTransport.Close();
                }
            }
            finally
            {
                try
                {
                    if (outputTransport != null)
                    {
                        outputTransport.Close();
                    }
                }
                finally
                {
                    lock (clientLock)
                    {
                        clientThreads.Remove(Thread.CurrentThread);
                        // A slot was freed: wake the dispatcher if it is waiting for one.
                        Monitor.Pulse(clientLock);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Signals the server to stop, closes the server transport and aborts the
    /// dispatcher thread and every client thread that is still registered.
    /// Safe to call before <see cref="Serve"/> has started the dispatcher.
    /// </summary>
    public override void Stop()
    {
        stop = true;
        try
        {
            serverTransport.Close();
        }
        finally
        {
            // clean up all the threads myself, even if closing the transport failed
            Thread dispatcher = workerThread;
            if (dispatcher != null)
            {
                dispatcher.Abort();
            }

            // Copy the set under the lock: client threads remove themselves from it
            // concurrently, and enumerating it unprotected can throw.
            Thread[] activeClients;
            lock (clientLock)
            {
                activeClients = new Thread[clientThreads.Count];
                clientThreads.CopyTo(activeClients);
            }

            // Abort outside the lock: an aborted client thread needs clientLock in its
            // cleanup, so holding it here while Abort() blocks could deadlock.
            foreach (Thread t in activeClients)
            {
                t.Abort();
            }
        }
    }
}
226}