blob: a09337c4c3c4485aa5a724a4f31aed1899af36e8 [file] [log] [blame]
Kevin Clarkab4460d2009-03-20 02:28:41 +00001/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
David Reiss63191332009-01-06 19:49:22 +000020using System;
21using System.Collections.Generic;
David Reiss63191332009-01-06 19:49:22 +000022using System.Threading;
David Reissd831a212009-02-13 03:09:52 +000023using Thrift.Collections;
David Reiss63191332009-01-06 19:49:22 +000024using Thrift.Protocol;
25using Thrift.Transport;
26
27namespace Thrift.Server
28{
/// <summary>
/// Server that uses dedicated C# threads (as opposed to the ThreadPool) when handling requests.
/// The thread calling <see cref="Serve"/> accepts connections and queues them; a single
/// dispatcher thread dequeues them and starts one thread per client, never running more
/// than <c>maxThreads</c> client threads at the same time.
/// </summary>
public class TThreadedServer : TServer
{
    private const int DEFAULT_MAX_THREADS = 100;

    // Written by Stop() and Serve(), polled by every thread of the server.
    private volatile bool stop = false;
    private readonly int maxThreads;

    // Accepted connections that have not been handed to a client thread yet.
    private readonly Queue<TTransport> clientQueue;
    // Client threads that are currently registered; its size is compared against maxThreads.
    private readonly THashSet<Thread> clientThreads;
    // Guards clientQueue and clientThreads and doubles as the Monitor wait/pulse object.
    private readonly object clientLock;
    // Dispatcher thread running Execute(); null until Serve() has been called.
    private Thread workerThread;

    /// <summary>
    /// Creates a server using pass-through transports, the binary protocol, the default
    /// thread limit and the default log delegate.
    /// </summary>
    /// <param name="processor">Processor that handles the client requests.</param>
    /// <param name="serverTransport">Transport on which connections are accepted.</param>
    public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
        : this(processor, serverTransport,
               new TTransportFactory(), new TTransportFactory(),
               new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
               DEFAULT_MAX_THREADS, DefaultLogDelegate)
    {
    }

    /// <summary>
    /// Creates a server using pass-through transports, the binary protocol, the default
    /// thread limit and a caller supplied log delegate.
    /// </summary>
    /// <param name="processor">Processor that handles the client requests.</param>
    /// <param name="serverTransport">Transport on which connections are accepted.</param>
    /// <param name="logDelegate">Receives the diagnostic messages of the server.</param>
    public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
        : this(processor, serverTransport,
               new TTransportFactory(), new TTransportFactory(),
               new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
               DEFAULT_MAX_THREADS, logDelegate)
    {
    }

    /// <summary>
    /// Creates a server that uses the same transport factory and the same protocol factory
    /// for both directions, with the default thread limit and the default log delegate.
    /// </summary>
    /// <param name="processor">Processor that handles the client requests.</param>
    /// <param name="serverTransport">Transport on which connections are accepted.</param>
    /// <param name="transportFactory">Factory used for the input and the output transport.</param>
    /// <param name="protocolFactory">Factory used for the input and the output protocol.</param>
    public TThreadedServer(TProcessor processor,
                           TServerTransport serverTransport,
                           TTransportFactory transportFactory,
                           TProtocolFactory protocolFactory)
        : this(processor, serverTransport,
               transportFactory, transportFactory,
               protocolFactory, protocolFactory,
               DEFAULT_MAX_THREADS, DefaultLogDelegate)
    {
    }

    /// <summary>
    /// Creates a fully configured server.
    /// </summary>
    /// <param name="processor">Processor that handles the client requests.</param>
    /// <param name="serverTransport">Transport on which connections are accepted.</param>
    /// <param name="inputTransportFactory">Factory wrapping the client transport for reading.</param>
    /// <param name="outputTransportFactory">Factory wrapping the client transport for writing.</param>
    /// <param name="inputProtocolFactory">Factory creating the protocol used for reading.</param>
    /// <param name="outputProtocolFactory">Factory creating the protocol used for writing.</param>
    /// <param name="maxThreads">Maximum number of client threads that may exist at the same time.</param>
    /// <param name="logDel">Receives the diagnostic messages of the server.</param>
    public TThreadedServer(TProcessor processor,
                           TServerTransport serverTransport,
                           TTransportFactory inputTransportFactory,
                           TTransportFactory outputTransportFactory,
                           TProtocolFactory inputProtocolFactory,
                           TProtocolFactory outputProtocolFactory,
                           int maxThreads, LogDelegate logDel)
        : base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
               inputProtocolFactory, outputProtocolFactory, logDel)
    {
        this.maxThreads = maxThreads;
        clientQueue = new Queue<TTransport>();
        clientLock = new object();
        clientThreads = new THashSet<Thread>();
    }

    /// <summary>
    /// Accepts connections until <see cref="Stop"/> is called. Each accepted connection is
    /// queued for the dispatcher thread, which starts a new thread per client and blocks
    /// while the number of client threads is not &lt; maxThreads.
    /// </summary>
    public override void Serve()
    {
        //start worker thread
        workerThread = new Thread(new ThreadStart(Execute));
        workerThread.Start();

        bool listening = false;
        try
        {
            serverTransport.Listen();
            listening = true;
        }
        catch (TTransportException ttx)
        {
            logDelegate("Error, could not listen on ServerTransport: " + ttx);
            return;
        }
        finally
        {
            // Without a listener nothing will ever be queued, so the dispatcher would wait
            // forever; take it down instead of leaving an orphaned thread behind.
            if (!listening)
            {
                workerThread.Abort();
            }
        }

        //Fire the preServe server event when server is up but before any client connections
        if (serverEventHandler != null)
        {
            serverEventHandler.preServe();
        }

        while (!stop)
        {
            try
            {
                TTransport client = serverTransport.Accept();
                lock (clientLock)
                {
                    clientQueue.Enqueue(client);
                    // Wake the dispatcher, which may be waiting for a queued connection.
                    Monitor.Pulse(clientLock);
                }
            }
            catch (TTransportException ttx)
            {
                // An Interrupted failure while stopping is not reported (presumably it is
                // raised because Stop() closed the transport); everything else is logged.
                if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
                {
                    logDelegate(ttx.ToString());
                }
            }
        }

        if (stop)
        {
            try
            {
                serverTransport.Close();
            }
            catch (TTransportException ttx)
            {
                logDelegate("TServeTransport failed on close: " + ttx.Message);
            }
            // Reset the flag once the accept loop has been left.
            stop = false;
        }
    }

    /// <summary>
    /// Body of the dispatcher thread: waits until a connection is queued and a thread slot
    /// is free, then starts a new client thread for that connection.
    /// </summary>
    private void Execute()
    {
        while (!stop)
        {
            TTransport client;
            Thread t;
            lock (clientLock)
            {
                //don't dequeue if too many connections or if there is nothing to dequeue
                while (clientThreads.Count >= maxThreads || clientQueue.Count == 0)
                {
                    Monitor.Wait(clientLock);
                }

                client = clientQueue.Dequeue();
                t = new Thread(new ParameterizedThreadStart(ClientWorker));
                // Register the thread while still holding the lock so the limit check
                // above always sees it.
                clientThreads.Add(t);
            }
            //start processing requests from client on new thread
            t.Start(client);
        }
    }

    /// <summary>
    /// Body of a client thread: serves the requests of a single connection until the client
    /// disconnects, the processor reports that it is done, or the server is stopped.
    /// </summary>
    /// <param name="context">The client connection; must be a TTransport instance.</param>
    private void ClientWorker(Object context)
    {
        TTransport inputTransport = null;
        TTransport outputTransport = null;
        TProtocol inputProtocol = null;
        TProtocol outputProtocol = null;
        Object connectionContext = null;
        try
        {
            try
            {
                TTransport client = (TTransport)context;
                using (inputTransport = inputTransportFactory.GetTransport(client))
                {
                    using (outputTransport = outputTransportFactory.GetTransport(client))
                    {
                        inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
                        outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);

                        //Recover event handler (if any) and fire createContext server event when a client connects
                        if (serverEventHandler != null)
                        {
                            connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
                        }

                        //Process client requests until client disconnects
                        while (!stop)
                        {
                            if (!inputTransport.Peek())
                            {
                                break;
                            }

                            //Fire processContext server event
                            //N.B. This is the pattern implemented in C++ and the event fires provisionally.
                            //That is to say it may be many minutes between the event firing and the client request
                            //actually arriving or the client may hang up without ever making a request.
                            if (serverEventHandler != null)
                            {
                                serverEventHandler.processContext(connectionContext, inputTransport);
                            }

                            //Process client request (blocks until transport is readable)
                            if (!processor.Process(inputProtocol, outputProtocol))
                            {
                                break;
                            }
                        }
                    }
                }
            }
            catch (TTransportException)
            {
                //Usually a client disconnect, expected
            }
            catch (Exception x)
            {
                //Unexpected
                logDelegate("Error: " + x);
            }

            //Fire deleteContext server event after client disconnects
            if (serverEventHandler != null)
            {
                serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
            }
        }
        finally
        {
            // Always give the slot back, even if the event handler threw or the thread is
            // being aborted; otherwise the dispatcher eventually blocks on the limit forever.
            lock (clientLock)
            {
                clientThreads.Remove(Thread.CurrentThread);
                Monitor.Pulse(clientLock);
            }
        }
    }

    /// <summary>
    /// Stops the server: raises the stop flag, closes the server transport and aborts the
    /// dispatcher thread as well as all client threads that are still registered.
    /// </summary>
    public override void Stop()
    {
        stop = true;
        serverTransport.Close();

        //clean up all the threads myself
        Thread dispatcher = workerThread;
        if (dispatcher != null)
        {
            // null means Serve() was never called, so there is nothing to abort.
            dispatcher.Abort();
        }

        // Client threads remove themselves from the set while they terminate, so take a
        // snapshot under the lock instead of enumerating the live collection.
        List<Thread> activeClients;
        lock (clientLock)
        {
            activeClients = new List<Thread>(clientThreads.Count);
            foreach (Thread t in clientThreads)
            {
                activeClients.Add(t);
            }
        }

        foreach (Thread t in activeClients)
        {
            t.Abort();
        }
    }
}
David Reiss63191332009-01-06 19:49:22 +0000253}