blob: 7a5254ae8e6363628bffeef9a9d62546e45adf33 [file] [log] [blame]
Jens Geyeraa0c8b32019-01-28 23:27:45 +01001/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 * Contains some contributions under the Thrift Software License.
20 * Please see doc/old-thrift-license.txt in the Thrift distribution for
21 * details.
22 */
23
24using System;
25using System.Threading;
26using Thrift.Protocol;
27using Thrift.Transport;
28using Thrift.Processor;
29using System.Threading.Tasks;
30using Microsoft.Extensions.Logging;
31
Jens Geyer63d114d2021-05-25 23:42:35 +020032#pragma warning disable IDE0079 // remove unnecessary pragmas
33#pragma warning disable IDE0063 // using can be simplified, we don't
34
Jens Geyeraa0c8b32019-01-28 23:27:45 +010035namespace Thrift.Server
36{
37 /// <summary>
38 /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
39 /// </summary>
40 public class TThreadPoolAsyncServer : TServer
41 {
42 private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults
43 private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults
44 private volatile bool stop = false;
45
46 private CancellationToken ServerCancellationToken;
47
48 public struct Configuration
49 {
50 public int MinWorkerThreads;
51 public int MaxWorkerThreads;
52 public int MinIOThreads;
53 public int MaxIOThreads;
54
55 public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
56 {
57 MinWorkerThreads = min;
58 MaxWorkerThreads = max;
59 MinIOThreads = min;
60 MaxIOThreads = max;
61 }
62
63 public Configuration(int minWork, int maxWork, int minIO, int maxIO)
64 {
65 MinWorkerThreads = minWork;
66 MaxWorkerThreads = maxWork;
67 MinIOThreads = minIO;
68 MaxIOThreads = maxIO;
69 }
70 }
71
72 public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null)
73 : this(new TSingletonProcessorFactory(processor), serverTransport,
Kyle Smith7b94dd42019-03-23 17:26:56 +010074 null, null, // defaults to TTransportFactory()
Jens Geyeraa0c8b32019-01-28 23:27:45 +010075 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
76 new Configuration(), logger)
77 {
78 }
79
80 public TThreadPoolAsyncServer(ITAsyncProcessor processor,
81 TServerTransport serverTransport,
82 TTransportFactory transportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +010083 TProtocolFactory protocolFactory)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010084 : this(new TSingletonProcessorFactory(processor), serverTransport,
85 transportFactory, transportFactory,
86 protocolFactory, protocolFactory,
87 new Configuration())
88 {
89 }
90
91 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
92 TServerTransport serverTransport,
93 TTransportFactory transportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +010094 TProtocolFactory protocolFactory)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010095 : this(processorFactory, serverTransport,
96 transportFactory, transportFactory,
97 protocolFactory, protocolFactory,
98 new Configuration())
99 {
100 }
101
102 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
103 TServerTransport serverTransport,
104 TTransportFactory inputTransportFactory,
105 TTransportFactory outputTransportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +0100106 TProtocolFactory inputProtocolFactory,
107 TProtocolFactory outputProtocolFactory,
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100108 int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
109 : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
110 inputProtocolFactory, outputProtocolFactory,
111 new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
112 logger)
113 {
114 }
115
116 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
117 TServerTransport serverTransport,
118 TTransportFactory inputTransportFactory,
119 TTransportFactory outputTransportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +0100120 TProtocolFactory inputProtocolFactory,
121 TProtocolFactory outputProtocolFactory,
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100122 Configuration threadConfig,
123 ILogger logger = null)
124 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
125 inputProtocolFactory, outputProtocolFactory, logger)
126 {
127 lock (typeof(TThreadPoolAsyncServer))
128 {
129 if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
130 {
Jens Geyer63d114d2021-05-25 23:42:35 +0200131 ThreadPool.GetMaxThreads(out int work, out int comm);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100132 if (threadConfig.MaxWorkerThreads > 0)
133 work = threadConfig.MaxWorkerThreads;
134 if (threadConfig.MaxIOThreads > 0)
135 comm = threadConfig.MaxIOThreads;
136 if (!ThreadPool.SetMaxThreads(work, comm))
137 throw new Exception("Error: could not SetMaxThreads in ThreadPool");
138 }
139
140 if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
141 {
Jens Geyer63d114d2021-05-25 23:42:35 +0200142 ThreadPool.GetMinThreads(out int work, out int comm);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100143 if (threadConfig.MinWorkerThreads > 0)
144 work = threadConfig.MinWorkerThreads;
145 if (threadConfig.MinIOThreads > 0)
146 comm = threadConfig.MinIOThreads;
147 if (!ThreadPool.SetMinThreads(work, comm))
148 throw new Exception("Error: could not SetMinThreads in ThreadPool");
149 }
150 }
151 }
152
153
154 /// <summary>
155 /// Use new ThreadPool thread for each new client connection.
156 /// </summary>
157 public override async Task ServeAsync(CancellationToken cancellationToken)
158 {
159 ServerCancellationToken = cancellationToken;
160 try
161 {
162 try
163 {
164 ServerTransport.Listen();
165 }
166 catch (TTransportException ttx)
167 {
168 LogError("Error, could not listen on ServerTransport: " + ttx);
169 return;
170 }
171
172 //Fire the preServe server event when server is up but before any client connections
173 if (ServerEventHandler != null)
174 await ServerEventHandler.PreServeAsync(cancellationToken);
175
Jens Geyerd4e1eb92021-04-15 16:48:21 +0200176 while (!(stop || ServerCancellationToken.IsCancellationRequested))
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100177 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100178 try
179 {
180 TTransport client = await ServerTransport.AcceptAsync(cancellationToken);
181 ThreadPool.QueueUserWorkItem(this.Execute, client);
182 }
Jens Geyerd4e1eb92021-04-15 16:48:21 +0200183 catch (TaskCanceledException)
184 {
185 stop = true;
186 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100187 catch (TTransportException ttx)
188 {
189 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
190 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100191 LogError(ttx.ToString());
192 }
193
194 }
195 }
196
197 if (stop)
198 {
199 try
200 {
201 ServerTransport.Close();
202 }
203 catch (TTransportException ttx)
204 {
205 LogError("TServerTransport failed on close: " + ttx.Message);
206 }
207 stop = false;
208 }
209
210 }
211 finally
212 {
Jens Geyer63d114d2021-05-25 23:42:35 +0200213 ServerCancellationToken = default;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100214 }
215 }
216
217 /// <summary>
218 /// Loops on processing a client forever
219 /// threadContext will be a TTransport instance
220 /// </summary>
221 /// <param name="threadContext"></param>
222 private void Execute(object threadContext)
223 {
224 var cancellationToken = ServerCancellationToken;
225
226 using (TTransport client = (TTransport)threadContext)
227 {
228 ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this);
229 TTransport inputTransport = null;
230 TTransport outputTransport = null;
231 TProtocol inputProtocol = null;
232 TProtocol outputProtocol = null;
233 object connectionContext = null;
234 try
235 {
236 try
237 {
238 inputTransport = InputTransportFactory.GetTransport(client);
239 outputTransport = OutputTransportFactory.GetTransport(client);
240 inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
241 outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
242
243 //Recover event handler (if any) and fire createContext server event when a client connects
244 if (ServerEventHandler != null)
245 connectionContext = ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken).Result;
246
247 //Process client requests until client disconnects
248 while (!stop)
249 {
250 if (! inputTransport.PeekAsync(cancellationToken).Result)
251 break;
252
253 //Fire processContext server event
254 //N.B. This is the pattern implemented in C++ and the event fires provisionally.
255 //That is to say it may be many minutes between the event firing and the client request
256 //actually arriving or the client may hang up without ever makeing a request.
257 if (ServerEventHandler != null)
258 ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).Wait();
Jens Geyer63d114d2021-05-25 23:42:35 +0200259
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100260 //Process client request (blocks until transport is readable)
261 if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).Result)
262 break;
263 }
264 }
265 catch (TTransportException)
266 {
267 //Usually a client disconnect, expected
268 }
269 catch (Exception x)
270 {
271 //Unexpected
272 LogError("Error: " + x);
273 }
274
275 //Fire deleteContext server event after client disconnects
276 if (ServerEventHandler != null)
277 ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken).Wait();
278
279 }
280 finally
281 {
282 //Close transports
283 inputTransport?.Close();
284 outputTransport?.Close();
285
286 // disposable stuff should be disposed
287 inputProtocol?.Dispose();
288 outputProtocol?.Dispose();
289 inputTransport?.Dispose();
290 outputTransport?.Dispose();
291 }
292 }
293 }
294
295 public override void Stop()
296 {
297 stop = true;
298 ServerTransport?.Close();
299 }
300 }
301}