| Kevin Clark | ab4460d | 2009-03-20 02:28:41 +0000 | [diff] [blame] | 1 | /** | 
|  | 2 | * Licensed to the Apache Software Foundation (ASF) under one | 
|  | 3 | * or more contributor license agreements. See the NOTICE file | 
|  | 4 | * distributed with this work for additional information | 
|  | 5 | * regarding copyright ownership. The ASF licenses this file | 
|  | 6 | * to you under the Apache License, Version 2.0 (the | 
|  | 7 | * "License"); you may not use this file except in compliance | 
|  | 8 | * with the License. You may obtain a copy of the License at | 
|  | 9 | * | 
|  | 10 | *   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | 11 | * | 
|  | 12 | * Unless required by applicable law or agreed to in writing, | 
|  | 13 | * software distributed under the License is distributed on an | 
|  | 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | 15 | * KIND, either express or implied. See the License for the | 
|  | 16 | * specific language governing permissions and limitations | 
|  | 17 | * under the License. | 
| Todd Lipcon | 53ae9f3 | 2009-12-07 00:42:38 +0000 | [diff] [blame] | 18 | * | 
|  | 19 | * Contains some contributions under the Thrift Software License. | 
|  | 20 | * Please see doc/old-thrift-license.txt in the Thrift distribution for | 
|  | 21 | * details. | 
| Kevin Clark | ab4460d | 2009-03-20 02:28:41 +0000 | [diff] [blame] | 22 | */ | 
|  | 23 |  | 
| David Reiss | 7f42bcf | 2008-01-11 20:59:12 +0000 | [diff] [blame] | 24 | using System; | 
| David Reiss | 7f42bcf | 2008-01-11 20:59:12 +0000 | [diff] [blame] | 25 | using System.Threading; | 
|  | 26 | using Thrift.Protocol; | 
|  | 27 | using Thrift.Transport; | 
|  | 28 |  | 
namespace Thrift.Server
{
    /// <summary>
    /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
    /// Each accepted client connection is queued as one ThreadPool work item that keeps
    /// processing requests for that client until it disconnects or the server is stopped.
    /// </summary>
    public class TThreadPoolServer : TServer
    {
        private const int DEFAULT_MIN_THREADS = -1;  // use .NET ThreadPool defaults
        private const int DEFAULT_MAX_THREADS = -1;  // use .NET ThreadPool defaults

        // Serializes changes to the process-wide ThreadPool limits made by the constructors.
        // A private object is used rather than typeof(TThreadPoolServer), because a Type
        // object is reachable by any code and could be locked by unrelated callers.
        private static readonly object threadPoolConfigLock = new object();

        // Set by Stop(); polled by the accept loop in Serve() and by every client loop in Execute().
        private volatile bool stop = false;

        /// <summary>
        /// Desired ThreadPool limits. Only values greater than zero are applied;
        /// any other value leaves the corresponding .NET ThreadPool setting untouched.
        /// </summary>
        /// <remarks>
        /// Because this is a struct, <c>new Configuration()</c> produces the zero-initialized
        /// value (all fields 0) rather than invoking the constructor with optional parameters.
        /// Zero is not greater than zero, so it is treated the same as the -1 defaults.
        /// </remarks>
        public struct Configuration
        {
            public int MinWorkerThreads;
            public int MaxWorkerThreads;
            public int MinIOThreads;
            public int MaxIOThreads;

            /// <summary>
            /// Uses the same minimum and the same maximum for worker and I/O threads.
            /// </summary>
            /// <param name="min">Minimum for both worker and I/O threads.</param>
            /// <param name="max">Maximum for both worker and I/O threads.</param>
            public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
            {
                MinWorkerThreads = min;
                MaxWorkerThreads = max;
                MinIOThreads = min;
                MaxIOThreads = max;
            }

            /// <summary>
            /// Sets worker and I/O thread limits independently.
            /// </summary>
            /// <param name="minWork">Minimum number of worker threads.</param>
            /// <param name="maxWork">Maximum number of worker threads.</param>
            /// <param name="minIO">Minimum number of I/O threads.</param>
            /// <param name="maxIO">Maximum number of I/O threads.</param>
            public Configuration(int minWork, int maxWork, int minIO, int maxIO)
            {
                MinWorkerThreads = minWork;
                MaxWorkerThreads = maxWork;
                MinIOThreads = minIO;
                MaxIOThreads = maxIO;
            }
        }

