Merge pull request #2654 from BioDataAnalysis/bda_minor_improvements
Various minor improvements
diff --git a/lib/delphi/src/Thrift.Protocol.Compact.pas b/lib/delphi/src/Thrift.Protocol.Compact.pas
index 02a19ea..80f1ce5 100644
--- a/lib/delphi/src/Thrift.Protocol.Compact.pas
+++ b/lib/delphi/src/Thrift.Protocol.Compact.pas
@@ -546,7 +546,7 @@
var network : TGuid; // in network order (Big Endian)
begin
ASSERT( SizeOf(uuid) = 16);
- network := uuid.SwapByteOrder;
+ network := GuidUtils.SwapByteOrder(uuid);
Transport.Write( @network, 0, SizeOf(network));
end;
@@ -868,7 +868,7 @@
begin
ASSERT( SizeOf(result) = 16);
FTrans.ReadAll( @network, SizeOf(network), 0, SizeOf(network));
- result := network.SwapByteOrder;
+ result := GuidUtils.SwapByteOrder(network);
end;
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index 636f201..c6b1a00 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -889,7 +889,7 @@
begin
ASSERT( SizeOf(result) = 16);
FTrans.ReadAll( @network, SizeOf(network), 0, SizeOf(network));
- result := network.SwapByteOrder;
+ result := GuidUtils.SwapByteOrder(network);
end;
function TBinaryProtocolImpl.ReadBool: Boolean;
@@ -1064,7 +1064,7 @@
var network : TGuid; // in network order (Big Endian)
begin
ASSERT( SizeOf(uuid) = 16);
- network := uuid.SwapByteOrder;
+ network := GuidUtils.SwapByteOrder(uuid);
Transport.Write( @network, 0, SizeOf(network));
end;
diff --git a/lib/delphi/src/Thrift.Utils.pas b/lib/delphi/src/Thrift.Utils.pas
index 1226535..fff6b86 100644
--- a/lib/delphi/src/Thrift.Utils.pas
+++ b/lib/delphi/src/Thrift.Utils.pas
@@ -96,9 +96,12 @@
end;
- TGuidHelper = record helper for System.TGuid
+ // problem: inheritance possible for class helpers ONLY but not with record helpers
+ // workaround: use static class method instead of record helper :-(
+ GuidUtils = class sealed
public
- function SwapByteOrder : TGuid;
+ // new stuff
+ class function SwapByteOrder( const aGuid : TGuid) : TGuid; static;
{$IFDEF Debug}
class procedure SelfTest; static;
@@ -355,16 +358,16 @@
end;
-{ TGuidHelper }
+{ GuidUtils }
-function TGuidHelper.SwapByteOrder : TGuid;
+class function GuidUtils.SwapByteOrder( const aGuid : TGuid) : TGuid;
// convert to/from network byte order
// - https://www.ietf.org/rfc/rfc4122.txt
// - https://stackoverflow.com/questions/10850075/guid-uuid-compatibility-issue-between-net-and-linux
// - https://lists.gnu.org/archive/html/bug-parted/2002-01/msg00099.html
begin
- result := Self;
+ result := aGuid;
IntegerUtils.SwapByteOrder( @result.D1, SizeOf(result.D1));
IntegerUtils.SwapByteOrder( @result.D2, SizeOf(result.D2));
@@ -374,7 +377,7 @@
{$IFDEF Debug}
-class procedure TGuidHelper.SelfTest;
+class procedure GuidUtils.SelfTest;
var guid : TGuid;
pBytes : PByteArray;
i, expected : Integer;
@@ -382,7 +385,7 @@
begin
// host to network
guid := TEST_GUID;
- guid := guid.SwapByteOrder;
+ guid := GuidUtils.SwapByteOrder(guid);
// validate network order
pBytes := @guid;
@@ -392,8 +395,11 @@
end;
// network to host and final validation
- guid := guid.SwapByteOrder;
+ guid := GuidUtils.SwapByteOrder(guid);
ASSERT( IsEqualGuid( guid, TEST_GUID));
+
+ // prevent collisions with SysUtils.TGuidHelper
+ guid := TGuid.NewGuid;
end;
{$ENDIF}
@@ -494,6 +500,6 @@
begin
{$IFDEF Debug}
- TGuid.SelfTest;
+ GuidUtils.SelfTest;
{$ENDIF}
end.
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index c57db9d..071c660 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -28,7 +28,7 @@
{
private NamedPipeClientStream PipeStream;
private readonly int ConnectTimeout;
- private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000; // Timeout.Infinite is not a good default
+ private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000; // Timeout.Infinite is not a good default
public TNamedPipeTransport(string pipe, TConfiguration config, int timeout = DEFAULT_CONNECT_TIMEOUT)
: this(".", pipe, config, timeout)
@@ -61,6 +61,8 @@
{
if (PipeStream != null)
{
+ if (PipeStream.IsConnected)
+ PipeStream.Close();
PipeStream.Dispose();
PipeStream = null;
}
@@ -107,20 +109,24 @@
}
}
- public override Task FlushAsync(CancellationToken cancellationToken)
+ public override async Task FlushAsync(CancellationToken cancellationToken)
{
- cancellationToken.ThrowIfCancellationRequested();
-
+ await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
- return Task.CompletedTask;
}
protected override void Dispose(bool disposing)
{
- if(disposing)
+ if (disposing)
{
- PipeStream?.Dispose();
+ if (PipeStream != null)
+ {
+ if (PipeStream.IsConnected)
+ PipeStream.Close();
+ PipeStream.Dispose();
+ PipeStream = null;
+ }
}
}
}
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 7c94300..8ad62aa 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -24,41 +24,78 @@
using System.ComponentModel;
using System.Security.AccessControl;
using System.Security.Principal;
+using System.Collections.Generic;
+using System.IO;
+using System.Diagnostics;
+
+#pragma warning disable CS1998 // async no await
namespace Thrift.Transport.Server
{
+ [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
[Flags]
- public enum NamedPipeClientFlags {
+ public enum NamedPipeClientFlags { // bad name
None = 0x00,
OnlyLocalClients = 0x01
};
+ [Flags]
+ public enum NamedPipeServerFlags
+ {
+ None = 0x00,
+ OnlyLocalClients = 0x01,
+ };
+
+
// ReSharper disable once InconsistentNaming
public class TNamedPipeServerTransport : TServerTransport
{
+ // to manage incoming connections, we set up a task for each stream to listen on
+ private struct TaskStreamPair
+ {
+ public NamedPipeServerStream Stream;
+ public Task Task;
+
+ public TaskStreamPair(NamedPipeServerStream stream, Task task)
+ {
+ Stream = stream;
+ Task = task;
+ }
+ }
+
/// <summary>
/// This is the address of the Pipe on the localhost.
/// </summary>
private readonly string _pipeAddress;
private bool _asyncMode = true;
private volatile bool _isPending = true;
- private NamedPipeServerStream _stream = null;
+ private readonly List<TaskStreamPair> _streams = new List<TaskStreamPair>();
private readonly bool _onlyLocalClients = false; // compatibility default
+ private readonly byte _numListenPipes = 1; // compatibility default
- public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags)
+ public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes)
: base(config)
{
+ if ((numListenPipes < 1) || (numListenPipes > 254))
+ throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
+
+ _pipeAddress = pipeAddress;
+ _onlyLocalClients = flags.HasFlag(NamedPipeServerFlags.OnlyLocalClients);
+ _numListenPipes = (byte)numListenPipes;
+ }
+
+ [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
+ public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1)
+ : base(config)
+ {
+ if ((numListenPipes < 1) || (numListenPipes > 254))
+ throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
+
_pipeAddress = pipeAddress;
_onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients);
+ _numListenPipes = (byte)numListenPipes;
}
- [Obsolete("This CTOR is deprecated, please use the other one instead.")]
- public TNamedPipeServerTransport(string pipeAddress, TConfiguration config)
- : base(config)
- {
- _pipeAddress = pipeAddress;
- _onlyLocalClients = false;
- }
public override bool IsOpen() {
return true;
@@ -69,75 +106,112 @@
// nothing to do here
}
- public override void Close()
+ private static void Close(NamedPipeServerStream pipe)
{
- if (_stream != null)
+ if (pipe != null)
{
try
{
- if (_stream.IsConnected)
- _stream.Disconnect();
- _stream.Dispose();
+ if (pipe.IsConnected)
+ pipe.Disconnect();
}
finally
{
- _stream = null;
- _isPending = false;
+ pipe.Dispose();
}
}
}
+ public override void Close()
+ {
+ try
+ {
+ if (_streams != null)
+ {
+ while(_streams.Count > 0)
+ {
+ Close(_streams[0].Stream);
+ _streams.RemoveAt(0);
+ }
+ }
+ }
+ finally
+ {
+ _streams.Clear();
+ _isPending = false;
+ }
+ }
+
public override bool IsClientPending()
{
return _isPending;
}
- private void EnsurePipeInstance()
+ private void EnsurePipeInstances()
{
- if (_stream == null)
+ // set up a pool for accepting multiple calls when in multithread mode
+ // once connected, we hand that stream over to the processor and create a fresh one
+ try
{
- const PipeDirection direction = PipeDirection.InOut;
- const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
- const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
- const int inbuf = 4096;
- const int outbuf = 4096;
- var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
+ while (_streams.Count < _numListenPipes)
+ _streams.Add(CreatePipeInstance());
+ }
+ catch
+ {
+ // we might not be able to create all requested instances, e.g. due to some existing instances already processing calls
+ // if we have at least one pipe to listen on -> Good Enough(tm)
+ if (_streams.Count < 1)
+ throw; // no pipes is really bad
+ }
+ }
+
+ private TaskStreamPair CreatePipeInstance()
+ {
+ const PipeDirection direction = PipeDirection.InOut;
+ const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
+ const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
+ const int inbuf = 4096;
+ const int outbuf = 4096;
+ var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
- // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
- // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
- // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
- // EITHER WAY,
- // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
+ // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
+ // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
+ // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
+ // EITHER WAY,
+ // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
- try
+ NamedPipeServerStream instance;
+ try
+ {
+ var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
+ if ((handle != null) && (!handle.IsInvalid))
{
- var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
- if ((handle != null) && (!handle.IsInvalid))
- {
- _stream = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
- handle = null; // we don't own it any longer
- }
- else
- {
- handle?.Dispose();
- _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
- }
+ instance = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
+ handle = null; // we don't own it any longer
}
- catch (NotImplementedException) // Mono still does not support async, fallback to sync
+ else
{
- if (_asyncMode)
- {
- options &= (~PipeOptions.Asynchronous);
- _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
- _asyncMode = false;
- }
- else
- {
- throw;
- }
+ handle?.Dispose();
+ instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
}
}
+ catch (NotImplementedException) // Mono still does not support async, fallback to sync
+ {
+ if (_asyncMode)
+ {
+ options &= (~PipeOptions.Asynchronous);
+ instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
+ _asyncMode = false;
+ }
+ else
+ {
+ throw;
+ }
+ }
+
+ // the task gets added later
+ return new TaskStreamPair( instance, null);
}
@@ -248,14 +322,28 @@
{
try
{
- EnsurePipeInstance();
+ EnsurePipeInstances();
- await _stream.WaitForConnectionAsync(cancellationToken);
+ // fill the list and wait for any task to be completed
+ var tasks = new List<Task>();
+ for (var i = 0; i < _streams.Count; ++i)
+ {
+ if (_streams[i].Task == null)
+ {
+ var pair = _streams[i];
+ pair.Task = Task.Run(async () => await pair.Stream.WaitForConnectionAsync(cancellationToken), cancellationToken);
+ _streams[i] = pair;
+ }
- var trans = new ServerTransport(_stream, Configuration);
- _stream = null; // pass ownership to ServerTransport
+ tasks.Add(_streams[i].Task);
+ }
- //_isPending = false;
+ // there must be an exact mapping between task index and stream index
+ Debug.Assert(_streams.Count == tasks.Count);
+ var index = Task.WaitAny(tasks.ToArray(), cancellationToken);
+
+ var trans = new ServerTransport(_streams[index].Stream, Configuration);
+ _streams.RemoveAt(index); // pass stream ownership to ServerTransport
return trans;
}
@@ -296,8 +384,13 @@
public override void Close()
{
- PipeStream?.Dispose();
- PipeStream = null;
+ if (PipeStream != null)
+ {
+ if (PipeStream.IsConnected)
+ PipeStream.Disconnect();
+ PipeStream.Dispose();
+ PipeStream = null;
+ }
}
public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
@@ -341,19 +434,23 @@
}
}
- public override Task FlushAsync(CancellationToken cancellationToken)
+ public override async Task FlushAsync(CancellationToken cancellationToken)
{
- cancellationToken.ThrowIfCancellationRequested();
-
+ await PipeStream.FlushAsync(cancellationToken);
ResetConsumedMessageSize();
- return Task.CompletedTask;
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
- PipeStream?.Dispose();
+ if (PipeStream != null)
+ {
+ if (PipeStream.IsConnected)
+ PipeStream.Disconnect();
+ PipeStream.Dispose();
+ PipeStream = null;
+ }
}
}
}
diff --git a/test/netstd/Server/TestServer.cs b/test/netstd/Server/TestServer.cs
index 27577ee..1eb5030 100644
--- a/test/netstd/Server/TestServer.cs
+++ b/test/netstd/Server/TestServer.cs
@@ -204,7 +204,6 @@
{
//public TServer Server { get; set; }
private readonly int handlerID;
- private readonly StringBuilder sb = new();
private readonly TestLogDelegate logger;
public TestHandlerAsync()
@@ -216,11 +215,12 @@
public void TestConsoleLogger(string msg, params object[] values)
{
- sb.Clear();
+ var sb = new StringBuilder();
sb.AppendFormat("handler{0:D3}:", handlerID);
sb.AppendFormat(msg, values);
sb.AppendLine();
- Console.Write(sb.ToString());
+ lock (typeof(Console))
+ Console.Write(sb.ToString());
}
public Task testVoid(CancellationToken cancellationToken)
@@ -298,7 +298,7 @@
public Task<Dictionary<int, int>> testMap(Dictionary<int, int>? thing, CancellationToken cancellationToken)
{
- sb.Clear();
+ var sb = new StringBuilder();
sb.Append("testMap({{");
if (thing != null)
{
@@ -323,7 +323,7 @@
public Task<Dictionary<string, string>> testStringMap(Dictionary<string, string>? thing, CancellationToken cancellationToken)
{
- sb.Clear();
+ var sb = new StringBuilder();
sb.Append("testStringMap({{");
if (thing != null)
{
@@ -348,7 +348,7 @@
public Task<HashSet<int>> testSet(HashSet<int>? thing, CancellationToken cancellationToken)
{
- sb.Clear();
+ var sb = new StringBuilder();
sb.Append("testSet({{");
if (thing != null)
{
@@ -373,7 +373,7 @@
public Task<List<int>> testList(List<int>? thing, CancellationToken cancellationToken)
{
- sb.Clear();
+ var sb = new StringBuilder();
sb.Append("testList({{");
if (thing != null)
{
@@ -590,7 +590,8 @@
{
case TransportChoice.NamedPipe:
Debug.Assert(param.pipe != null);
- trans = new TNamedPipeServerTransport(param.pipe, Configuration, NamedPipeClientFlags.OnlyLocalClients);
+ var numListen = (param.server == ServerChoice.Simple) ? 1 : 16;
+ trans = new TNamedPipeServerTransport(param.pipe, Configuration, NamedPipeServerFlags.OnlyLocalClients, numListen);
break;