blob: 877d595e6c1fd248c63835dced478467e553ed98 [file] [log] [blame]
Jens Geyeraa0c8b32019-01-28 23:27:45 +01001/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 * Contains some contributions under the Thrift Software License.
20 * Please see doc/old-thrift-license.txt in the Thrift distribution for
21 * details.
22 */
23
24using System;
25using System.Threading;
26using Thrift.Protocol;
27using Thrift.Transport;
28using Thrift.Processor;
29using System.Threading.Tasks;
30using Microsoft.Extensions.Logging;
31
32namespace Thrift.Server
33{
34 /// <summary>
35 /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
36 /// </summary>
37 public class TThreadPoolAsyncServer : TServer
38 {
39 private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults
40 private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults
41 private volatile bool stop = false;
42
43 private CancellationToken ServerCancellationToken;
44
45 public struct Configuration
46 {
47 public int MinWorkerThreads;
48 public int MaxWorkerThreads;
49 public int MinIOThreads;
50 public int MaxIOThreads;
51
52 public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
53 {
54 MinWorkerThreads = min;
55 MaxWorkerThreads = max;
56 MinIOThreads = min;
57 MaxIOThreads = max;
58 }
59
60 public Configuration(int minWork, int maxWork, int minIO, int maxIO)
61 {
62 MinWorkerThreads = minWork;
63 MaxWorkerThreads = maxWork;
64 MinIOThreads = minIO;
65 MaxIOThreads = maxIO;
66 }
67 }
68
69 public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null)
70 : this(new TSingletonProcessorFactory(processor), serverTransport,
Kyle Smith7b94dd42019-03-23 17:26:56 +010071 null, null, // defaults to TTransportFactory()
Jens Geyeraa0c8b32019-01-28 23:27:45 +010072 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
73 new Configuration(), logger)
74 {
75 }
76
77 public TThreadPoolAsyncServer(ITAsyncProcessor processor,
78 TServerTransport serverTransport,
79 TTransportFactory transportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +010080 TProtocolFactory protocolFactory)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010081 : this(new TSingletonProcessorFactory(processor), serverTransport,
82 transportFactory, transportFactory,
83 protocolFactory, protocolFactory,
84 new Configuration())
85 {
86 }
87
88 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
89 TServerTransport serverTransport,
90 TTransportFactory transportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +010091 TProtocolFactory protocolFactory)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010092 : this(processorFactory, serverTransport,
93 transportFactory, transportFactory,
94 protocolFactory, protocolFactory,
95 new Configuration())
96 {
97 }
98
99 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
100 TServerTransport serverTransport,
101 TTransportFactory inputTransportFactory,
102 TTransportFactory outputTransportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +0100103 TProtocolFactory inputProtocolFactory,
104 TProtocolFactory outputProtocolFactory,
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100105 int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
106 : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
107 inputProtocolFactory, outputProtocolFactory,
108 new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
109 logger)
110 {
111 }
112
113 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
114 TServerTransport serverTransport,
115 TTransportFactory inputTransportFactory,
116 TTransportFactory outputTransportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +0100117 TProtocolFactory inputProtocolFactory,
118 TProtocolFactory outputProtocolFactory,
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100119 Configuration threadConfig,
120 ILogger logger = null)
121 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
122 inputProtocolFactory, outputProtocolFactory, logger)
123 {
124 lock (typeof(TThreadPoolAsyncServer))
125 {
126 if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
127 {
128 int work, comm;
129 ThreadPool.GetMaxThreads(out work, out comm);
130 if (threadConfig.MaxWorkerThreads > 0)
131 work = threadConfig.MaxWorkerThreads;
132 if (threadConfig.MaxIOThreads > 0)
133 comm = threadConfig.MaxIOThreads;
134 if (!ThreadPool.SetMaxThreads(work, comm))
135 throw new Exception("Error: could not SetMaxThreads in ThreadPool");
136 }
137
138 if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
139 {
140 int work, comm;
141 ThreadPool.GetMinThreads(out work, out comm);
142 if (threadConfig.MinWorkerThreads > 0)
143 work = threadConfig.MinWorkerThreads;
144 if (threadConfig.MinIOThreads > 0)
145 comm = threadConfig.MinIOThreads;
146 if (!ThreadPool.SetMinThreads(work, comm))
147 throw new Exception("Error: could not SetMinThreads in ThreadPool");
148 }
149 }
150 }
151
152
153 /// <summary>
154 /// Use new ThreadPool thread for each new client connection.
155 /// </summary>
156 public override async Task ServeAsync(CancellationToken cancellationToken)
157 {
158 ServerCancellationToken = cancellationToken;
159 try
160 {
161 try
162 {
163 ServerTransport.Listen();
164 }
165 catch (TTransportException ttx)
166 {
167 LogError("Error, could not listen on ServerTransport: " + ttx);
168 return;
169 }
170
171 //Fire the preServe server event when server is up but before any client connections
172 if (ServerEventHandler != null)
173 await ServerEventHandler.PreServeAsync(cancellationToken);
174
Jens Geyerd4e1eb92021-04-15 16:48:21 +0200175 while (!(stop || ServerCancellationToken.IsCancellationRequested))
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100176 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100177 try
178 {
179 TTransport client = await ServerTransport.AcceptAsync(cancellationToken);
180 ThreadPool.QueueUserWorkItem(this.Execute, client);
181 }
Jens Geyerd4e1eb92021-04-15 16:48:21 +0200182 catch (TaskCanceledException)
183 {
184 stop = true;
185 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100186 catch (TTransportException ttx)
187 {
188 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
189 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100190 LogError(ttx.ToString());
191 }
192
193 }
194 }
195
196 if (stop)
197 {
198 try
199 {
200 ServerTransport.Close();
201 }
202 catch (TTransportException ttx)
203 {
204 LogError("TServerTransport failed on close: " + ttx.Message);
205 }
206 stop = false;
207 }
208
209 }
210 finally
211 {
212 ServerCancellationToken = default(CancellationToken);
213 }
214 }
215
216 /// <summary>
217 /// Loops on processing a client forever
218 /// threadContext will be a TTransport instance
219 /// </summary>
220 /// <param name="threadContext"></param>
221 private void Execute(object threadContext)
222 {
223 var cancellationToken = ServerCancellationToken;
224
225 using (TTransport client = (TTransport)threadContext)
226 {
227 ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this);
228 TTransport inputTransport = null;
229 TTransport outputTransport = null;
230 TProtocol inputProtocol = null;
231 TProtocol outputProtocol = null;
232 object connectionContext = null;
233 try
234 {
235 try
236 {
237 inputTransport = InputTransportFactory.GetTransport(client);
238 outputTransport = OutputTransportFactory.GetTransport(client);
239 inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
240 outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
241
242 //Recover event handler (if any) and fire createContext server event when a client connects
243 if (ServerEventHandler != null)
244 connectionContext = ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken).Result;
245
246 //Process client requests until client disconnects
247 while (!stop)
248 {
249 if (! inputTransport.PeekAsync(cancellationToken).Result)
250 break;
251
252 //Fire processContext server event
253 //N.B. This is the pattern implemented in C++ and the event fires provisionally.
254 //That is to say it may be many minutes between the event firing and the client request
255 //actually arriving or the client may hang up without ever makeing a request.
256 if (ServerEventHandler != null)
257 ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).Wait();
258 //Process client request (blocks until transport is readable)
259 if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).Result)
260 break;
261 }
262 }
263 catch (TTransportException)
264 {
265 //Usually a client disconnect, expected
266 }
267 catch (Exception x)
268 {
269 //Unexpected
270 LogError("Error: " + x);
271 }
272
273 //Fire deleteContext server event after client disconnects
274 if (ServerEventHandler != null)
275 ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken).Wait();
276
277 }
278 finally
279 {
280 //Close transports
281 inputTransport?.Close();
282 outputTransport?.Close();
283
284 // disposable stuff should be disposed
285 inputProtocol?.Dispose();
286 outputProtocol?.Dispose();
287 inputTransport?.Dispose();
288 outputTransport?.Dispose();
289 }
290 }
291 }
292
293 public override void Stop()
294 {
295 stop = true;
296 ServerTransport?.Close();
297 }
298 }
299}