THRIFT-25. csharp: Various compiler and library improvements

Compiler:
- Thrift structures are serializable.
- The member fields of thrift structures are now private and only accessible
  through Properties, which keep the appropriate __isset up to date.

Library:
- Addition of TBufferedTransport, which can be used to wrap other Transports.
- Addition of TThreadedServer, which manually manages threads instead of
  relying on .NET ThreadPool.
- Servers use a log delegate that defaults to System.Console but allows
  servers to use log4net without introducing the dependency.

ThriftTest Visual Studio Project:
- Test client and server that use ThriftTest.thrift. The project references
  thrift.exe and Thrift.dll from the subversion tree and automatically builds
  generated code. This makes it very easy to test changes in both the compiler
  and library.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@732079 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/csharp/src/Server/TServer.cs b/lib/csharp/src/Server/TServer.cs
index a9b2816..afb2b9f 100644
--- a/lib/csharp/src/Server/TServer.cs
+++ b/lib/csharp/src/Server/TServer.cs
@@ -14,6 +14,7 @@
 using System.Collections.Generic;
 using Thrift.Protocol;
 using Thrift.Transport;
+using System.IO;
 
 namespace Thrift.Server
 {
@@ -48,6 +49,8 @@
 		 * Output Protocol Factory
 		 */
 		protected TProtocolFactory outputProtocolFactory;
+		public delegate void LogDelegate(string str);
+		protected LogDelegate logDelegate;
 
 		/**
 		 * Default constructors.
@@ -55,7 +58,14 @@
 
 		public TServer(TProcessor processor,
 						  TServerTransport serverTransport)
-			:this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory())
+			:this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>Default transport/protocol factories, logging through the supplied logDelegate.</summary>
+		public TServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
+			: this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(),
+				   new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), logDelegate)
 		{
 		}
 
@@ -67,7 +77,8 @@
 				 transportFactory,
 				 transportFactory,
 				 new TBinaryProtocol.Factory(),
-				 new TBinaryProtocol.Factory())
+				 new TBinaryProtocol.Factory(),
+				 DefaultLogDelegate)
 		{
 		}
 
@@ -80,7 +91,8 @@
 				 transportFactory,
 				 transportFactory,
 				 protocolFactory,
-				 protocolFactory)
+				 protocolFactory,
+			     DefaultLogDelegate)
 		{
 		}
 
@@ -89,7 +101,8 @@
 						  TTransportFactory inputTransportFactory,
 						  TTransportFactory outputTransportFactory,
 						  TProtocolFactory inputProtocolFactory,
-						  TProtocolFactory outputProtocolFactory)
+						  TProtocolFactory outputProtocolFactory,
+						  LogDelegate logDelegate)
 		{
 			this.processor = processor;
 			this.serverTransport = serverTransport;
@@ -97,6 +110,7 @@
 			this.outputTransportFactory = outputTransportFactory;
 			this.inputProtocolFactory = inputProtocolFactory;
 			this.outputProtocolFactory = outputProtocolFactory;
+			this.logDelegate = logDelegate;
 		}
 
 		/**
@@ -105,6 +119,11 @@
 		public abstract void Serve();
 
 		public abstract void Stop();
+
+		protected static void DefaultLogDelegate(string s)
+		{
+			Console.Error.WriteLine(s);
+		}
 	}
 }
 
diff --git a/lib/csharp/src/Server/TSimpleServer.cs b/lib/csharp/src/Server/TSimpleServer.cs
index 0679bb2..95f3962 100644
--- a/lib/csharp/src/Server/TSimpleServer.cs
+++ b/lib/csharp/src/Server/TSimpleServer.cs
@@ -23,9 +23,17 @@
 	public class TSimpleServer : TServer
 	{
 		private bool stop = false;
+
 		public TSimpleServer(TProcessor processor,
 						  TServerTransport serverTransport)
-			:base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory())
+			:base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+		{
+		}
+
+		public TSimpleServer(TProcessor processor,
+							TServerTransport serverTransport,
+							LogDelegate logDel)
+			: base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), logDel)
 		{
 		}
 
@@ -37,7 +45,8 @@
 				 transportFactory,
 				 transportFactory,
 				 new TBinaryProtocol.Factory(),
-				 new TBinaryProtocol.Factory())
+				 new TBinaryProtocol.Factory(),
+			     DefaultLogDelegate)
 		{
 		}
 
@@ -50,7 +59,8 @@
 				 transportFactory,
 				 transportFactory,
 				 protocolFactory,
-				 protocolFactory)
+				 protocolFactory,
+				 DefaultLogDelegate)
 		{
 		}
 
@@ -62,7 +72,7 @@
 			}
 			catch (TTransportException ttx)
 			{
-				Console.Error.WriteLine(ttx);
+				logDelegate(ttx.ToString());
 				return;
 			}
 
@@ -85,13 +95,17 @@
 						while (processor.Process(inputProtocol, outputProtocol)) { }
 					}
 				}
-				catch (TTransportException)
+				catch (TTransportException ttx)
 				{
 					// Client died, just move on
+					if (stop)
+					{
+						logDelegate("TSimpleServer was shutting down, caught " + ttx.GetType().Name);
+					}
 				}
 				catch (Exception x)
 				{
-					Console.Error.WriteLine(x);
+					logDelegate(x.ToString());
 				}
 
 				if (inputTransport != null)
@@ -113,7 +127,7 @@
 				}
 				catch (TTransportException ttx)
 				{
-					Console.Error.WriteLine("TServerTrasnport failed on close: " + ttx.Message);
+					logDelegate("TServerTransport failed on close: " + ttx.Message);
 				}
 				stop = false;
 			}
@@ -122,6 +136,7 @@
 		public override void Stop()
 		{
 			stop = true;
+			serverTransport.Close();
 		}
 	}
 }
diff --git a/lib/csharp/src/Server/TThreadPoolServer.cs b/lib/csharp/src/Server/TThreadPoolServer.cs
index ac7aa8b..b44487f 100644
--- a/lib/csharp/src/Server/TThreadPoolServer.cs
+++ b/lib/csharp/src/Server/TThreadPoolServer.cs
@@ -31,10 +31,19 @@
 			:this(processor, serverTransport,
 				 new TTransportFactory(), new TTransportFactory(),
 				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
-				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS)
+				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
 		{
 		}
 
+		public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
+			: this(processor, serverTransport,
+				 new TTransportFactory(), new TTransportFactory(),
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, logDelegate)
+		{
+		}
+
+
 		public TThreadPoolServer(TProcessor processor,
 								 TServerTransport serverTransport,
 								 TTransportFactory transportFactory,
@@ -42,7 +51,7 @@
 			:this(processor, serverTransport,
 				 transportFactory, transportFactory,
 				 protocolFactory, protocolFactory,
-				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS)
+				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
 		{
 		}
 
@@ -52,9 +61,9 @@
 								 TTransportFactory outputTransportFactory,
 								 TProtocolFactory inputProtocolFactory,
 								 TProtocolFactory outputProtocolFactory,
-								 int minThreadPoolThreads, int maxThreadPoolThreads)
+								 int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel)
 			:base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
-				  inputProtocolFactory, outputProtocolFactory)
+				  inputProtocolFactory, outputProtocolFactory, logDel)
 		{
 			if (!ThreadPool.SetMinThreads(minThreadPoolThreads, minThreadPoolThreads))
 			{
@@ -64,7 +73,6 @@
 			{
 				throw new Exception("Error: could not SetMaxThreads in ThreadPool");
 			}
-
 		}
 
 		/// <summary>
@@ -78,7 +86,7 @@
 			}
 			catch (TTransportException ttx)
 			{
-				Console.Error.WriteLine("Error, could not listen on ServerTransport: " + ttx);
+				logDelegate("Error, could not listen on ServerTransport: " + ttx);
 				return;
 			}
 
@@ -92,8 +100,16 @@
 				}
 				catch (TTransportException ttx)
 				{
-					++failureCount;
-					Console.Error.WriteLine(ttx);
+					if (stop)
+					{
+						logDelegate("TThreadPoolServer was shutting down, caught " + ttx.GetType().Name);
+					}
+					else
+					{
+						++failureCount;
+						logDelegate(ttx.ToString());
+					}
+
 				}
 			}
 
@@ -105,18 +121,12 @@
 				}
 				catch (TTransportException ttx)
 				{
-					Console.Error.WriteLine("TServerTrasnport failed on close: " + ttx.Message);
+					logDelegate("TServerTransport failed on close: " + ttx.Message);
 				}
 				stop = false;
 			}
 		}
 
-
-		public override void Stop()
-		{
-			stop = true;
-		}
-
 		/// <summary>
 		/// Loops on processing a client forever
 		/// threadContext will be a TTransport instance
@@ -143,11 +153,12 @@
 			catch (TTransportException)
 			{
 				// Assume the client died and continue silently
+				//Console.WriteLine(ttx);
 			}
 			
 			catch (Exception x)
 			{
-				Console.Error.WriteLine("Error: " + x);
+				logDelegate("Error: " + x);
 			}
 
 			if (inputTransport != null)
@@ -159,5 +170,11 @@
 				outputTransport.Close();
 			}
 		}
+
+		public override void Stop()
+		{
+			stop = true;
+			serverTransport.Close();
+		}
 	}
 }
diff --git a/lib/csharp/src/Server/TThreadedServer.cs b/lib/csharp/src/Server/TThreadedServer.cs
new file mode 100644
index 0000000..a4d33a5
--- /dev/null
+++ b/lib/csharp/src/Server/TThreadedServer.cs
@@ -0,0 +1,267 @@
+//
+//  TThreadedServer.cs
+//
+//  Begin:  Apr 21, 2008
+//  Authors:
+//		Will Palmeri <wpalmeri@imeem.com>
+//
+//  Distributed under the Thrift Software License
+//
+//  See accompanying file LICENSE or visit the Thrift site at:
+//  http://developers.facebook.com/thrift/using
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using Thrift.Protocol;
+using Thrift.Transport;
+
+namespace Thrift.Server
+{
+	/// <summary>
+	/// Server that uses C# threads (as opposed to the ThreadPool) when handling requests.
+	/// The Serve() loop queues accepted clients; a dispatcher thread starts one thread per
+	/// queued client, keeping at most maxThreads client threads alive at a time.
+	/// </summary>
+	public class TThreadedServer : TServer
+	{
+		private const int DEFAULT_MAX_THREADS = 100;
+		private volatile bool stop = false;
+		private readonly int maxThreads;
+
+		// clientQueue and clientThreads are shared between threads; access them under clientLock.
+		private readonly Queue<TTransport> clientQueue;
+		private readonly HashSet<Thread> clientThreads;
+		private readonly object clientLock;
+		private Thread workerThread;
+
+		public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
+			: this(processor, serverTransport,
+				 new TTransportFactory(), new TTransportFactory(),
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+				 DEFAULT_MAX_THREADS, DefaultLogDelegate)
+		{
+		}
+
+		public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
+			: this(processor, serverTransport,
+				 new TTransportFactory(), new TTransportFactory(),
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+				 DEFAULT_MAX_THREADS, logDelegate)
+		{
+		}
+
+		public TThreadedServer(TProcessor processor,
+								 TServerTransport serverTransport,
+								 TTransportFactory transportFactory,
+								 TProtocolFactory protocolFactory)
+			: this(processor, serverTransport,
+				 transportFactory, transportFactory,
+				 protocolFactory, protocolFactory,
+				 DEFAULT_MAX_THREADS, DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>
+		/// Creates a server that runs at most maxThreads client threads at a time.
+		/// </summary>
+		/// <exception cref="ArgumentOutOfRangeException">maxThreads is less than 1.</exception>
+		public TThreadedServer(TProcessor processor,
+								 TServerTransport serverTransport,
+								 TTransportFactory inputTransportFactory,
+								 TTransportFactory outputTransportFactory,
+								 TProtocolFactory inputProtocolFactory,
+								 TProtocolFactory outputProtocolFactory,
+								 int maxThreads, LogDelegate logDel)
+			: base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+				  inputProtocolFactory, outputProtocolFactory, logDel)
+		{
+			if (maxThreads < 1)
+			{
+				// With no free slot possible, the dispatcher would wait forever and never serve a client.
+				throw new ArgumentOutOfRangeException("maxThreads", "maxThreads must be at least 1");
+			}
+
+			this.maxThreads = maxThreads;
+			clientQueue = new Queue<TTransport>();
+			clientLock = new object();
+			clientThreads = new HashSet<Thread>();
+		}
+
+		/// <summary>
+		/// Accepts client connections until Stop() is called, queueing each one for the
+		/// dispatcher thread, which blocks while maxThreads client threads are running.
+		/// </summary>
+		public override void Serve()
+		{
+			try
+			{
+				serverTransport.Listen();
+			}
+			catch (TTransportException ttx)
+			{
+				logDelegate("Error, could not listen on ServerTransport: " + ttx);
+				return;
+			}
+
+			// Start the dispatcher only after Listen() succeeded, so a failed Listen()
+			// does not leave a thread behind that waits on the queue forever.
+			workerThread = new Thread(new ThreadStart(Execute));
+			workerThread.Start();
+
+			while (!stop)
+			{
+				try
+				{
+					TTransport client = serverTransport.Accept();
+					lock (clientLock)
+					{
+						clientQueue.Enqueue(client);
+						Monitor.Pulse(clientLock);
+					}
+				}
+				catch (TTransportException ttx)
+				{
+					if (stop)
+					{
+						logDelegate("TThreadedServer was shutting down, caught " + ttx);
+					}
+					else
+					{
+						logDelegate(ttx.ToString());
+					}
+				}
+			}
+
+			if (stop)
+			{
+				try
+				{
+					serverTransport.Close();
+				}
+				catch (TTransportException ttx)
+				{
+					logDelegate("TServerTransport failed on close: " + ttx.Message);
+				}
+				stop = false;
+			}
+		}
+
+		/// <summary>
+		/// Dispatcher loop run on workerThread: waits until fewer than maxThreads client threads
+		/// are running and a client is queued, then starts a new thread for that client.
+		/// </summary>
+		private void Execute()
+		{
+			while (!stop)
+			{
+				TTransport client;
+				Thread t;
+				lock (clientLock)
+				{
+					//don't dequeue if too many connections
+					while (clientThreads.Count >= maxThreads)
+					{
+						Monitor.Wait(clientLock);
+					}
+
+					while (clientQueue.Count == 0)
+					{
+						Monitor.Wait(clientLock);
+					}
+
+					client = clientQueue.Dequeue();
+					t = new Thread(new ParameterizedThreadStart(ClientWorker));
+					clientThreads.Add(t);
+				}
+				//start processing requests from client on new thread
+				t.Start(client);
+			}
+		}
+
+		/// <summary>
+		/// Thread body for one client: processes requests until the client disconnects or an
+		/// error occurs, then closes the transports and releases this thread's slot.
+		/// </summary>
+		/// <param name="context">The client's TTransport, as passed to Thread.Start</param>
+		private void ClientWorker(Object context)
+		{
+			TTransport client = (TTransport)context;
+			TTransport inputTransport = null;
+			TTransport outputTransport = null;
+			TProtocol inputProtocol = null;
+			TProtocol outputProtocol = null;
+			try
+			{
+				inputTransport = inputTransportFactory.GetTransport(client);
+				outputTransport = outputTransportFactory.GetTransport(client);
+				inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
+				outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
+				while (processor.Process(inputProtocol, outputProtocol))
+				{
+					//keep processing requests until client disconnects
+				}
+			}
+			catch (TTransportException)
+			{
+				// Assume the client died and continue silently
+			}
+			catch (Exception x)
+			{
+				logDelegate("Error: " + x);
+			}
+			finally
+			{
+				// Clean up in finally blocks so the slot is released even if Close() throws
+				// or Stop() aborts this thread; a slot that is never released counts against
+				// maxThreads for good.
+				try
+				{
+					if (inputTransport != null)
+					{
+						inputTransport.Close();
+					}
+					if (outputTransport != null)
+					{
+						outputTransport.Close();
+					}
+				}
+				finally
+				{
+					lock (clientLock)
+					{
+						clientThreads.Remove(Thread.CurrentThread);
+						Monitor.Pulse(clientLock);
+					}
+				}
+			}
+		}
+
+		/// <summary>
+		/// Stops accepting clients and aborts the dispatcher thread and all client threads.
+		/// </summary>
+		public override void Stop()
+		{
+			stop = true;
+			serverTransport.Close();
+
+			// Stop() may be called before Serve() has created the dispatcher thread.
+			if (workerThread != null)
+			{
+				workerThread.Abort();
+			}
+
+			// Abort from a snapshot taken under the lock: client threads remove themselves
+			// from clientThreads as they finish, which would break a live enumeration.
+			List<Thread> running;
+			lock (clientLock)
+			{
+				running = new List<Thread>(clientThreads);
+			}
+			foreach (Thread t in running)
+			{
+				t.Abort();
+			}
+		}
+	}
+}