blob: f8ed8e281a59dd6ca12bab25bb876b43e6b72200 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
David Reiss63191332009-01-06 19:49:22 +000020using System;
21using System.Collections.Generic;
David Reiss63191332009-01-06 19:49:22 +000022using System.Threading;
David Reissd831a212009-02-13 03:09:52 +000023using Thrift.Collections;
David Reiss63191332009-01-06 19:49:22 +000024using Thrift.Protocol;
25using Thrift.Transport;
26
namespace Thrift.Server
{
    /// <summary>
    /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests.
    /// </summary>
    /// <remarks>
    /// Three kinds of threads cooperate through <c>clientLock</c>:
    /// the thread that calls <see cref="Serve"/> accepts connections and queues them;
    /// a single dispatcher thread (<c>Execute</c>) dequeues them and starts one dedicated
    /// thread per client; and each client thread (<c>ClientWorker</c>) processes requests
    /// until the client disconnects or the server is stopped.
    /// The dispatcher does not dequeue a connection while <c>maxThreads</c> client threads
    /// are already registered.
    /// </remarks>
    public class TThreadedServer : TServer
    {
        private const int DEFAULT_MAX_THREADS = 100;

        // Set by Stop(), polled by every loop in this class, reset by Serve() on exit.
        private volatile bool stop = false;
        private readonly int maxThreads;

        // Accepted connections waiting for a client thread; guarded by clientLock.
        private readonly Queue<TTransport> clientQueue;
        // Client threads that have been created and not yet finished; guarded by clientLock.
        private readonly THashSet<Thread> clientThreads;
        private readonly object clientLock;
        // Dispatcher thread running Execute(); null until Serve() is listening.
        private Thread workerThread;

        /// <summary>
        /// Number of client threads currently registered with the server.
        /// The value is read without taking the lock, so it is only a momentary snapshot.
        /// </summary>
        public int ClientThreadsCount
        {
            get { return clientThreads.Count; }
        }

        /// <summary>
        /// Creates a server with pass-through transport factories, the binary protocol,
        /// the default thread limit and the default log delegate.
        /// </summary>
        public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
            : this(processor, serverTransport,
                   new TTransportFactory(), new TTransportFactory(),
                   new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
                   DEFAULT_MAX_THREADS, DefaultLogDelegate)
        {
        }

        /// <summary>
        /// Same as the two-argument constructor, but logs through <paramref name="logDelegate"/>.
        /// </summary>
        public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
            : this(processor, serverTransport,
                   new TTransportFactory(), new TTransportFactory(),
                   new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
                   DEFAULT_MAX_THREADS, logDelegate)
        {
        }

        /// <summary>
        /// Creates a server that uses the same transport factory and the same protocol factory
        /// for both the input and the output side, with the default thread limit and log delegate.
        /// </summary>
        public TThreadedServer(TProcessor processor,
                               TServerTransport serverTransport,
                               TTransportFactory transportFactory,
                               TProtocolFactory protocolFactory)
            : this(processor, serverTransport,
                   transportFactory, transportFactory,
                   protocolFactory, protocolFactory,
                   DEFAULT_MAX_THREADS, DefaultLogDelegate)
        {
        }

        /// <summary>
        /// Creates a fully configured server.
        /// </summary>
        /// <param name="processor">Processor that handles each client request.</param>
        /// <param name="serverTransport">Transport the server listens on and accepts from.</param>
        /// <param name="inputTransportFactory">Wraps an accepted transport for reading.</param>
        /// <param name="outputTransportFactory">Wraps an accepted transport for writing.</param>
        /// <param name="inputProtocolFactory">Builds the protocol used for reading.</param>
        /// <param name="outputProtocolFactory">Builds the protocol used for writing.</param>
        /// <param name="maxThreads">
        /// Upper bound on registered client threads; the dispatcher waits while the bound is reached.
        /// </param>
        /// <param name="logDel">Delegate that receives log messages.</param>
        public TThreadedServer(TProcessor processor,
                               TServerTransport serverTransport,
                               TTransportFactory inputTransportFactory,
                               TTransportFactory outputTransportFactory,
                               TProtocolFactory inputProtocolFactory,
                               TProtocolFactory outputProtocolFactory,
                               int maxThreads, LogDelegate logDel)
            : base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
                   inputProtocolFactory, outputProtocolFactory, logDel)
        {
            this.maxThreads = maxThreads;
            clientQueue = new Queue<TTransport>();
            clientLock = new object();
            clientThreads = new THashSet<Thread>();
        }

