THRIFT-5020 Refactoring & minor fixes for netstd library
Client: netstd
Patch: Jens Geyer
This closes #1941
diff --git a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
index bdaa348..45e5513 100644
--- a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
@@ -66,7 +66,8 @@
outputTransportFactory,
inputProtocolFactory,
outputProtocolFactory,
- loggerFactory.CreateLogger<TSimpleAsyncServer>())
+ loggerFactory.CreateLogger<TSimpleAsyncServer>(),
+ clientWaitingDelay)
{
}
diff --git a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
index c84df83..4f8454c 100644
--- a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
@@ -99,19 +99,14 @@
public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
- {
return await Task.FromCanceled<int>(cancellationToken);
- }
if (_inputStream == null)
- {
throw new TTransportException(TTransportException.ExceptionType.NotOpen, "No request has been sent");
- }
try
{
var ret = await _inputStream.ReadAsync(buffer, offset, length, cancellationToken);
-
if (ret == -1)
{
throw new TTransportException(TTransportException.ExceptionType.EndOfFile, "No more data available");
diff --git a/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs b/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
index 25895c2..cdbbc0d 100644
--- a/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
@@ -30,14 +30,9 @@
private byte[] Bytes;
private int _bytesUsed;
- public TMemoryBufferTransport()
+ public TMemoryBufferTransport(int initialCapacity = 2048)
{
- Bytes = new byte[2048]; // default size
- }
-
- public TMemoryBufferTransport(int initialCapacity)
- {
- Bytes = new byte[initialCapacity]; // default size
+ Bytes = new byte[initialCapacity];
}
public TMemoryBufferTransport(byte[] buf)
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index 7dfe013..1ae6074 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -26,7 +26,7 @@
public class TNamedPipeTransport : TTransport
{
private NamedPipeClientStream PipeStream;
- private int ConnectTimeout;
+ private readonly int ConnectTimeout;
public TNamedPipeTransport(string pipe, int timeout = Timeout.Infinite)
: this(".", pipe, timeout)
@@ -102,7 +102,10 @@
protected override void Dispose(bool disposing)
{
- PipeStream.Dispose();
+ if(disposing)
+ {
+ PipeStream?.Dispose();
+ }
}
}
}
diff --git a/lib/netstd/Thrift/Transport/Client/TSocketTransport.cs b/lib/netstd/Thrift/Transport/Client/TSocketTransport.cs
index 00da045..dd506bc 100644
--- a/lib/netstd/Thrift/Transport/Client/TSocketTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TSocketTransport.cs
@@ -36,12 +36,7 @@
SetInputOutputStream();
}
- public TSocketTransport(IPAddress host, int port)
- : this(host, port, 0)
- {
- }
-
- public TSocketTransport(IPAddress host, int port, int timeout)
+ public TSocketTransport(IPAddress host, int port, int timeout = 0)
{
Host = host;
Port = port;
@@ -84,7 +79,7 @@
}
}
- public TcpClient TcpClient { get; private set; }
+ public TcpClient TcpClient { get; private set; }
public IPAddress Host { get; }
public int Port { get; }
@@ -159,4 +154,4 @@
_isDisposed = true;
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Transport/Client/TTlsSocketTransport.cs b/lib/netstd/Thrift/Transport/Client/TTlsSocketTransport.cs
index 9295bb0..a926a38 100644
--- a/lib/netstd/Thrift/Transport/Client/TTlsSocketTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TTlsSocketTransport.cs
@@ -42,7 +42,8 @@
private SslStream _secureStream;
private int _timeout;
- public TTlsSocketTransport(TcpClient client, X509Certificate2 certificate, bool isServer = false,
+ public TTlsSocketTransport(TcpClient client,
+ X509Certificate2 certificate, bool isServer = false,
RemoteCertificateValidationCallback certValidator = null,
LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
SslProtocols sslProtocols = SslProtocols.Tls12)
@@ -67,7 +68,8 @@
}
}
- public TTlsSocketTransport(IPAddress host, int port, string certificatePath,
+ public TTlsSocketTransport(IPAddress host, int port,
+ string certificatePath,
RemoteCertificateValidationCallback certValidator = null,
LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
SslProtocols sslProtocols = SslProtocols.Tls12)
@@ -79,7 +81,7 @@
{
}
- public TTlsSocketTransport(IPAddress host, int port,
+ public TTlsSocketTransport(IPAddress host, int port,
X509Certificate2 certificate = null,
RemoteCertificateValidationCallback certValidator = null,
LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
@@ -92,7 +94,7 @@
{
}
- public TTlsSocketTransport(IPAddress host, int port, int timeout,
+ public TTlsSocketTransport(IPAddress host, int port, int timeout,
X509Certificate2 certificate,
RemoteCertificateValidationCallback certValidator = null,
LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
@@ -109,7 +111,7 @@
InitSocket();
}
- public TTlsSocketTransport(string host, int port, int timeout,
+ public TTlsSocketTransport(string host, int port, int timeout,
X509Certificate2 certificate,
RemoteCertificateValidationCallback certValidator = null,
LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
diff --git a/lib/netstd/Thrift/Transport/Server/THttpServerTransport.cs b/lib/netstd/Thrift/Transport/Server/THttpServerTransport.cs
index 056300c..2a40db3 100644
--- a/lib/netstd/Thrift/Transport/Server/THttpServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/THttpServerTransport.cs
@@ -43,13 +43,16 @@
protected ITAsyncProcessor Processor;
- public THttpServerTransport(ITAsyncProcessor processor, RequestDelegate next = null, ILoggerFactory loggerFactory = null)
+ public THttpServerTransport(
+ ITAsyncProcessor processor,
+ RequestDelegate next = null,
+ ILoggerFactory loggerFactory = null)
: this(processor, new TBinaryProtocol.Factory(), null, next, loggerFactory)
{
}
public THttpServerTransport(
- ITAsyncProcessor processor,
+ ITAsyncProcessor processor,
TProtocolFactory protocolFactory,
TTransportFactory transFactory = null,
RequestDelegate next = null,
@@ -59,7 +62,7 @@
}
public THttpServerTransport(
- ITAsyncProcessor processor,
+ ITAsyncProcessor processor,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
TTransportFactory inputTransFactory = null,
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 77b8251..b2f29b4 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -92,10 +92,16 @@
try
{
var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf);
- if( (handle != null) && (!handle.IsInvalid))
+ if ((handle != null) && (!handle.IsInvalid))
+ {
_stream = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
+ handle = null; // we don't own it any longer
+ }
else
+ {
+ handle?.Dispose();
_stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
+ }
}
catch (NotImplementedException) // Mono still does not support async, fallback to sync
{
@@ -301,7 +307,10 @@
protected override void Dispose(bool disposing)
{
- PipeStream?.Dispose();
+ if (disposing)
+ {
+ PipeStream?.Dispose();
+ }
}
}
}
diff --git a/lib/netstd/Thrift/Transport/TServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TServerTransport.cs
similarity index 96%
rename from lib/netstd/Thrift/Transport/TServerTransport.cs
rename to lib/netstd/Thrift/Transport/Server/TServerTransport.cs
index 74c54cd..dd60f6a 100644
--- a/lib/netstd/Thrift/Transport/TServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TServerTransport.cs
@@ -34,7 +34,7 @@
protected abstract ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken);
- public async ValueTask<TTransport> AcceptAsync()
+ public async ValueTask<TTransport> AcceptAsync()
{
return await AcceptAsync(CancellationToken.None);
}
@@ -51,4 +51,4 @@
return transport;
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs b/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs
index 1286805..231b83f 100644
--- a/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs
@@ -63,7 +63,7 @@
RemoteCertificateValidationCallback clientCertValidator = null,
LocalCertificateSelectionCallback localCertificateSelectionCallback = null,
SslProtocols sslProtocols = SslProtocols.Tls12)
- : this(null, certificate, clientCertValidator, localCertificateSelectionCallback)
+ : this(null, certificate, clientCertValidator, localCertificateSelectionCallback, sslProtocols)
{
try
{
@@ -117,7 +117,9 @@
client.SendTimeout = client.ReceiveTimeout = _clientTimeout;
//wrap the client in an SSL Socket passing in the SSL cert
- var tTlsSocket = new TTlsSocketTransport(client, _serverCertificate, true, _clientCertValidator,
+ var tTlsSocket = new TTlsSocketTransport(
+ client,
+ _serverCertificate, true, _clientCertValidator,
_localCertificateSelectionCallback, _sslProtocols);
await tTlsSocket.SetupTlsAsync();
diff --git a/test/netstd/Client/TestClient.cs b/test/netstd/Client/TestClient.cs
index 0f58f95..13ae313 100644
--- a/test/netstd/Client/TestClient.cs
+++ b/test/netstd/Client/TestClient.cs
@@ -448,7 +448,7 @@
public static byte[] PrepareTestData(bool randomDist, BinaryTestSize testcase)
{
- int amount = -1;
+ int amount;
switch (testcase)
{
case BinaryTestSize.Empty:
@@ -622,26 +622,29 @@
{
Console.WriteLine("*** FAILED ***");
returnCode |= ErrorContainers;
- throw new Exception("CrazyNesting.Equals failed");
}
}
// TODO: Validate received message
Console.Write("testStruct({\"Zero\", 1, -3, -5})");
- var o = new Xtruct();
- o.String_thing = "Zero";
- o.Byte_thing = (sbyte)1;
- o.I32_thing = -3;
- o.I64_thing = -5;
+ var o = new Xtruct
+ {
+ String_thing = "Zero",
+ Byte_thing = (sbyte)1,
+ I32_thing = -3,
+ I64_thing = -5
+ };
var i = await client.testStructAsync(o, MakeTimeoutToken());
Console.WriteLine(" = {\"" + i.String_thing + "\", " + i.Byte_thing + ", " + i.I32_thing + ", " + i.I64_thing + "}");
// TODO: Validate received message
Console.Write("testNest({1, {\"Zero\", 1, -3, -5}, 5})");
- var o2 = new Xtruct2();
- o2.Byte_thing = (sbyte)1;
- o2.Struct_thing = o;
- o2.I32_thing = 5;
+ var o2 = new Xtruct2
+ {
+ Byte_thing = (sbyte)1,
+ Struct_thing = o,
+ I32_thing = 5
+ };
var i2 = await client.testNestAsync(o2, MakeTimeoutToken());
i = i2.Struct_thing;
Console.WriteLine(" = {" + i2.Byte_thing + ", {\"" + i.String_thing + "\", " + i.Byte_thing + ", " + i.I32_thing + ", " + i.I64_thing + "}, " + i2.I32_thing + "}");
@@ -838,16 +841,24 @@
Console.WriteLine("}");
// TODO: Validate received message
- var insane = new Insanity();
- insane.UserMap = new Dictionary<Numberz, long>();
- insane.UserMap[Numberz.FIVE] = 5000L;
- var truck = new Xtruct();
- truck.String_thing = "Truck";
- truck.Byte_thing = (sbyte)8;
- truck.I32_thing = 8;
- truck.I64_thing = 8;
- insane.Xtructs = new List<Xtruct>();
- insane.Xtructs.Add(truck);
+ var insane = new Insanity
+ {
+ UserMap = new Dictionary<Numberz, long>
+ {
+ [Numberz.FIVE] = 5000L
+ }
+ };
+ var truck = new Xtruct
+ {
+ String_thing = "Truck",
+ Byte_thing = (sbyte)8,
+ I32_thing = 8,
+ I64_thing = 8
+ };
+ insane.Xtructs = new List<Xtruct>
+ {
+ truck
+ };
Console.Write("testInsanity()");
var whoa = await client.testInsanityAsync(insane, MakeTimeoutToken());
Console.Write(" = {");
@@ -902,8 +913,10 @@
sbyte arg0 = 1;
var arg1 = 2;
var arg2 = long.MaxValue;
- var multiDict = new Dictionary<short, string>();
- multiDict[1] = "one";
+ var multiDict = new Dictionary<short, string>
+ {
+ [1] = "one"
+ };
var tmpMultiDict = new List<string>();
foreach (var pair in multiDict)
diff --git a/test/netstd/Server/TestServer.cs b/test/netstd/Server/TestServer.cs
index 25c2afc..280f4e9 100644
--- a/test/netstd/Server/TestServer.cs
+++ b/test/netstd/Server/TestServer.cs
@@ -181,19 +181,19 @@
public class TestHandlerAsync : ThriftTest.IAsync
{
- public TServer server { get; set; }
- private int handlerID;
- private StringBuilder sb = new StringBuilder();
- private TestLogDelegate logger;
+ public TServer Server { get; set; }
+ private readonly int handlerID;
+ private readonly StringBuilder sb = new StringBuilder();
+ private readonly TestLogDelegate logger;
public TestHandlerAsync()
{
handlerID = Interlocked.Increment(ref _clientID);
- logger += testConsoleLogger;
+ logger += TestConsoleLogger;
logger.Invoke("New TestHandler instance created");
}
- public void testConsoleLogger(string msg, params object[] values)
+ public void TestConsoleLogger(string msg, params object[] values)
{
sb.Clear();
sb.AppendFormat("handler{0:D3}:", handlerID);
@@ -525,117 +525,121 @@
public static int Execute(List<string> args)
{
- var loggerFactory = new LoggerFactory();//.AddConsole().AddDebug();
- var logger = new LoggerFactory().CreateLogger("Test");
-
- try
+ using (var loggerFactory = new LoggerFactory()) //.AddConsole().AddDebug();
{
- var param = new ServerParam();
+ var logger = loggerFactory.CreateLogger("Test");
try
{
- param.Parse(args);
+ var param = new ServerParam();
+
+ try
+ {
+ param.Parse(args);
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine("*** FAILED ***");
+ Console.WriteLine("Error while parsing arguments");
+ Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+ return 1;
+ }
+
+
+ // Endpoint transport (mandatory)
+ TServerTransport trans;
+ switch (param.transport)
+ {
+ case TransportChoice.NamedPipe:
+ Debug.Assert(param.pipe != null);
+ trans = new TNamedPipeServerTransport(param.pipe);
+ break;
+
+
+ case TransportChoice.TlsSocket:
+ var cert = GetServerCert();
+ if (cert == null || !cert.HasPrivateKey)
+ {
+ cert?.Dispose();
+ throw new InvalidOperationException("Certificate doesn't contain private key");
+ }
+
+ trans = new TTlsServerSocketTransport( param.port, cert,
+ (sender, certificate, chain, errors) => true,
+ null, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12);
+ break;
+
+ case TransportChoice.Socket:
+ default:
+ trans = new TServerSocketTransport(param.port, 0);
+ break;
+ }
+
+ // Layered transport (mandatory)
+ TTransportFactory transFactory = null;
+ switch (param.buffering)
+ {
+ case BufferChoice.Framed:
+ transFactory = new TFramedTransport.Factory();
+ break;
+ case BufferChoice.Buffered:
+ transFactory = new TBufferedTransport.Factory();
+ break;
+ default:
+ Debug.Assert(param.buffering == BufferChoice.None, "unhandled case");
+ transFactory = null; // no layered transprt
+ break;
+ }
+
+ // Protocol (mandatory)
+ TProtocolFactory proto;
+ switch (param.protocol)
+ {
+ case ProtocolChoice.Compact:
+ proto = new TCompactProtocol.Factory();
+ break;
+ case ProtocolChoice.Json:
+ proto = new TJsonProtocol.Factory();
+ break;
+ case ProtocolChoice.Binary:
+ default:
+ proto = new TBinaryProtocol.Factory();
+ break;
+ }
+
+ // Processor
+ var testHandler = new TestHandlerAsync();
+ var testProcessor = new ThriftTest.AsyncProcessor(testHandler);
+ var processorFactory = new TSingletonProcessorFactory(testProcessor);
+
+ TServer serverEngine = new TSimpleAsyncServer(processorFactory, trans, transFactory, transFactory, proto, proto, logger);
+
+ //Server event handler
+ var serverEvents = new MyServerEventHandler();
+ serverEngine.SetEventHandler(serverEvents);
+
+ // Run it
+ var where = (!string.IsNullOrEmpty(param.pipe)) ? "on pipe " + param.pipe : "on port " + param.port;
+ Console.WriteLine("Starting the AsyncBaseServer " + where +
+ " with processor TPrototypeProcessorFactory prototype factory " +
+ (param.buffering == BufferChoice.Buffered ? " with buffered transport" : "") +
+ (param.buffering == BufferChoice.Framed ? " with framed transport" : "") +
+ (param.transport == TransportChoice.TlsSocket ? " with encryption" : "") +
+ (param.protocol == ProtocolChoice.Compact ? " with compact protocol" : "") +
+ (param.protocol == ProtocolChoice.Json ? " with json protocol" : "") +
+ "...");
+ serverEngine.ServeAsync(CancellationToken.None).GetAwaiter().GetResult();
+ Console.ReadLine();
}
- catch (Exception ex)
+ catch (Exception x)
{
- Console.WriteLine("*** FAILED ***");
- Console.WriteLine("Error while parsing arguments");
- Console.WriteLine(ex.Message + " ST: " + ex.StackTrace);
+ Console.Error.Write(x);
return 1;
}
-
- // Endpoint transport (mandatory)
- TServerTransport trans;
- switch (param.transport)
- {
- case TransportChoice.NamedPipe:
- Debug.Assert(param.pipe != null);
- trans = new TNamedPipeServerTransport(param.pipe);
- break;
-
-
- case TransportChoice.TlsSocket:
- var cert = GetServerCert();
- if (cert == null || !cert.HasPrivateKey)
- {
- throw new InvalidOperationException("Certificate doesn't contain private key");
- }
-
- trans = new TTlsServerSocketTransport( param.port, cert,
- (sender, certificate, chain, errors) => true,
- null, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12);
- break;
-
- case TransportChoice.Socket:
- default:
- trans = new TServerSocketTransport(param.port, 0);
- break;
- }
-
- // Layered transport (mandatory)
- TTransportFactory transFactory = null;
- switch (param.buffering)
- {
- case BufferChoice.Framed:
- transFactory = new TFramedTransport.Factory();
- break;
- case BufferChoice.Buffered:
- transFactory = new TBufferedTransport.Factory();
- break;
- default:
- Debug.Assert(param.buffering == BufferChoice.None, "unhandled case");
- transFactory = null; // no layered transprt
- break;
- }
-
- // Protocol (mandatory)
- TProtocolFactory proto;
- switch (param.protocol)
- {
- case ProtocolChoice.Compact:
- proto = new TCompactProtocol.Factory();
- break;
- case ProtocolChoice.Json:
- proto = new TJsonProtocol.Factory();
- break;
- case ProtocolChoice.Binary:
- default:
- proto = new TBinaryProtocol.Factory();
- break;
- }
-
- // Processor
- var testHandler = new TestHandlerAsync();
- var testProcessor = new ThriftTest.AsyncProcessor(testHandler);
- var processorFactory = new TSingletonProcessorFactory(testProcessor);
-
- TServer serverEngine = new TSimpleAsyncServer(processorFactory, trans, transFactory, transFactory, proto, proto, logger);
-
- //Server event handler
- var serverEvents = new MyServerEventHandler();
- serverEngine.SetEventHandler(serverEvents);
-
- // Run it
- var where = (! string.IsNullOrEmpty(param.pipe)) ? "on pipe " + param.pipe : "on port " + param.port;
- Console.WriteLine("Starting the AsyncBaseServer " + where +
- " with processor TPrototypeProcessorFactory prototype factory " +
- (param.buffering == BufferChoice.Buffered ? " with buffered transport" : "") +
- (param.buffering == BufferChoice.Framed ? " with framed transport" : "") +
- (param.transport == TransportChoice.TlsSocket ? " with encryption" : "") +
- (param.protocol == ProtocolChoice.Compact ? " with compact protocol" : "") +
- (param.protocol == ProtocolChoice.Json ? " with json protocol" : "") +
- "...");
- serverEngine.ServeAsync(CancellationToken.None).GetAwaiter().GetResult();
- Console.ReadLine();
+ Console.WriteLine("done.");
+ return 0;
}
- catch (Exception x)
- {
- Console.Error.Write(x);
- return 1;
- }
- Console.WriteLine("done.");
- return 0;
}
}