THRIFT-2953 TNamedPipeServerTransport is not Stop()able
Client: C#
Patch: Jens Geyer

This closes #362
diff --git a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
index b3f34eb..c1e8400 100644
--- a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
+++ b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
@@ -24,6 +24,7 @@
 using System;
 using System.Collections.Generic;
 using System.IO.Pipes;
+using System.Threading;
 
 namespace Thrift.Transport
 {
@@ -33,7 +34,8 @@
         /// This is the address of the Pipe on the localhost.
         /// </summary>
         private readonly string pipeAddress;
-        NamedPipeServerStream stream = null;
+        private NamedPipeServerStream stream = null;
+        private bool asyncMode = true;
 
         public TNamedPipeServerTransport(string pipeAddress)
         {
@@ -63,11 +65,35 @@
 
         private void EnsurePipeInstance()
         {
-            if( stream == null)
-                stream = new NamedPipeServerStream(
-                    pipeAddress, PipeDirection.InOut, 254,
-                    PipeTransmissionMode.Byte,
-                    PipeOptions.None, 4096, 4096 /*TODO: security*/);
+            if (stream == null)
+            {
+                var direction = PipeDirection.InOut;
+                var maxconn = 254;
+                var mode = PipeTransmissionMode.Byte;
+                var options = asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
+                var inbuf = 4096;
+                var outbuf = 4096;
+                // TODO: security
+
+                try
+                {
+                    stream = new NamedPipeServerStream(pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
+                }
+                catch (NotImplementedException)  // Mono still does not support async, fallback to sync
+                {
+                    if (asyncMode)
+                    {
+                        options &= (~PipeOptions.Asynchronous);
+                        stream = new NamedPipeServerStream(pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
+                        asyncMode = false;
+                    }
+                    else
+                    {
+                        throw;
+                    }
+                }
+
+            }
         }
 
         protected override TTransport AcceptImpl()
@@ -75,11 +101,50 @@
             try
             {
                 EnsurePipeInstance();
-                stream.WaitForConnection();
-                var trans = new ServerTransport(stream);
+
+                if (asyncMode)
+                {
+                    var evt = new ManualResetEvent(false);
+                    Exception eOuter = null;
+
+                    stream.BeginWaitForConnection(asyncResult =>
+                    {
+                        try
+                        {
+                            if (stream != null)
+                                stream.EndWaitForConnection(asyncResult);
+                            else
+                                eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
+                        }
+                        catch (Exception e)
+                        {
+                            if (stream != null)
+                                eOuter = e;
+                            else
+                                eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
+                        }
+                        evt.Set();
+                    }, null);
+
+                    evt.WaitOne();
+
+                    if (eOuter != null)
+                        throw eOuter; // rethrow exception
+                }
+                else
+                {
+                    stream.WaitForConnection();
+                }
+
+                var trans = new ServerTransport(stream,asyncMode);
                 stream = null;  // pass ownership to ServerTransport
                 return trans;
             }
+            catch (TTransportException)
+            {
+                Close();
+                throw;
+            }
             catch (Exception e)
             {
                 Close();
@@ -89,15 +154,18 @@
 
         private class ServerTransport : TTransport
         {
-            private NamedPipeServerStream server;
-            public ServerTransport(NamedPipeServerStream server)
+            private NamedPipeServerStream stream;
+            private bool asyncMode;
+
+            public ServerTransport(NamedPipeServerStream stream, bool asyncMode)
             {
-                this.server = server;
+                this.stream = stream;
+                this.asyncMode = asyncMode;
             }
 
             public override bool IsOpen
             {
-                get { return server != null && server.IsConnected; }
+                get { return stream != null && stream.IsConnected; }
             }
 
             public override void Open()
@@ -106,30 +174,102 @@
 
             public override void Close()
             {
-                if (server != null) server.Close();
+                if (stream != null)
+                    stream.Close();
             }
 
             public override int Read(byte[] buf, int off, int len)
             {
-                if (server == null)
+                if (stream == null)
                 {
                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
                 }
-                return server.Read(buf, off, len);
+
+                if (asyncMode)
+                {
+                    Exception eOuter = null;
+                    var evt = new ManualResetEvent(false);
+                    int retval = 0;
+
+                    stream.BeginRead(buf, off, len, asyncResult =>
+                    {
+                        try
+                        {
+                            if (stream != null)
+                                retval = stream.EndRead(asyncResult);
+                            else
+                                eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
+                        }
+                        catch (Exception e)
+                        {
+                            if (stream != null)
+                                eOuter = e;
+                            else
+                                eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
+                        }
+                        evt.Set();
+                    }, null);
+
+                    evt.WaitOne();
+
+                    if (eOuter != null)
+                        throw eOuter; // rethrow exception
+                    else
+                        return retval;
+                }
+                else
+                {
+                    return stream.Read(buf, off, len);
+                }
             }
 
             public override void Write(byte[] buf, int off, int len)
             {
-                if (server == null)
+                if (stream == null)
                 {
                     throw new TTransportException(TTransportException.ExceptionType.NotOpen);
                 }
-                server.Write(buf, off, len);
+
+                if (asyncMode)
+                {
+                    Exception eOuter = null;
+                    var evt = new ManualResetEvent(false);
+
+                    stream.BeginWrite(buf, off, len, asyncResult =>
+                    {
+                        try
+                        {
+                            if (stream != null)
+                                stream.EndWrite(asyncResult);
+                            else
+                                eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
+                        }
+                        catch (Exception e)
+                        {
+                            if (stream != null)
+                                eOuter = e;
+                            else
+                                eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
+                        }
+                        evt.Set();
+                    }, null);
+
+                    evt.WaitOne();
+
+                    if (eOuter != null)
+                        throw eOuter; // rethrow exception
+                }
+                else
+                {
+                    stream.Write(buf, off, len);
+                }
+
             }
 
             protected override void Dispose(bool disposing)
             {
-                server.Dispose();
+                if (stream != null)
+                    stream.Dispose();
             }
         }
     }