blob: a494ce7bddd5f616bea24ed37ef4e79e3e7bc765 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 * Contains some contributions under the Thrift Software License.
 * Please see doc/old-thrift-license.txt in the Thrift distribution for
 * details.
 */
23
David Reiss7f42bcf2008-01-11 20:59:12 +000024using System;
David Reiss7f42bcf2008-01-11 20:59:12 +000025using System.Threading;
26using Thrift.Protocol;
27using Thrift.Transport;
28
namespace Thrift.Server
{
    /// <summary>
    /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
    /// </summary>
    /// <remarks>
    /// Thread limits supplied to the constructors are applied through the static
    /// <see cref="ThreadPool.SetMinThreads"/> and <see cref="ThreadPool.SetMaxThreads"/>
    /// calls, i.e. they reconfigure the shared .NET ThreadPool rather than a pool
    /// owned by this server instance.
    /// </remarks>
    public class TThreadPoolServer : TServer
    {
        private const int DEFAULT_MIN_THREADS = -1;  // use .NET ThreadPool defaults
        private const int DEFAULT_MAX_THREADS = -1;  // use .NET ThreadPool defaults

        // Private gate serializing the Get*/Set* ThreadPool calls made by the constructor.
        // A dedicated object is used instead of typeof(TThreadPoolServer) because a Type
        // object is reachable by any code, which could then contend or deadlock on it.
        private static readonly object threadPoolConfigLock = new object();

        // Set by Stop(); observed by the accept loop in Serve() and by every client
        // loop in Execute(). Serve() resets it to false once it has shut down.
        private volatile bool stop = false;

        /// <summary>
        /// ThreadPool limits requested for the server. Only values greater than zero
        /// are applied; anything else leaves the corresponding ThreadPool setting untouched.
        /// </summary>
        public struct Configuration
        {
            public int MinWorkerThreads;
            public int MaxWorkerThreads;
            public int MinIOThreads;
            public int MaxIOThreads;

            /// <summary>
            /// Uses the same minimum and maximum for both worker and I/O threads.
            /// </summary>
            /// <remarks>
            /// Note that <c>new Configuration()</c> (no arguments) does not run this
            /// constructor: for a struct, C# zero-initializes the value instead. The
            /// result is equivalent here because the server only applies values &gt; 0.
            /// </remarks>
            /// <param name="min">Minimum worker and I/O threads; values &lt;= 0 keep the current setting.</param>
            /// <param name="max">Maximum worker and I/O threads; values &lt;= 0 keep the current setting.</param>
            public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
            {
                MinWorkerThreads = min;
                MaxWorkerThreads = max;
                MinIOThreads = min;
                MaxIOThreads = max;
            }

            /// <summary>
            /// Sets worker-thread and I/O-thread limits independently.
            /// </summary>
            /// <param name="minWork">Minimum worker threads.</param>
            /// <param name="maxWork">Maximum worker threads.</param>
            /// <param name="minIO">Minimum I/O completion threads.</param>
            /// <param name="maxIO">Maximum I/O completion threads.</param>
            public Configuration(int minWork, int maxWork, int minIO, int maxIO)
            {
                MinWorkerThreads = minWork;
                MaxWorkerThreads = maxWork;
                MinIOThreads = minIO;
                MaxIOThreads = maxIO;
            }
        }

        /// <summary>
        /// Creates a server for a single processor using plain transport factories,
        /// binary protocol factories, an unmodified ThreadPool and the default log delegate.
        /// </summary>
        public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport)
            : this(new TSingletonProcessorFactory(processor), serverTransport,
                   new TTransportFactory(), new TTransportFactory(),
                   new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
                   new Configuration(), DefaultLogDelegate)
        {
        }

        /// <summary>
        /// Same as the two-argument constructor, but logging goes to <paramref name="logDelegate"/>.
        /// </summary>
        public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
            : this(new TSingletonProcessorFactory(processor), serverTransport,
                   new TTransportFactory(), new TTransportFactory(),
                   new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
                   new Configuration(), logDelegate)
        {
        }