        /// <summary>
        /// Creates a server for a single processor using default-constructed
        /// TTransportFactory and TBinaryProtocol.Factory instances, an empty thread
        /// configuration and DefaultLogDelegate.
        /// </summary>
        public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport)
            : this(new TSingletonProcessorFactory(processor), serverTransport,
                   new TTransportFactory(), new TTransportFactory(),
                   new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
                   new Configuration(), DefaultLogDelegate)
        {
        }

        /// <summary>
        /// Same as the two-argument constructor, but logs through <paramref name="logDelegate"/>.
        /// </summary>
        public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
            : this(new TSingletonProcessorFactory(processor), serverTransport,
                   new TTransportFactory(), new TTransportFactory(),
                   new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
                   new Configuration(), logDelegate)
        {
        }

        /// <summary>
        /// Creates a server for a single processor; the given transport and protocol
        /// factories are used for both the input and the output side.
        /// </summary>
        public TThreadPoolServer(TProcessor processor,
                                 TServerTransport serverTransport,
                                 TTransportFactory transportFactory,
                                 TProtocolFactory protocolFactory)
            : this(new TSingletonProcessorFactory(processor), serverTransport,
                   transportFactory, transportFactory,
                   protocolFactory, protocolFactory,
                   new Configuration(), DefaultLogDelegate)
        {
        }

        /// <summary>
        /// Creates a server from a processor factory; the given transport and protocol
        /// factories are used for both the input and the output side.
        /// </summary>
        public TThreadPoolServer(TProcessorFactory processorFactory,
                                 TServerTransport serverTransport,
                                 TTransportFactory transportFactory,
                                 TProtocolFactory protocolFactory)
            : this(processorFactory, serverTransport,
                   transportFactory, transportFactory,
                   protocolFactory, protocolFactory,
                   new Configuration(), DefaultLogDelegate)
        {
        }

