blob: efa1698daff1d6198f0e542825eb953b866d95c1 [file] [log] [blame]
Jens Geyer3f1fd592021-10-28 22:31:12 +02001/*
Jens Geyeraa0c8b32019-01-28 23:27:45 +01002 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 * Contains some contributions under the Thrift Software License.
20 * Please see doc/old-thrift-license.txt in the Thrift distribution for
21 * details.
22 */
23
24using System;
25using System.Threading;
26using Thrift.Protocol;
27using Thrift.Transport;
28using Thrift.Processor;
29using System.Threading.Tasks;
30using Microsoft.Extensions.Logging;
31
Jens Geyer63d114d2021-05-25 23:42:35 +020032
Jens Geyeraa0c8b32019-01-28 23:27:45 +010033namespace Thrift.Server
34{
35 /// <summary>
36 /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
37 /// </summary>
38 public class TThreadPoolAsyncServer : TServer
39 {
40 private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults
41 private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults
42 private volatile bool stop = false;
43
44 private CancellationToken ServerCancellationToken;
45
46 public struct Configuration
47 {
48 public int MinWorkerThreads;
49 public int MaxWorkerThreads;
50 public int MinIOThreads;
51 public int MaxIOThreads;
52
53 public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
54 {
55 MinWorkerThreads = min;
56 MaxWorkerThreads = max;
57 MinIOThreads = min;
58 MaxIOThreads = max;
59 }
60
61 public Configuration(int minWork, int maxWork, int minIO, int maxIO)
62 {
63 MinWorkerThreads = minWork;
64 MaxWorkerThreads = maxWork;
65 MinIOThreads = minIO;
66 MaxIOThreads = maxIO;
67 }
68 }
69
70 public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null)
71 : this(new TSingletonProcessorFactory(processor), serverTransport,
Kyle Smith7b94dd42019-03-23 17:26:56 +010072 null, null, // defaults to TTransportFactory()
Jens Geyeraa0c8b32019-01-28 23:27:45 +010073 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
74 new Configuration(), logger)
75 {
76 }
77
78 public TThreadPoolAsyncServer(ITAsyncProcessor processor,
79 TServerTransport serverTransport,
80 TTransportFactory transportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +010081 TProtocolFactory protocolFactory)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010082 : this(new TSingletonProcessorFactory(processor), serverTransport,
83 transportFactory, transportFactory,
84 protocolFactory, protocolFactory,
85 new Configuration())
86 {
87 }
88
89 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
90 TServerTransport serverTransport,
91 TTransportFactory transportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +010092 TProtocolFactory protocolFactory)
Jens Geyeraa0c8b32019-01-28 23:27:45 +010093 : this(processorFactory, serverTransport,
94 transportFactory, transportFactory,
95 protocolFactory, protocolFactory,
96 new Configuration())
97 {
98 }
99
100 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
101 TServerTransport serverTransport,
102 TTransportFactory inputTransportFactory,
103 TTransportFactory outputTransportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +0100104 TProtocolFactory inputProtocolFactory,
105 TProtocolFactory outputProtocolFactory,
Jens Geyerea1e8ff2021-11-13 23:21:02 +0100106 int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger = null)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100107 : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
108 inputProtocolFactory, outputProtocolFactory,
109 new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
110 logger)
111 {
112 }
113
114 public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
115 TServerTransport serverTransport,
116 TTransportFactory inputTransportFactory,
117 TTransportFactory outputTransportFactory,
Jens Geyer421444f2019-03-20 22:13:25 +0100118 TProtocolFactory inputProtocolFactory,
119 TProtocolFactory outputProtocolFactory,
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100120 Configuration threadConfig,
121 ILogger logger = null)
122 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
123 inputProtocolFactory, outputProtocolFactory, logger)
124 {
125 lock (typeof(TThreadPoolAsyncServer))
126 {
127 if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
128 {
Jens Geyer63d114d2021-05-25 23:42:35 +0200129 ThreadPool.GetMaxThreads(out int work, out int comm);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100130 if (threadConfig.MaxWorkerThreads > 0)
131 work = threadConfig.MaxWorkerThreads;
132 if (threadConfig.MaxIOThreads > 0)
133 comm = threadConfig.MaxIOThreads;
134 if (!ThreadPool.SetMaxThreads(work, comm))
135 throw new Exception("Error: could not SetMaxThreads in ThreadPool");
136 }
137
138 if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
139 {
Jens Geyer63d114d2021-05-25 23:42:35 +0200140 ThreadPool.GetMinThreads(out int work, out int comm);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100141 if (threadConfig.MinWorkerThreads > 0)
142 work = threadConfig.MinWorkerThreads;
143 if (threadConfig.MinIOThreads > 0)
144 comm = threadConfig.MinIOThreads;
145 if (!ThreadPool.SetMinThreads(work, comm))
146 throw new Exception("Error: could not SetMinThreads in ThreadPool");
147 }
148 }
149 }
150
151
152 /// <summary>
153 /// Use new ThreadPool thread for each new client connection.
154 /// </summary>
155 public override async Task ServeAsync(CancellationToken cancellationToken)
156 {
157 ServerCancellationToken = cancellationToken;
158 try
159 {
160 try
161 {
162 ServerTransport.Listen();
163 }
164 catch (TTransportException ttx)
165 {
166 LogError("Error, could not listen on ServerTransport: " + ttx);
167 return;
168 }
169
170 //Fire the preServe server event when server is up but before any client connections
171 if (ServerEventHandler != null)
172 await ServerEventHandler.PreServeAsync(cancellationToken);
173
Jens Geyerd4e1eb92021-04-15 16:48:21 +0200174 while (!(stop || ServerCancellationToken.IsCancellationRequested))
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100175 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100176 try
177 {
178 TTransport client = await ServerTransport.AcceptAsync(cancellationToken);
phxnsharp9a4802a2021-05-21 23:36:30 +0200179 _ = Task.Run(async () => await ExecuteAsync(client), cancellationToken); // intentionally ignoring retval
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100180 }
Jens Geyerd4e1eb92021-04-15 16:48:21 +0200181 catch (TaskCanceledException)
182 {
183 stop = true;
184 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100185 catch (TTransportException ttx)
186 {
187 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
188 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100189 LogError(ttx.ToString());
190 }
191
192 }
193 }
194
195 if (stop)
196 {
197 try
198 {
199 ServerTransport.Close();
200 }
201 catch (TTransportException ttx)
202 {
203 LogError("TServerTransport failed on close: " + ttx.Message);
204 }
205 stop = false;
206 }
207
208 }
209 finally
210 {
Jens Geyer63d114d2021-05-25 23:42:35 +0200211 ServerCancellationToken = default;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100212 }
213 }
214
215 /// <summary>
216 /// Loops on processing a client forever
Jens Geyer3f1fd592021-10-28 22:31:12 +0200217 /// client will be a TTransport instance
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100218 /// </summary>
Jens Geyer3f1fd592021-10-28 22:31:12 +0200219 /// <param name="client"></param>
phxnsharp9a4802a2021-05-21 23:36:30 +0200220 private async Task ExecuteAsync(TTransport client)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100221 {
222 var cancellationToken = ServerCancellationToken;
223
phxnsharp9a4802a2021-05-21 23:36:30 +0200224 using (client)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100225 {
226 ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this);
227 TTransport inputTransport = null;
228 TTransport outputTransport = null;
229 TProtocol inputProtocol = null;
230 TProtocol outputProtocol = null;
231 object connectionContext = null;
232 try
233 {
234 try
235 {
236 inputTransport = InputTransportFactory.GetTransport(client);
237 outputTransport = OutputTransportFactory.GetTransport(client);
238 inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
239 outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
240
241 //Recover event handler (if any) and fire createContext server event when a client connects
242 if (ServerEventHandler != null)
phxnsharp9a4802a2021-05-21 23:36:30 +0200243 connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100244
245 //Process client requests until client disconnects
Jens Geyerea1e8ff2021-11-13 23:21:02 +0100246 while (!(stop || cancellationToken.IsCancellationRequested))
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100247 {
Jens Geyerea1e8ff2021-11-13 23:21:02 +0100248 if (!await inputTransport.PeekAsync(cancellationToken))
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100249 break;
250
251 //Fire processContext server event
252 //N.B. This is the pattern implemented in C++ and the event fires provisionally.
253 //That is to say it may be many minutes between the event firing and the client request
254 //actually arriving or the client may hang up without ever makeing a request.
255 if (ServerEventHandler != null)
phxnsharp9a4802a2021-05-21 23:36:30 +0200256 await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
Jens Geyer63d114d2021-05-25 23:42:35 +0200257
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100258 //Process client request (blocks until transport is readable)
Jens Geyerea1e8ff2021-11-13 23:21:02 +0100259 if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100260 break;
261 }
262 }
263 catch (TTransportException)
264 {
265 //Usually a client disconnect, expected
266 }
267 catch (Exception x)
268 {
269 //Unexpected
270 LogError("Error: " + x);
271 }
272
273 //Fire deleteContext server event after client disconnects
274 if (ServerEventHandler != null)
phxnsharp9a4802a2021-05-21 23:36:30 +0200275 await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100276
277 }
278 finally
279 {
280 //Close transports
281 inputTransport?.Close();
282 outputTransport?.Close();
283
284 // disposable stuff should be disposed
285 inputProtocol?.Dispose();
286 outputProtocol?.Dispose();
287 inputTransport?.Dispose();
288 outputTransport?.Dispose();
289 }
290 }
291 }
292
293 public override void Stop()
294 {
295 stop = true;
296 ServerTransport?.Close();
297 }
298 }
299}