        /// <summary>
        /// Creates a server for a single processor; the given transport and protocol
        /// factories are used for both the input and the output side.
        /// </summary>
        public TThreadPoolServer(TProcessor processor,
                                 TServerTransport serverTransport,
                                 TTransportFactory transportFactory,
                                 TProtocolFactory protocolFactory)
            : this(new TSingletonProcessorFactory(processor), serverTransport,
                   transportFactory, transportFactory,
                   protocolFactory, protocolFactory,
                   new Configuration(), DefaultLogDelegate)
        {
        }

        /// <summary>
        /// Creates a server from a processor factory; the given transport and protocol
        /// factories are used for both the input and the output side.
        /// </summary>
        public TThreadPoolServer(TProcessorFactory processorFactory,
                                 TServerTransport serverTransport,
                                 TTransportFactory transportFactory,
                                 TProtocolFactory protocolFactory)
            : this(processorFactory, serverTransport,
                   transportFactory, transportFactory,
                   protocolFactory, protocolFactory,
                   new Configuration(), DefaultLogDelegate)
        {
        }

        /// <summary>
        /// Creates a fully specified server whose ThreadPool limits are given as one
        /// minimum and one maximum applied to both worker and I/O threads.
        /// </summary>
        public TThreadPoolServer(TProcessorFactory processorFactory,
                                 TServerTransport serverTransport,
                                 TTransportFactory inputTransportFactory,
                                 TTransportFactory outputTransportFactory,
                                 TProtocolFactory inputProtocolFactory,
                                 TProtocolFactory outputProtocolFactory,
                                 int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel)
            : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
                   inputProtocolFactory, outputProtocolFactory,
                   new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
                   logDel)
        {
        }

        /// <summary>
        /// Creates a fully specified server and applies <paramref name="threadConfig"/>
        /// to the .NET ThreadPool.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The ThreadPool rejected the requested minimum or maximum thread counts.
        /// </exception>
        public TThreadPoolServer(TProcessorFactory processorFactory,
                                 TServerTransport serverTransport,
                                 TTransportFactory inputTransportFactory,
                                 TTransportFactory outputTransportFactory,
                                 TProtocolFactory inputProtocolFactory,
                                 TProtocolFactory outputProtocolFactory,
                                 Configuration threadConfig,
                                 LogDelegate logDel)
            : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
                   inputProtocolFactory, outputProtocolFactory, logDel)
        {
            ApplyThreadPoolLimits(threadConfig);
        }

        /// <summary>
        /// Applies the positive entries of <paramref name="threadConfig"/> to the ThreadPool,
        /// maximums first and then minimums. Non-positive entries keep the current value.
        /// </summary>
        private static void ApplyThreadPoolLimits(Configuration threadConfig)
        {
            // The read-modify-write of the pool limits must not interleave between
            // two servers being constructed concurrently.
            lock (threadPoolConfigLock)
            {
                if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
                {
                    int work, comm;
                    ThreadPool.GetMaxThreads(out work, out comm);
                    if (threadConfig.MaxWorkerThreads > 0)
                    {
                        work = threadConfig.MaxWorkerThreads;
                    }
                    if (threadConfig.MaxIOThreads > 0)
                    {
                        comm = threadConfig.MaxIOThreads;
                    }
                    if (!ThreadPool.SetMaxThreads(work, comm))
                    {
                        throw new InvalidOperationException("Error: could not SetMaxThreads in ThreadPool");
                    }
                }

                if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
                {
                    int work, comm;
                    ThreadPool.GetMinThreads(out work, out comm);
                    if (threadConfig.MinWorkerThreads > 0)
                    {
                        work = threadConfig.MinWorkerThreads;
                    }
                    if (threadConfig.MinIOThreads > 0)
                    {
                        comm = threadConfig.MinIOThreads;
                    }
                    if (!ThreadPool.SetMinThreads(work, comm))
                    {
                        throw new InvalidOperationException("Error: could not SetMinThreads in ThreadPool");
                    }
                }
            }
        }

        /// <summary>
        /// Use new ThreadPool thread for each new client connection.
        /// </summary>
        /// <remarks>
        /// Returns immediately (after logging) if the server transport cannot listen.
        /// Otherwise accepts clients until <see cref="Stop"/> is called, then closes the
        /// server transport and clears the stop flag.
        /// </remarks>
        public override void Serve()
        {
            try
            {
                serverTransport.Listen();
            }
            catch (TTransportException ttx)
            {
                logDelegate("Error, could not listen on ServerTransport: " + ttx);
                return;
            }

            //Fire the preServe server event when server is up but before any client connections
            if (serverEventHandler != null)
            {
                serverEventHandler.preServe();
            }

            while (!stop)
            {
                try
                {
                    TTransport client = serverTransport.Accept();
                    ThreadPool.QueueUserWorkItem(this.Execute, client);
                }
                catch (TTransportException ttx)
                {
                    // An Interrupted failure while stopping is the expected outcome of
                    // Stop() closing the transport; everything else is worth logging.
                    if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
                    {
                        logDelegate(ttx.ToString());
                    }
                }
            }

            if (stop)
            {
                try
                {
                    serverTransport.Close();
                }
                catch (TTransportException ttx)
                {
                    logDelegate("TServerTransport failed on close: " + ttx.Message);
                }
                stop = false;
            }
        }

        /// <summary>
        /// Loops on processing a client forever
        /// threadContext will be a TTransport instance
        /// </summary>
        /// <remarks>
        /// Runs as a ThreadPool work item, so failures of the processor factory, the
        /// request loop and the deleteContext event are caught and logged here rather
        /// than being allowed to escape the work item.
        /// </remarks>
        /// <param name="threadContext">The accepted client transport; disposed when this method returns.</param>
        private void Execute(object threadContext)
        {
            using (TTransport client = (TTransport)threadContext)
            {
                TTransport inputTransport = null;
                TTransport outputTransport = null;
                TProtocol inputProtocol = null;
                TProtocol outputProtocol = null;
                object connectionContext = null;
                try
                {
                    try
                    {
                        // Obtained inside the guarded region so a failing factory is
                        // logged instead of escaping the work item.
                        TProcessor processor = processorFactory.GetProcessor(client, this);

                        inputTransport = inputTransportFactory.GetTransport(client);
                        outputTransport = outputTransportFactory.GetTransport(client);
                        inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
                        outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);

                        //Recover event handler (if any) and fire createContext server event when a client connects
                        if (serverEventHandler != null)
                        {
                            connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
                        }

                        //Process client requests until client disconnects
                        while (!stop)
                        {
                            if (!inputTransport.Peek())
                            {
                                break;
                            }

                            //Fire processContext server event
                            //N.B. This is the pattern implemented in C++ and the event fires provisionally.
                            //That is to say it may be many minutes between the event firing and the client request
                            //actually arriving or the client may hang up without ever making a request.
                            if (serverEventHandler != null)
                            {
                                serverEventHandler.processContext(connectionContext, inputTransport);
                            }

                            //Process client request (blocks until transport is readable)
                            if (!processor.Process(inputProtocol, outputProtocol))
                            {
                                break;
                            }
                        }
                    }
                    catch (TTransportException)
                    {
                        //Usually a client disconnect, expected
                    }
                    catch (Exception x)
                    {
                        //Unexpected
                        logDelegate("Error: " + x);
                    }

                    //Fire deleteContext server event after client disconnects
                    if (serverEventHandler != null)
                    {
                        try
                        {
                            serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
                        }
                        catch (Exception x)
                        {
                            // A faulty event handler must not take the worker thread down.
                            logDelegate("Error: " + x);
                        }
                    }
                }
                finally
                {
                    //Close transports
                    if (inputTransport != null)
                    {
                        inputTransport.Close();
                    }
                    if (outputTransport != null)
                    {
                        outputTransport.Close();
                    }

                    // disposable stuff should be disposed
                    if (inputProtocol != null)
                    {
                        inputProtocol.Dispose();
                    }
                    if (outputProtocol != null)
                    {
                        outputProtocol.Dispose();
                    }
                    if (inputTransport != null)
                    {
                        inputTransport.Dispose();
                    }
                    if (outputTransport != null)
                    {
                        outputTransport.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Requests shutdown: raises the stop flag and closes the server transport.
        /// </summary>
        /// <remarks>
        /// NOTE(review): closing the transport presumably unblocks a pending Accept()
        /// in <see cref="Serve"/> — confirm against the TServerTransport implementation.
        /// </remarks>
        public override void Stop()
        {
            stop = true;
            serverTransport.Close();
        }
    }
}