        /// <summary>
        /// Creates a server with separate input/output factories. The given minimum and
        /// maximum are applied to both worker and I/O threads.
        /// </summary>
        public TThreadPoolServer(TProcessorFactory processorFactory,
                                 TServerTransport serverTransport,
                                 TTransportFactory inputTransportFactory,
                                 TTransportFactory outputTransportFactory,
                                 TProtocolFactory inputProtocolFactory,
                                 TProtocolFactory outputProtocolFactory,
                                 int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel)
            : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
                   inputProtocolFactory, outputProtocolFactory,
                   new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
                   logDel)
        {
        }

        /// <summary>
        /// Creates a server with separate input/output factories and an explicit thread
        /// configuration. Positive values in <paramref name="threadConfig"/> are applied to
        /// the .NET ThreadPool, which is shared by the whole process.
        /// </summary>
        /// <exception cref="Exception">
        /// ThreadPool.SetMaxThreads or ThreadPool.SetMinThreads rejected the requested values.
        /// </exception>
        public TThreadPoolServer(TProcessorFactory processorFactory,
                                 TServerTransport serverTransport,
                                 TTransportFactory inputTransportFactory,
                                 TTransportFactory outputTransportFactory,
                                 TProtocolFactory inputProtocolFactory,
                                 TProtocolFactory outputProtocolFactory,
                                 Configuration threadConfig,
                                 LogDelegate logDel)
            : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
                   inputProtocolFactory, outputProtocolFactory, logDel)
        {
            ApplyThreadPoolLimits(threadConfig);
        }

        /// <summary>
        /// Applies the positive limits from <paramref name="threadConfig"/> to the ThreadPool.
        /// The maximum is set before the minimum, and values that are not positive keep the
        /// currently configured ThreadPool value.
        /// </summary>
        private static void ApplyThreadPoolLimits(Configuration threadConfig)
        {
            lock (threadPoolConfigLock)
            {
                if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
                {
                    int work, comm;
                    ThreadPool.GetMaxThreads(out work, out comm);
                    if (threadConfig.MaxWorkerThreads > 0)
                    {
                        work = threadConfig.MaxWorkerThreads;
                    }
                    if (threadConfig.MaxIOThreads > 0)
                    {
                        comm = threadConfig.MaxIOThreads;
                    }
                    if (!ThreadPool.SetMaxThreads(work, comm))
                    {
                        throw new Exception("Error: could not SetMaxThreads in ThreadPool");
                    }
                }

                if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
                {
                    int work, comm;
                    ThreadPool.GetMinThreads(out work, out comm);
                    if (threadConfig.MinWorkerThreads > 0)
                    {
                        work = threadConfig.MinWorkerThreads;
                    }
                    if (threadConfig.MinIOThreads > 0)
                    {
                        comm = threadConfig.MinIOThreads;
                    }
                    if (!ThreadPool.SetMinThreads(work, comm))
                    {
                        throw new Exception("Error: could not SetMinThreads in ThreadPool");
                    }
                }
            }
        }

        /// <summary>
        /// Use new ThreadPool thread for each new client connection.
        /// Listens on the server transport, then accepts clients until Stop() is called.
        /// If listening fails the error is logged and the method returns without serving.
        /// </summary>
        public override void Serve()
        {
            try
            {
                serverTransport.Listen();
            }
            catch (TTransportException ttx)
            {
                logDelegate("Error, could not listen on ServerTransport: " + ttx);
                return;
            }

            //Fire the preServe server event when server is up but before any client connections
            if (serverEventHandler != null)
            {
                serverEventHandler.preServe();
            }

            while (!stop)
            {
                try
                {
                    TTransport client = serverTransport.Accept();
                    ThreadPool.QueueUserWorkItem(this.Execute, client);
                }
                catch (TTransportException ttx)
                {
                    // An Interrupted exception while stopping is the expected result of Stop()
                    // closing the server transport; everything else is worth logging.
                    if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
                    {
                        logDelegate(ttx.ToString());
                    }
                }
            }

            if (stop)
            {
                try
                {
                    serverTransport.Close();
                }
                catch (TTransportException ttx)
                {
                    logDelegate("TServerTransport failed on close: " + ttx.Message);
                }

                // Reset the flag so that Serve() can be called again on this instance.
                stop = false;
            }
        }

        /// <summary>
        /// Loops on processing a client forever
        /// threadContext will be a TTransport instance
        /// </summary>
        /// <param name="threadContext">The accepted client TTransport; disposed when this method returns.</param>
        private void Execute(object threadContext)
        {
            using (TTransport client = (TTransport)threadContext)
            {
                TTransport inputTransport = null;
                TTransport outputTransport = null;
                TProtocol inputProtocol = null;
                TProtocol outputProtocol = null;
                object connectionContext = null;
                try
                {
                    try
                    {
                        // Obtained inside the guarded region: this runs on a ThreadPool thread,
                        // where an exception that escapes the work item would be unhandled.
                        TProcessor processor = processorFactory.GetProcessor(client, this);

                        inputTransport = inputTransportFactory.GetTransport(client);
                        outputTransport = outputTransportFactory.GetTransport(client);
                        inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
                        outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);

                        //Recover event handler (if any) and fire createContext server event when a client connects
                        if (serverEventHandler != null)
                        {
                            connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
                        }

                        //Process client requests until client disconnects
                        while (!stop)
                        {
                            if (!inputTransport.Peek())
                            {
                                break;
                            }

                            //Fire processContext server event
                            //N.B. This is the pattern implemented in C++ and the event fires provisionally.
                            //That is to say it may be many minutes between the event firing and the client request
                            //actually arriving or the client may hang up without ever making a request.
                            if (serverEventHandler != null)
                            {
                                serverEventHandler.processContext(connectionContext, inputTransport);
                            }

                            //Process client request (blocks until transport is readable)
                            if (!processor.Process(inputProtocol, outputProtocol))
                            {
                                break;
                            }
                        }
                    }
                    catch (TTransportException)
                    {
                        //Usually a client disconnect, expected
                    }
                    catch (Exception x)
                    {
                        //Unexpected
                        logDelegate("Error: " + x);
                    }

                    //Fire deleteContext server event after client disconnects
                    if (serverEventHandler != null)
                    {
                        serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
                    }
                }
                finally
                {
                    //Close transports
                    if (inputTransport != null)
                    {
                        inputTransport.Close();
                    }
                    if (outputTransport != null)
                    {
                        outputTransport.Close();
                    }

                    // disposable stuff should be disposed
                    if (inputProtocol != null)
                    {
                        inputProtocol.Dispose();
                    }
                    if (outputProtocol != null)
                    {
                        outputProtocol.Dispose();
                    }
                    if (inputTransport != null)
                    {
                        inputTransport.Dispose();
                    }
                    if (outputTransport != null)
                    {
                        outputTransport.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Requests shutdown: raises the stop flag and closes the server transport.
        /// </summary>
        public override void Stop()
        {
            stop = true;
            serverTransport.Close();
        }
    }
}