blob: cc051a33beebf09a3e7de41a950c515c7f2cab02 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
David Reiss63191332009-01-06 19:49:22 +000020using System;
21using System.Collections.Generic;
David Reiss63191332009-01-06 19:49:22 +000022using System.Threading;
David Reissd831a212009-02-13 03:09:52 +000023using Thrift.Collections;
David Reiss63191332009-01-06 19:49:22 +000024using Thrift.Protocol;
25using Thrift.Transport;
26
27namespace Thrift.Server
28{
/// <summary>
/// Server that uses C# threads (as opposed to the ThreadPool) when handling requests.
/// The thread calling <see cref="Serve"/> accepts connections and queues them; a single
/// worker thread dequeues them and starts one dedicated thread per client, never
/// keeping more than <c>maxThreads</c> client threads registered at once.
/// </summary>
public class TThreadedServer : TServer
{
    private const int DEFAULT_MAX_THREADS = 100;

    // Set by Stop(); polled by the accept loop, the worker loop and every client loop.
    private volatile bool stop = false;
    private readonly int maxThreads;

    // clientQueue and clientThreads are only touched while holding clientLock.
    private readonly Queue<TTransport> clientQueue;
    private readonly THashSet<Thread> clientThreads;
    private readonly object clientLock;

    // Created by Serve(); stays null until the server has successfully started listening.
    private Thread workerThread;

    /// <summary>
    /// Number of client threads currently registered with the server.
    /// </summary>
    public int ClientThreadsCount
    {
        get
        {
            // Read under the same lock that guards every mutation of the set.
            lock (clientLock)
            {
                return clientThreads.Count;
            }
        }
    }

    /// <summary>
    /// Creates a server for a single processor using default transport factories,
    /// binary protocol, the default thread limit and the default log delegate.
    /// </summary>
    public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
        : this(new TSingletonProcessorFactory(processor), serverTransport,
               new TTransportFactory(), new TTransportFactory(),
               new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
               DEFAULT_MAX_THREADS, DefaultLogDelegate)
    {
    }

    /// <summary>
    /// Creates a server for a single processor using default transport factories,
    /// binary protocol and the default thread limit, logging through <paramref name="logDelegate"/>.
    /// </summary>
    public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
        : this(new TSingletonProcessorFactory(processor), serverTransport,
               new TTransportFactory(), new TTransportFactory(),
               new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
               DEFAULT_MAX_THREADS, logDelegate)
    {
    }

    /// <summary>
    /// Creates a server for a single processor; the given transport and protocol
    /// factories are used for both the input and the output side.
    /// </summary>
    public TThreadedServer(TProcessor processor,
                           TServerTransport serverTransport,
                           TTransportFactory transportFactory,
                           TProtocolFactory protocolFactory)
        : this(new TSingletonProcessorFactory(processor), serverTransport,
               transportFactory, transportFactory,
               protocolFactory, protocolFactory,
               DEFAULT_MAX_THREADS, DefaultLogDelegate)
    {
    }

    /// <summary>
    /// Creates a server from a processor factory; the given transport and protocol
    /// factories are used for both the input and the output side.
    /// </summary>
    public TThreadedServer(TProcessorFactory processorFactory,
                           TServerTransport serverTransport,
                           TTransportFactory transportFactory,
                           TProtocolFactory protocolFactory)
        : this(processorFactory, serverTransport,
               transportFactory, transportFactory,
               protocolFactory, protocolFactory,
               DEFAULT_MAX_THREADS, DefaultLogDelegate)
    {
    }

    /// <summary>
    /// Fully parameterised constructor that every other overload delegates to.
    /// </summary>
    /// <param name="maxThreads">Upper bound on simultaneously registered client threads.</param>
    /// <param name="logDel">Delegate that receives diagnostic messages.</param>
    public TThreadedServer(TProcessorFactory processorFactory,
                           TServerTransport serverTransport,
                           TTransportFactory inputTransportFactory,
                           TTransportFactory outputTransportFactory,
                           TProtocolFactory inputProtocolFactory,
                           TProtocolFactory outputProtocolFactory,
                           int maxThreads, LogDelegate logDel)
        : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
               inputProtocolFactory, outputProtocolFactory, logDel)
    {
        this.maxThreads = maxThreads;
        clientQueue = new Queue<TTransport>();
        clientLock = new object();
        clientThreads = new THashSet<Thread>();
    }

    /// <summary>
    /// Use new Thread for each new client connection. block until numConnections &lt; maxThreads.
    /// Returns after <see cref="Stop"/> has been called, or immediately (after logging)
    /// when the server transport cannot start listening.
    /// </summary>
    public override void Serve()
    {
        try
        {
            serverTransport.Listen();
        }
        catch (TTransportException ttx)
        {
            logDelegate("Error, could not listen on ServerTransport: " + ttx);
            return;
        }

        // Start the dispatcher only once listening succeeded, so a failed Listen()
        // does not leave a thread blocked forever on an empty queue.
        workerThread = new Thread(new ThreadStart(Execute));
        workerThread.Start();

        //Fire the preServe server event when server is up but before any client connections
        if (serverEventHandler != null)
        {
            serverEventHandler.preServe();
        }

        while (!stop)
        {
            try
            {
                TTransport client = serverTransport.Accept();
                lock (clientLock)
                {
                    clientQueue.Enqueue(client);
                    // Wake the worker thread waiting in Execute().
                    Monitor.Pulse(clientLock);
                }
            }
            catch (TTransportException ttx)
            {
                // An interrupted accept during shutdown is expected; everything else is logged.
                if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
                {
                    logDelegate(ttx.ToString());
                }
            }
        }

        if (stop)
        {
            try
            {
                serverTransport.Close();
            }
            catch (TTransportException ttx)
            {
                logDelegate("TServeTransport failed on close: " + ttx.Message);
            }
            // Reset the flag so Serve() can be called again later.
            stop = false;
        }
    }

    /// <summary>
    /// Worker loop: waits for a queued connection and a free thread slot,
    /// then starts a dedicated thread for that client.
    /// </summary>
    private void Execute()
    {
        while (!stop)
        {
            TTransport client;
            Thread clientThread;
            lock (clientLock)
            {
                // Don't dequeue while at the thread limit or while nothing is queued.
                // Both Serve() (new client) and ClientWorker() (slot freed) pulse this lock.
                while (clientThreads.Count >= maxThreads || clientQueue.Count == 0)
                {
                    Monitor.Wait(clientLock);
                }

                client = clientQueue.Dequeue();
                clientThread = new Thread(new ParameterizedThreadStart(ClientWorker));
                clientThreads.Add(clientThread);
            }

            //start processing requests from client on new thread
            clientThread.Start(client);
        }
    }

    /// <summary>
    /// Serves a single client connection until it disconnects, processing fails
    /// or the server is stopped, then releases the connection's thread slot.
    /// </summary>
    /// <param name="context">The accepted client <c>TTransport</c>.</param>
    private void ClientWorker(object context)
    {
        try
        {
            using (TTransport client = (TTransport)context)
            {
                TTransport inputTransport = null;
                TTransport outputTransport = null;
                TProtocol inputProtocol = null;
                TProtocol outputProtocol = null;
                object connectionContext = null;
                try
                {
                    try
                    {
                        // Resolved inside the guarded region so a failing factory is logged
                        // instead of surfacing as an unhandled exception on this thread.
                        TProcessor processor = processorFactory.GetProcessor(client);

                        inputTransport = inputTransportFactory.GetTransport(client);
                        outputTransport = outputTransportFactory.GetTransport(client);
                        inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
                        outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);

                        //Recover event handler (if any) and fire createContext server event when a client connects
                        if (serverEventHandler != null)
                        {
                            connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
                        }

                        //Process client requests until client disconnects
                        while (!stop)
                        {
                            if (!inputTransport.Peek())
                            {
                                break;
                            }

                            //Fire processContext server event
                            //N.B. This is the pattern implemented in C++ and the event fires provisionally.
                            //That is to say it may be many minutes between the event firing and the client request
                            //actually arriving or the client may hang up without ever making a request.
                            if (serverEventHandler != null)
                            {
                                serverEventHandler.processContext(connectionContext, inputTransport);
                            }

                            //Process client request (blocks until transport is readable)
                            if (!processor.Process(inputProtocol, outputProtocol))
                            {
                                break;
                            }
                        }
                    }
                    catch (TTransportException)
                    {
                        //Usually a client disconnect, expected
                    }
                    catch (Exception x)
                    {
                        //Unexpected
                        logDelegate("Error: " + x);
                    }

                    //Fire deleteContext server event after client disconnects
                    if (serverEventHandler != null)
                    {
                        serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
                    }
                }
                finally
                {
                    //Close transports
                    if (inputTransport != null)
                    {
                        inputTransport.Close();
                    }
                    if (outputTransport != null)
                    {
                        outputTransport.Close();
                    }

                    // disposable stuff should be disposed
                    if (inputProtocol != null)
                    {
                        inputProtocol.Dispose();
                    }
                    if (outputProtocol != null)
                    {
                        outputProtocol.Dispose();
                    }
                }
            }
        }
        finally
        {
            // Always give the slot back, even when an event handler throws or the thread
            // is aborted; otherwise the server would eventually stall at maxThreads.
            lock (clientLock)
            {
                clientThreads.Remove(Thread.CurrentThread);
                Monitor.Pulse(clientLock);
            }
        }
    }

    /// <summary>
    /// Signals shutdown, closes the server transport and aborts the worker thread
    /// and all registered client threads.
    /// </summary>
    public override void Stop()
    {
        stop = true;
        serverTransport.Close();

        //clean up all the threads myself
        // NOTE(review): Thread.Abort is not supported on .NET Core / .NET 5+; this relies on .NET Framework.
        Thread worker = workerThread;
        if (worker != null)
        {
            // workerThread is null when Stop() runs before Serve() started listening.
            worker.Abort();
        }

        // Snapshot under the lock: client threads remove themselves from the set while
        // shutting down, which would invalidate a live enumeration.
        List<Thread> activeClients = new List<Thread>();
        lock (clientLock)
        {
            foreach (Thread t in clientThreads)
            {
                activeClients.Add(t);
            }
        }

        foreach (Thread t in activeClients)
        {
            t.Abort();
        }
    }
}
David Reiss63191332009-01-06 19:49:22 +0000282}