        /// <summary>
        /// Listens on the server transport and accepts connections until <see cref="Stop"/>
        /// is called. Each accepted connection is queued for the dispatcher thread, which
        /// starts a new thread per client once fewer than maxThreads client threads exist.
        /// </summary>
        public override void Serve()
        {
            try
            {
                serverTransport.Listen();
            }
            catch (TTransportException ttx)
            {
                logDelegate("Error, could not listen on ServerTransport: " + ttx);
                return;
            }

            // Start the dispatcher only after Listen() succeeded; starting it earlier would
            // leave a thread blocked forever in Monitor.Wait when listening fails.
            workerThread = new Thread(new ThreadStart(Execute));
            workerThread.Start();

            //Fire the preServe server event when server is up but before any client connections
            if (serverEventHandler != null)
                serverEventHandler.preServe();

            while (!stop)
            {
                try
                {
                    TTransport client = serverTransport.Accept();
                    lock (clientLock)
                    {
                        clientQueue.Enqueue(client);
                        // Wake the dispatcher, which may be waiting for a queued client.
                        Monitor.Pulse(clientLock);
                    }
                }
                catch (TTransportException ttx)
                {
                    // An Interrupted error while stopping is the expected result of Stop()
                    // closing the transport; everything else is worth logging.
                    if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
                    {
                        logDelegate(ttx.ToString());
                    }
                }
            }

            if (stop)
            {
                try
                {
                    serverTransport.Close();
                }
                catch (TTransportException ttx)
                {
                    logDelegate("TServeTransport failed on close: " + ttx.Message);
                }
                // Reset the flag so that Serve() can be called again.
                stop = false;
            }
        }

        /// <summary>
        /// Dispatcher loop: waits until a client is queued and fewer than maxThreads client
        /// threads are registered, then hands the client to a newly started thread.
        /// Runs until the stop flag is observed or the thread is aborted by <see cref="Stop"/>.
        /// </summary>
        private void Execute()
        {
            while (!stop)
            {
                TTransport client;
                Thread t;
                lock (clientLock)
                {
                    // Don't dequeue while too many connections are active or nothing is queued.
                    // Both conditions are signalled through the same lock object.
                    while (clientThreads.Count >= maxThreads || clientQueue.Count == 0)
                    {
                        Monitor.Wait(clientLock);
                    }

                    client = clientQueue.Dequeue();
                    t = new Thread(new ParameterizedThreadStart(ClientWorker));
                    // Register before starting so the thread is counted against maxThreads.
                    clientThreads.Add(t);
                }
                //start processing requests from client on new thread
                t.Start(client);
            }
        }

        /// <summary>
        /// Body of a client thread: wraps the accepted transport, fires the server events
        /// and processes requests until the client disconnects, the processor reports
        /// failure, or the server is stopped.
        /// </summary>
        /// <param name="context">The accepted <c>TTransport</c> of the client.</param>
        private void ClientWorker(Object context)
        {
            TTransport client = (TTransport)context;
            TProtocol inputProtocol = null;
            TProtocol outputProtocol = null;
            Object connectionContext = null;
            try
            {
                try
                {
                    using (TTransport inputTransport = inputTransportFactory.GetTransport(client))
                    {
                        using (TTransport outputTransport = outputTransportFactory.GetTransport(client))
                        {
                            inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
                            outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);

                            //Recover event handler (if any) and fire createContext server event when a client connects
                            if (serverEventHandler != null)
                                connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);

                            //Process client requests until client disconnects
                            while (!stop)
                            {
                                if (!inputTransport.Peek())
                                    break;

                                //Fire processContext server event
                                //N.B. This is the pattern implemented in C++ and the event fires provisionally.
                                //That is to say it may be many minutes between the event firing and the client request
                                //actually arriving or the client may hang up without ever making a request.
                                if (serverEventHandler != null)
                                    serverEventHandler.processContext(connectionContext, inputTransport);
                                //Process client request (blocks until transport is readable)
                                if (!processor.Process(inputProtocol, outputProtocol))
                                    break;
                            }
                        }
                    }
                }
                catch (TTransportException)
                {
                    //Usually a client disconnect, expected
                }
                catch (Exception x)
                {
                    //Unexpected
                    logDelegate("Error: " + x);
                }

                //Fire deleteContext server event after client disconnects
                if (serverEventHandler != null)
                    serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
            }
            finally
            {
                // Always unregister, even when the thread is aborted by Stop() or a handler
                // throws; otherwise the stale entry would count against maxThreads forever.
                lock (clientLock)
                {
                    clientThreads.Remove(Thread.CurrentThread);
                    // Wake the dispatcher, which may be waiting for a free thread slot.
                    Monitor.Pulse(clientLock);
                }
            }
        }

        /// <summary>
        /// Stops the server: raises the stop flag, closes the server transport and aborts
        /// the dispatcher thread and all registered client threads.
        /// Safe to call when <see cref="Serve"/> has not started the dispatcher yet.
        /// </summary>
        public override void Stop()
        {
            stop = true;
            serverTransport.Close();

            //clean up all the threads myself
            if (workerThread != null)
                workerThread.Abort();

            // Copy the set under the lock: client threads remove themselves concurrently,
            // and enumerating the live set could fail with "collection was modified".
            List<Thread> running;
            lock (clientLock)
            {
                running = new List<Thread>(clientThreads.Count);
                foreach (Thread t in clientThreads)
                {
                    running.Add(t);
                }
            }

            // Abort outside the lock: an aborted client thread needs clientLock in its
            // cleanup, and Abort() may block while the target runs that cleanup.
            foreach (Thread t in running)
            {
                t.Abort();
            }
        }
    }
}