blob: 20e659d3aa2d701f59a5a139fdbcd22fd303e00d [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 * Contains some contributions under the Thrift Software License.
 * Please see doc/old-thrift-license.txt in the Thrift distribution for
 * details.
 */
23
24using System;
25using System.Threading;
26using Thrift.Protocol;
27using Thrift.Transport;
28using Thrift.Processor;
29using System.Threading.Tasks;
30using Microsoft.Extensions.Logging;
31
namespace Thrift.Server
{
    /// <summary>
    /// Server that uses the .NET built-in ThreadPool to handle requests:
    /// every accepted client connection is queued as one ThreadPool work item
    /// that serves the connection until it ends.
    /// </summary>
    public class TThreadPoolAsyncServer : TServer
    {
        private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults
        private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults

        // Serializes the get/modify/set sequence on the process-wide ThreadPool limits.
        // A private object is used rather than typeof(...), because a Type instance is
        // reachable by any code and could be locked by unrelated callers.
        private static readonly object ThreadPoolConfigLock = new object();

        // Set by Stop(); polled by the accept loop and by every per-client loop.
        private volatile bool stop = false;

        // Token handed to the currently running ServeAsync() call; read by Execute().
        private CancellationToken ServerCancellationToken;

        /// <summary>
        /// ThreadPool limits to apply when the server is constructed.
        /// Only values greater than zero are applied; any other value leaves the
        /// corresponding ThreadPool setting untouched.
        /// </summary>
        public struct Configuration
        {
            public int MinWorkerThreads;
            public int MaxWorkerThreads;
            public int MinIOThreads;
            public int MaxIOThreads;

            /// <summary>
            /// Uses the same minimum and the same maximum for worker and I/O threads.
            /// </summary>
            /// <param name="min">Minimum for both worker and I/O threads.</param>
            /// <param name="max">Maximum for both worker and I/O threads.</param>
            public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
            {
                MinWorkerThreads = min;
                MaxWorkerThreads = max;
                MinIOThreads = min;
                MaxIOThreads = max;
            }

            /// <summary>
            /// Sets worker and I/O thread limits independently.
            /// </summary>
            /// <param name="minWork">Minimum number of worker threads.</param>
            /// <param name="maxWork">Maximum number of worker threads.</param>
            /// <param name="minIO">Minimum number of I/O threads.</param>
            /// <param name="maxIO">Maximum number of I/O threads.</param>
            public Configuration(int minWork, int maxWork, int minIO, int maxIO)
            {
                MinWorkerThreads = minWork;
                MaxWorkerThreads = maxWork;
                MinIOThreads = minIO;
                MaxIOThreads = maxIO;
            }
        }

        /// <summary>
        /// Creates a server for a single processor using binary protocol factories
        /// and no explicit transport factories or ThreadPool limits.
        /// </summary>
        public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null)
            : this(new TSingletonProcessorFactory(processor), serverTransport,
                   null, null, // defaults to TTransportFactory()
                   new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
                   new Configuration(), logger)
        {
        }

        /// <summary>
        /// Creates a server for a single processor; the given transport and protocol
        /// factories are used for both the input and the output side.
        /// </summary>
        public TThreadPoolAsyncServer(ITAsyncProcessor processor,
            TServerTransport serverTransport,
            TTransportFactory transportFactory,
            TProtocolFactory protocolFactory)
            : this(new TSingletonProcessorFactory(processor), serverTransport,
                   transportFactory, transportFactory,
                   protocolFactory, protocolFactory,
                   new Configuration())
        {
        }

        /// <summary>
        /// Creates a server for a processor factory; the given transport and protocol
        /// factories are used for both the input and the output side.
        /// </summary>
        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
            TServerTransport serverTransport,
            TTransportFactory transportFactory,
            TProtocolFactory protocolFactory)
            : this(processorFactory, serverTransport,
                   transportFactory, transportFactory,
                   protocolFactory, protocolFactory,
                   new Configuration())
        {
        }

        /// <summary>
        /// Creates a server with separate input/output factories and one min/max pair
        /// that is applied to both worker and I/O threads of the ThreadPool.
        /// </summary>
        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
            TServerTransport serverTransport,
            TTransportFactory inputTransportFactory,
            TTransportFactory outputTransportFactory,
            TProtocolFactory inputProtocolFactory,
            TProtocolFactory outputProtocolFactory,
            int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger = null)
            : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
                   inputProtocolFactory, outputProtocolFactory,
                   new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
                   logger)
        {
        }

        /// <summary>
        /// Creates a server and applies the requested limits to the process-wide ThreadPool.
        /// </summary>
        /// <param name="threadConfig">
        /// ThreadPool limits; only members greater than zero are applied, the others keep
        /// the value currently reported by the ThreadPool.
        /// </param>
        /// <exception cref="InvalidOperationException">
        /// The ThreadPool rejected the requested minimum or maximum thread counts.
        /// </exception>
        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
            TServerTransport serverTransport,
            TTransportFactory inputTransportFactory,
            TTransportFactory outputTransportFactory,
            TProtocolFactory inputProtocolFactory,
            TProtocolFactory outputProtocolFactory,
            Configuration threadConfig,
            ILogger logger = null)
            : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
                   inputProtocolFactory, outputProtocolFactory, logger)
        {
            lock (ThreadPoolConfigLock)
            {
                // The maximum is applied before the minimum, as in the original ordering.
                if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
                {
                    int work, comm;
                    ThreadPool.GetMaxThreads(out work, out comm);
                    if (threadConfig.MaxWorkerThreads > 0)
                    {
                        work = threadConfig.MaxWorkerThreads;
                    }
                    if (threadConfig.MaxIOThreads > 0)
                    {
                        comm = threadConfig.MaxIOThreads;
                    }
                    if (!ThreadPool.SetMaxThreads(work, comm))
                    {
                        throw new InvalidOperationException("Error: could not SetMaxThreads in ThreadPool");
                    }
                }

                if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
                {
                    int work, comm;
                    ThreadPool.GetMinThreads(out work, out comm);
                    if (threadConfig.MinWorkerThreads > 0)
                    {
                        work = threadConfig.MinWorkerThreads;
                    }
                    if (threadConfig.MinIOThreads > 0)
                    {
                        comm = threadConfig.MinIOThreads;
                    }
                    if (!ThreadPool.SetMinThreads(work, comm))
                    {
                        throw new InvalidOperationException("Error: could not SetMinThreads in ThreadPool");
                    }
                }
            }
        }

        /// <summary>
        /// Listens on the server transport and queues one ThreadPool work item
        /// for each accepted client connection until <see cref="Stop"/> is called.
        /// </summary>
        /// <param name="cancellationToken">
        /// Token passed to the event handler, to the accept call and, through
        /// <c>ServerCancellationToken</c>, to every client work item.
        /// </param>
        public override async Task ServeAsync(CancellationToken cancellationToken)
        {
            ServerCancellationToken = cancellationToken;
            try
            {
                try
                {
                    ServerTransport.Listen();
                }
                catch (TTransportException ttx)
                {
                    LogError("Error, could not listen on ServerTransport: " + ttx);
                    return;
                }

                //Fire the preServe server event when server is up but before any client connections
                if (ServerEventHandler != null)
                {
                    await ServerEventHandler.PreServeAsync(cancellationToken);
                }

                while (!stop)
                {
                    try
                    {
                        TTransport client = await ServerTransport.AcceptAsync(cancellationToken);
                        ThreadPool.QueueUserWorkItem(this.Execute, client);
                    }
                    catch (TTransportException ttx)
                    {
                        // An "Interrupted" failure while stopping is not logged;
                        // every other accept failure is.
                        if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
                        {
                            LogError(ttx.ToString());
                        }
                    }
                }

                if (stop)
                {
                    try
                    {
                        ServerTransport.Close();
                    }
                    catch (TTransportException ttx)
                    {
                        LogError("TServerTransport failed on close: " + ttx.Message);
                    }

                    // Reset the flag (presumably so that the instance can serve again).
                    stop = false;
                }
            }
            finally
            {
                ServerCancellationToken = default(CancellationToken);
            }
        }

        /// <summary>
        /// ThreadPool work item: processes requests of one client until the client
        /// disconnects, the processor reports it is done, or the server is stopped.
        /// </summary>
        /// <param name="threadContext">The accepted client; must be a <c>TTransport</c> instance.</param>
        private void Execute(object threadContext)
        {
            var cancellationToken = ServerCancellationToken;

            using (TTransport client = (TTransport)threadContext)
            {
                ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this);
                TTransport inputTransport = null;
                TTransport outputTransport = null;
                TProtocol inputProtocol = null;
                TProtocol outputProtocol = null;
                object connectionContext = null;
                try
                {
                    try
                    {
                        inputTransport = InputTransportFactory.GetTransport(client);
                        outputTransport = OutputTransportFactory.GetTransport(client);
                        inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
                        outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);

                        // This method has to block because it runs as a synchronous WaitCallback.
                        // GetAwaiter().GetResult() is used instead of .Result/.Wait() because the
                        // latter wrap failures in AggregateException, which would bypass the
                        // TTransportException handler below and log normal disconnects as errors.

                        //Recover event handler (if any) and fire createContext server event when a client connects
                        if (ServerEventHandler != null)
                        {
                            connectionContext = ServerEventHandler
                                .CreateContextAsync(inputProtocol, outputProtocol, cancellationToken)
                                .GetAwaiter().GetResult();
                        }

                        //Process client requests until client disconnects
                        while (!stop)
                        {
                            if (!inputTransport.PeekAsync(cancellationToken).GetAwaiter().GetResult())
                            {
                                break;
                            }

                            //Fire processContext server event
                            //N.B. This is the pattern implemented in C++ and the event fires provisionally.
                            //That is to say it may be many minutes between the event firing and the client request
                            //actually arriving or the client may hang up without ever making a request.
                            if (ServerEventHandler != null)
                            {
                                ServerEventHandler
                                    .ProcessContextAsync(connectionContext, inputTransport, cancellationToken)
                                    .GetAwaiter().GetResult();
                            }

                            //Process client request (blocks until transport is readable)
                            if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).GetAwaiter().GetResult())
                            {
                                break;
                            }
                        }
                    }
                    catch (TTransportException)
                    {
                        //Usually a client disconnect, expected
                    }
                    catch (Exception x)
                    {
                        //Unexpected
                        LogError("Error: " + x);
                    }

                    //Fire deleteContext server event after client disconnects
                    if (ServerEventHandler != null)
                    {
                        ServerEventHandler
                            .DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken)
                            .GetAwaiter().GetResult();
                    }
                }
                finally
                {
                    //Close transports
                    inputTransport?.Close();
                    outputTransport?.Close();

                    // disposable stuff should be disposed
                    inputProtocol?.Dispose();
                    outputProtocol?.Dispose();
                    inputTransport?.Dispose();
                    outputTransport?.Dispose();
                }
            }
        }

        /// <summary>
        /// Requests the server to stop: raises the stop flag and closes the server
        /// transport (presumably to interrupt a pending accept; confirm against the
        /// transport implementation).
        /// </summary>
        public override void Stop()
        {
            stop = true;
            ServerTransport?.Close();
        }
    }
}