blob: 3a1734acab181732b046faa1ba008dc64285ef70 [file] [log] [blame]
Kevin Clarkab4460d2009-03-20 02:28:41 +00001/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
David Reiss63191332009-01-06 19:49:22 +000020using System;
21using System.Collections.Generic;
David Reiss63191332009-01-06 19:49:22 +000022using System.Threading;
David Reissd831a212009-02-13 03:09:52 +000023using Thrift.Collections;
David Reiss63191332009-01-06 19:49:22 +000024using Thrift.Protocol;
25using Thrift.Transport;
26
27namespace Thrift.Server
28{
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070029 /// <summary>
30 /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests
31 /// </summary>
32 public class TThreadedServer : TServer
33 {
34 private const int DEFAULT_MAX_THREADS = 100;
35 private volatile bool stop = false;
36 private readonly int maxThreads;
David Reiss63191332009-01-06 19:49:22 +000037
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070038 private Queue<TTransport> clientQueue;
39 private THashSet<Thread> clientThreads;
40 private object clientLock;
41 private Thread workerThread;
David Reiss63191332009-01-06 19:49:22 +000042
Jens Geyerc7cf3792015-03-07 13:18:02 +010043 public int ClientThreadsCount {
44 get { return clientThreads.Count; }
45 }
46
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070047 public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
Jonathan Heard2bfd7df2015-10-28 17:34:27 +000048 : this(new TSingletonProcessorFactory(processor), serverTransport,
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070049 new TTransportFactory(), new TTransportFactory(),
50 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
51 DEFAULT_MAX_THREADS, DefaultLogDelegate)
52 {
53 }
David Reiss63191332009-01-06 19:49:22 +000054
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070055 public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
Jonathan Heard2bfd7df2015-10-28 17:34:27 +000056 : this(new TSingletonProcessorFactory(processor), serverTransport,
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070057 new TTransportFactory(), new TTransportFactory(),
58 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
59 DEFAULT_MAX_THREADS, logDelegate)
60 {
61 }
David Reiss63191332009-01-06 19:49:22 +000062
63
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070064 public TThreadedServer(TProcessor processor,
65 TServerTransport serverTransport,
66 TTransportFactory transportFactory,
67 TProtocolFactory protocolFactory)
Jonathan Heard2bfd7df2015-10-28 17:34:27 +000068 : this(new TSingletonProcessorFactory(processor), serverTransport,
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070069 transportFactory, transportFactory,
70 protocolFactory, protocolFactory,
71 DEFAULT_MAX_THREADS, DefaultLogDelegate)
72 {
73 }
David Reiss63191332009-01-06 19:49:22 +000074
Jonathan Heard2bfd7df2015-10-28 17:34:27 +000075 public TThreadedServer(TProcessorFactory processorFactory,
76 TServerTransport serverTransport,
77 TTransportFactory transportFactory,
78 TProtocolFactory protocolFactory)
79 : this(processorFactory, serverTransport,
80 transportFactory, transportFactory,
81 protocolFactory, protocolFactory,
82 DEFAULT_MAX_THREADS, DefaultLogDelegate)
83 {
84 }
85 public TThreadedServer(TProcessorFactory processorFactory,
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070086 TServerTransport serverTransport,
87 TTransportFactory inputTransportFactory,
88 TTransportFactory outputTransportFactory,
89 TProtocolFactory inputProtocolFactory,
90 TProtocolFactory outputProtocolFactory,
91 int maxThreads, LogDelegate logDel)
Jonathan Heard2bfd7df2015-10-28 17:34:27 +000092 : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
Randy Abernethy0e86f1f2014-07-13 09:50:19 -070093 inputProtocolFactory, outputProtocolFactory, logDel)
94 {
95 this.maxThreads = maxThreads;
96 clientQueue = new Queue<TTransport>();
97 clientLock = new object();
98 clientThreads = new THashSet<Thread>();
99 }
David Reiss63191332009-01-06 19:49:22 +0000100
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700101 /// <summary>
102 /// Use new Thread for each new client connection. block until numConnections < maxThreads
103 /// </summary>
104 public override void Serve()
105 {
106 try
107 {
108 //start worker thread
109 workerThread = new Thread(new ThreadStart(Execute));
110 workerThread.Start();
111 serverTransport.Listen();
112 }
113 catch (TTransportException ttx)
114 {
115 logDelegate("Error, could not listen on ServerTransport: " + ttx);
116 return;
117 }
David Reiss63191332009-01-06 19:49:22 +0000118
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700119 //Fire the preServe server event when server is up but before any client connections
120 if (serverEventHandler != null)
121 serverEventHandler.preServe();
David Reiss63191332009-01-06 19:49:22 +0000122
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700123 while (!stop)
124 {
125 int failureCount = 0;
126 try
127 {
128 TTransport client = serverTransport.Accept();
129 lock (clientLock)
130 {
131 clientQueue.Enqueue(client);
132 Monitor.Pulse(clientLock);
133 }
134 }
135 catch (TTransportException ttx)
136 {
Jens Geyer7d882082015-01-27 22:08:44 +0100137 if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700138 {
139 ++failureCount;
140 logDelegate(ttx.ToString());
141 }
David Reiss63191332009-01-06 19:49:22 +0000142
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700143 }
144 }
David Reiss63191332009-01-06 19:49:22 +0000145
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700146 if (stop)
147 {
148 try
149 {
150 serverTransport.Close();
151 }
152 catch (TTransportException ttx)
153 {
154 logDelegate("TServeTransport failed on close: " + ttx.Message);
155 }
156 stop = false;
157 }
158 }
David Reiss63191332009-01-06 19:49:22 +0000159
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700160 /// <summary>
161 /// Loops on processing a client forever
162 /// threadContext will be a TTransport instance
163 /// </summary>
164 /// <param name="threadContext"></param>
165 private void Execute()
166 {
167 while (!stop)
168 {
169 TTransport client;
170 Thread t;
171 lock (clientLock)
172 {
173 //don't dequeue if too many connections
174 while (clientThreads.Count >= maxThreads)
175 {
176 Monitor.Wait(clientLock);
177 }
David Reiss63191332009-01-06 19:49:22 +0000178
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700179 while (clientQueue.Count == 0)
180 {
181 Monitor.Wait(clientLock);
182 }
David Reiss63191332009-01-06 19:49:22 +0000183
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700184 client = clientQueue.Dequeue();
185 t = new Thread(new ParameterizedThreadStart(ClientWorker));
186 clientThreads.Add(t);
187 }
188 //start processing requests from client on new thread
189 t.Start(client);
190 }
191 }
192
193 private void ClientWorker(Object context)
194 {
Jens Geyer1d5113e2018-01-13 01:29:15 +0100195 using( TTransport client = (TTransport)context)
196 {
197 TProcessor processor = processorFactory.GetProcessor(client);
198 TTransport inputTransport = null;
199 TTransport outputTransport = null;
200 TProtocol inputProtocol = null;
201 TProtocol outputProtocol = null;
202 Object connectionContext = null;
203 try
Roger Meierb1ec4cc2012-04-11 21:21:41 +0000204 {
Jens Geyer1d5113e2018-01-13 01:29:15 +0100205 try
Roger Meierb1ec4cc2012-04-11 21:21:41 +0000206 {
Jens Geyer1d5113e2018-01-13 01:29:15 +0100207 inputTransport = inputTransportFactory.GetTransport(client);
208 outputTransport = outputTransportFactory.GetTransport(client);
Roger Meierb1ec4cc2012-04-11 21:21:41 +0000209 inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
210 outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
Jens Geyer1d5113e2018-01-13 01:29:15 +0100211
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700212 //Recover event handler (if any) and fire createContext server event when a client connects
213 if (serverEventHandler != null)
214 connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
Jens Geyer1d5113e2018-01-13 01:29:15 +0100215
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700216 //Process client requests until client disconnects
Jens Geyer7d882082015-01-27 22:08:44 +0100217 while (!stop)
Roger Meierb1ec4cc2012-04-11 21:21:41 +0000218 {
Jens Geyerd5436f52014-10-03 19:50:38 +0200219 if (!inputTransport.Peek())
Jens Geyereb8e5ad2014-09-29 21:50:15 +0200220 break;
Jens Geyer1d5113e2018-01-13 01:29:15 +0100221
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700222 //Fire processContext server event
223 //N.B. This is the pattern implemented in C++ and the event fires provisionally.
224 //That is to say it may be many minutes between the event firing and the client request
225 //actually arriving or the client may hang up without ever makeing a request.
226 if (serverEventHandler != null)
227 serverEventHandler.processContext(connectionContext, inputTransport);
228 //Process client request (blocks until transport is readable)
229 if (!processor.Process(inputProtocol, outputProtocol))
230 break;
Roger Meierb1ec4cc2012-04-11 21:21:41 +0000231 }
232 }
Jens Geyer1d5113e2018-01-13 01:29:15 +0100233 catch (TTransportException)
234 {
235 //Usually a client disconnect, expected
236 }
237 catch (Exception x)
238 {
239 //Unexpected
240 logDelegate("Error: " + x);
241 }
242
243 //Fire deleteContext server event after client disconnects
244 if (serverEventHandler != null)
245 serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
246
247 lock (clientLock)
248 {
249 clientThreads.Remove(Thread.CurrentThread);
250 Monitor.Pulse(clientLock);
251 }
252
253 }
254 finally
255 {
256 //Close transports
257 if (inputTransport != null)
258 inputTransport.Close();
259 if (outputTransport != null)
260 outputTransport.Close();
261
262 // disposable stuff should be disposed
263 if (inputProtocol != null)
264 inputProtocol.Dispose();
265 if (outputProtocol != null)
266 outputProtocol.Dispose();
Roger Meierb1ec4cc2012-04-11 21:21:41 +0000267 }
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700268 }
Randy Abernethy0e86f1f2014-07-13 09:50:19 -0700269 }
270
271 public override void Stop()
272 {
273 stop = true;
274 serverTransport.Close();
275 //clean up all the threads myself
276 workerThread.Abort();
277 foreach (Thread t in clientThreads)
278 {
279 t.Abort();
280 }
281 }
282 }
David Reiss63191332009-01-06 19:49:22 +0000283}