THRIFT-2953 TNamedPipeServerTransport is not Stop()able
Client: C#
Patch: Jens Geyer
This closes #362
diff --git a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
index b3f34eb..c1e8400 100644
--- a/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
+++ b/lib/csharp/src/Transport/TNamedPipeServerTransport.cs
@@ -24,6 +24,7 @@
using System;
using System.Collections.Generic;
using System.IO.Pipes;
+using System.Threading;
namespace Thrift.Transport
{
@@ -33,7 +34,8 @@
/// This is the address of the Pipe on the localhost.
/// </summary>
private readonly string pipeAddress;
- NamedPipeServerStream stream = null;
+ private NamedPipeServerStream stream = null;
+ private bool asyncMode = true;
public TNamedPipeServerTransport(string pipeAddress)
{
@@ -63,11 +65,35 @@
private void EnsurePipeInstance()
{
- if( stream == null)
- stream = new NamedPipeServerStream(
- pipeAddress, PipeDirection.InOut, 254,
- PipeTransmissionMode.Byte,
- PipeOptions.None, 4096, 4096 /*TODO: security*/);
+ if (stream == null)
+ {
+ var direction = PipeDirection.InOut;
+ var maxconn = 254;
+ var mode = PipeTransmissionMode.Byte;
+ var options = asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
+ var inbuf = 4096;
+ var outbuf = 4096;
+ // TODO: security
+
+ try
+ {
+ stream = new NamedPipeServerStream(pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
+ }
+ catch (NotImplementedException) // Mono still does not support async, fallback to sync
+ {
+ if (asyncMode)
+ {
+ options &= (~PipeOptions.Asynchronous);
+ stream = new NamedPipeServerStream(pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
+ asyncMode = false;
+ }
+ else
+ {
+ throw;
+ }
+ }
+
+ }
}
protected override TTransport AcceptImpl()
@@ -75,11 +101,50 @@
try
{
EnsurePipeInstance();
- stream.WaitForConnection();
- var trans = new ServerTransport(stream);
+
+ if (asyncMode)
+ {
+ var evt = new ManualResetEvent(false);
+ Exception eOuter = null;
+
+ stream.BeginWaitForConnection(asyncResult =>
+ {
+ try
+ {
+ if (stream != null)
+ stream.EndWaitForConnection(asyncResult);
+ else
+ eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
+ }
+ catch (Exception e)
+ {
+ if (stream != null)
+ eOuter = e;
+ else
+ eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
+ }
+ evt.Set();
+ }, null);
+
+ evt.WaitOne();
+
+ if (eOuter != null)
+ throw eOuter; // rethrow exception
+ }
+ else
+ {
+ stream.WaitForConnection();
+ }
+
+ var trans = new ServerTransport(stream,asyncMode);
stream = null; // pass ownership to ServerTransport
return trans;
}
+ catch (TTransportException)
+ {
+ Close();
+ throw;
+ }
catch (Exception e)
{
Close();
@@ -89,15 +154,18 @@
private class ServerTransport : TTransport
{
- private NamedPipeServerStream server;
- public ServerTransport(NamedPipeServerStream server)
+ private NamedPipeServerStream stream;
+ private bool asyncMode;
+
+ public ServerTransport(NamedPipeServerStream stream, bool asyncMode)
{
- this.server = server;
+ this.stream = stream;
+ this.asyncMode = asyncMode;
}
public override bool IsOpen
{
- get { return server != null && server.IsConnected; }
+ get { return stream != null && stream.IsConnected; }
}
public override void Open()
@@ -106,30 +174,102 @@
public override void Close()
{
- if (server != null) server.Close();
+ if (stream != null)
+ stream.Close();
}
public override int Read(byte[] buf, int off, int len)
{
- if (server == null)
+ if (stream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- return server.Read(buf, off, len);
+
+ if (asyncMode)
+ {
+ Exception eOuter = null;
+ var evt = new ManualResetEvent(false);
+ int retval = 0;
+
+ stream.BeginRead(buf, off, len, asyncResult =>
+ {
+ try
+ {
+ if (stream != null)
+ retval = stream.EndRead(asyncResult);
+ else
+ eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
+ }
+ catch (Exception e)
+ {
+ if (stream != null)
+ eOuter = e;
+ else
+ eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
+ }
+ evt.Set();
+ }, null);
+
+ evt.WaitOne();
+
+ if (eOuter != null)
+ throw eOuter; // rethrow exception
+ else
+ return retval;
+ }
+ else
+ {
+ return stream.Read(buf, off, len);
+ }
}
public override void Write(byte[] buf, int off, int len)
{
- if (server == null)
+ if (stream == null)
{
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- server.Write(buf, off, len);
+
+ if (asyncMode)
+ {
+ Exception eOuter = null;
+ var evt = new ManualResetEvent(false);
+
+ stream.BeginWrite(buf, off, len, asyncResult =>
+ {
+ try
+ {
+ if (stream != null)
+ stream.EndWrite(asyncResult);
+ else
+ eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted);
+ }
+ catch (Exception e)
+ {
+ if (stream != null)
+ eOuter = e;
+ else
+ eOuter = new TTransportException(TTransportException.ExceptionType.Interrupted, e.Message);
+ }
+ evt.Set();
+ }, null);
+
+ evt.WaitOne();
+
+ if (eOuter != null)
+ throw eOuter; // rethrow exception
+ }
+ else
+ {
+ stream.Write(buf, off, len);
+ }
+
}
protected override void Dispose(bool disposing)
{
- server.Dispose();
+ if (stream != null)
+ stream.Dispose();
}
}
}