Merge pull request #2654 from BioDataAnalysis/bda_minor_improvements

Various minor improvements
diff --git a/lib/delphi/src/Thrift.Protocol.Compact.pas b/lib/delphi/src/Thrift.Protocol.Compact.pas
index 02a19ea..80f1ce5 100644
--- a/lib/delphi/src/Thrift.Protocol.Compact.pas
+++ b/lib/delphi/src/Thrift.Protocol.Compact.pas
@@ -546,7 +546,7 @@
 var network : TGuid;  // in network order (Big Endian)
 begin
   ASSERT( SizeOf(uuid) = 16);
-  network := uuid.SwapByteOrder;
+  network := GuidUtils.SwapByteOrder(uuid);
   Transport.Write( @network, 0, SizeOf(network));
 end;
 
@@ -868,7 +868,7 @@
 begin
   ASSERT( SizeOf(result) = 16);
   FTrans.ReadAll( @network, SizeOf(network), 0, SizeOf(network));
-  result := network.SwapByteOrder;
+  result := GuidUtils.SwapByteOrder(network);
 end;
 
 
diff --git a/lib/delphi/src/Thrift.Protocol.pas b/lib/delphi/src/Thrift.Protocol.pas
index 636f201..c6b1a00 100644
--- a/lib/delphi/src/Thrift.Protocol.pas
+++ b/lib/delphi/src/Thrift.Protocol.pas
@@ -889,7 +889,7 @@
 begin
   ASSERT( SizeOf(result) = 16);
   FTrans.ReadAll( @network, SizeOf(network), 0, SizeOf(network));
-  result := network.SwapByteOrder;
+  result := GuidUtils.SwapByteOrder(network);
 end;
 
 function TBinaryProtocolImpl.ReadBool: Boolean;
@@ -1064,7 +1064,7 @@
 var network : TGuid;  // in network order (Big Endian)
 begin
   ASSERT( SizeOf(uuid) = 16);
-  network := uuid.SwapByteOrder;
+  network := GuidUtils.SwapByteOrder(uuid);
   Transport.Write( @network, 0, SizeOf(network));
 end;
 
diff --git a/lib/delphi/src/Thrift.Utils.pas b/lib/delphi/src/Thrift.Utils.pas
index 1226535..fff6b86 100644
--- a/lib/delphi/src/Thrift.Utils.pas
+++ b/lib/delphi/src/Thrift.Utils.pas
@@ -96,9 +96,12 @@
   end;
 
 
-  TGuidHelper = record helper for System.TGuid
+  // problem: inheritance possible for class helpers ONLY but not with record helpers
+  // workaround: use static class method instead of record helper :-(
+  GuidUtils = class sealed
   public
-    function SwapByteOrder : TGuid;
+    // replaces the former TGuidHelper.SwapByteOrder instance method; the GUID is now passed in explicitly
+    class function SwapByteOrder( const aGuid : TGuid) : TGuid; static;
 
     {$IFDEF Debug}
     class procedure SelfTest; static;
@@ -355,16 +358,16 @@
 end;
 
 
-{ TGuidHelper }
+{ GuidUtils }
 
 
-function TGuidHelper.SwapByteOrder : TGuid;
+class function GuidUtils.SwapByteOrder( const aGuid : TGuid) : TGuid;
 // convert to/from network byte order
 // - https://www.ietf.org/rfc/rfc4122.txt
 // - https://stackoverflow.com/questions/10850075/guid-uuid-compatibility-issue-between-net-and-linux
 // - https://lists.gnu.org/archive/html/bug-parted/2002-01/msg00099.html
 begin
-  result := Self;
+  result := aGuid;
 
   IntegerUtils.SwapByteOrder( @result.D1, SizeOf(result.D1));
   IntegerUtils.SwapByteOrder( @result.D2, SizeOf(result.D2));
@@ -374,7 +377,7 @@
 
 
 {$IFDEF Debug}
-class procedure TGuidHelper.SelfTest;
+class procedure GuidUtils.SelfTest;
 var guid   : TGuid;
     pBytes : PByteArray;
     i, expected : Integer;
@@ -382,7 +385,7 @@
 begin
   // host to network
   guid := TEST_GUID;
-  guid := guid.SwapByteOrder;
+  guid := GuidUtils.SwapByteOrder(guid);
 
   // validate network order
   pBytes := @guid;
@@ -392,8 +395,11 @@
   end;
 
   // network to host and final validation
-  guid := guid.SwapByteOrder;
+  guid := GuidUtils.SwapByteOrder(guid);
   ASSERT( IsEqualGuid( guid, TEST_GUID));
+
+  // compile check: TGuid.NewGuid must still resolve, i.e. nothing in this unit may hide SysUtils.TGuidHelper
+  guid := TGuid.NewGuid;
 end;
 {$ENDIF}
 
@@ -494,6 +500,6 @@
 
 begin
   {$IFDEF Debug}
-  TGuid.SelfTest;
+  GuidUtils.SelfTest;
   {$ENDIF}
 end.
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index c57db9d..071c660 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -28,7 +28,7 @@
     {
         private NamedPipeClientStream PipeStream;
         private readonly int ConnectTimeout;
-		private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000;   // Timeout.Infinite is not a good default
+        private const int DEFAULT_CONNECT_TIMEOUT = 60 * 1000;   // Timeout.Infinite is not a good default
 
         public TNamedPipeTransport(string pipe, TConfiguration config, int timeout = DEFAULT_CONNECT_TIMEOUT) 
             : this(".", pipe, config, timeout)
@@ -61,6 +61,8 @@
         {
             if (PipeStream != null)
             {
+                if (PipeStream.IsConnected)
+                    PipeStream.Close();
                 PipeStream.Dispose();
                 PipeStream = null;
             }
@@ -107,20 +109,24 @@
             }
         }
 
-        public override Task FlushAsync(CancellationToken cancellationToken)
+        public override async Task FlushAsync(CancellationToken cancellationToken)
         {
-            cancellationToken.ThrowIfCancellationRequested();
-
+            await PipeStream.FlushAsync(cancellationToken);
             ResetConsumedMessageSize();
-            return Task.CompletedTask;
         }
 
         
         protected override void Dispose(bool disposing)
         {
-            if(disposing) 
+            if (disposing)
             {
-              PipeStream?.Dispose();
+                if (PipeStream != null)
+                {
+                    if (PipeStream.IsConnected)
+                        PipeStream.Close();
+                    PipeStream.Dispose();
+                    PipeStream = null;
+                }
             }
         }
     }
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 7c94300..8ad62aa 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -24,41 +24,78 @@
 using System.ComponentModel;
 using System.Security.AccessControl;
 using System.Security.Principal;
+using System.Collections.Generic;
+using System.IO;
+using System.Diagnostics;
+
+#pragma warning disable CS1998  // async no await
 
 namespace Thrift.Transport.Server
 {
+    [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
     [Flags]
-    public enum NamedPipeClientFlags {
+    public enum NamedPipeClientFlags {  // misnamed: these flags configure the server; superseded by NamedPipeServerFlags
         None = 0x00,
         OnlyLocalClients = 0x01
     };
 
+    [Flags]
+    public enum NamedPipeServerFlags  // replacement for the obsolete NamedPipeClientFlags
+    {
+        None = 0x00,  // no flags set
+        OnlyLocalClients = 0x01,  // stored in _onlyLocalClients by the constructor and passed on to CreatePipeNative()
+    };
+
+
     // ReSharper disable once InconsistentNaming
     public class TNamedPipeServerTransport : TServerTransport
     {
+        // Pairs one listening pipe instance with the task that waits for a client to connect to it.
+        private struct TaskStreamPair
+        {
+            public NamedPipeServerStream Stream;  // the server-side pipe instance
+            public Task Task;  // pending WaitForConnectionAsync() call; null until the accept code starts listening on Stream
+
+            public TaskStreamPair(NamedPipeServerStream stream, Task task)  // task may be null (CreatePipeInstance passes null)
+            {
+                Stream = stream;
+                Task = task;    
+            }
+        }
+
         /// <summary>
         ///     This is the address of the Pipe on the localhost.
         /// </summary>
         private readonly string _pipeAddress;
         private bool _asyncMode = true;
         private volatile bool _isPending = true;
-        private NamedPipeServerStream _stream = null;
+        private readonly List<TaskStreamPair> _streams = new List<TaskStreamPair>();
         private readonly bool _onlyLocalClients = false;  // compatibility default
+        private readonly byte _numListenPipes = 1;  // compatibility default
 
-        public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags)
+        public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeServerFlags flags, int numListenPipes)
             : base(config)
         {
+            if ((numListenPipes < 1) || (numListenPipes > 254))
+                throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
+
+            _pipeAddress = pipeAddress;
+            _onlyLocalClients = flags.HasFlag(NamedPipeServerFlags.OnlyLocalClients);
+            _numListenPipes = (byte)numListenPipes;
+        }
+
+        [Obsolete("NamedPipeClientFlags is deprecated, use NamedPipeServerFlags instead.")]
+        public TNamedPipeServerTransport(string pipeAddress, TConfiguration config, NamedPipeClientFlags flags, int numListenPipes = 1)
+            : base(config)
+        {
+            if ((numListenPipes < 1) || (numListenPipes > 254))
+                throw new ArgumentOutOfRangeException(nameof(numListenPipes), "Value must be in the range of [1..254]");
+
             _pipeAddress = pipeAddress;
             _onlyLocalClients = flags.HasFlag(NamedPipeClientFlags.OnlyLocalClients);
+            _numListenPipes = (byte)numListenPipes;
         }
 
-        [Obsolete("This CTOR is deprecated, please use the other one instead.")]
-        public TNamedPipeServerTransport(string pipeAddress, TConfiguration config)
-            : base(config)
-        {
-            _pipeAddress = pipeAddress;
-            _onlyLocalClients = false;
-        }
 
         public override bool IsOpen() {
             return true;
@@ -69,75 +106,112 @@
             // nothing to do here
         }
 
-        public override void Close()
+        private static void Close(NamedPipeServerStream pipe)
         {
-            if (_stream != null)
+            if (pipe != null)
             {
                 try
                 {
-                    if (_stream.IsConnected)
-                        _stream.Disconnect();
-                    _stream.Dispose();
+                    if (pipe.IsConnected)
+                        pipe.Disconnect();
                 }
                 finally
                 {
-                    _stream = null;
-                    _isPending = false;
+                    pipe.Dispose();
                 }
             }
         }
 
+        public override void Close()  // closes every pooled listener pipe and marks the transport as no longer pending
+        {
+            try
+            {
+                if (_streams != null)  // _streams is readonly and initialized inline, so this check is purely defensive
+                {
+                    while(_streams.Count > 0)
+                    {
+                        Close(_streams[0].Stream);  // NOTE(review): if this throws, the finally below drops the remaining streams undisposed
+                        _streams.RemoveAt(0);  // only the list entry is removed; the pair's Task is not awaited or cancelled here
+                    }
+                }
+            }
+            finally
+            {
+                _streams.Clear();  // no-op on the normal path, the loop above has already emptied the list
+                _isPending = false;  // IsClientPending() reports false from now on
+            }
+        }
+
         public override bool IsClientPending()
         {
             return _isPending;
         }
 
-        private void EnsurePipeInstance()
+        private void EnsurePipeInstances()
         {
-            if (_stream == null)
+            // set up a pool for accepting multiple calls when in multithread mode
+            // once connected, we hand that stream over to the processor and create a fresh one
+            try
             {
-                const PipeDirection direction = PipeDirection.InOut;
-                const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
-                const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
-                const int inbuf = 4096;
-                const int outbuf = 4096;
-                var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
+                while (_streams.Count < _numListenPipes)
+                    _streams.Add(CreatePipeInstance());
+            }
+            catch
+            {
+                // we might not be able to create all requested instances, e.g. due to some existing instances already processing calls
+                // if we have at least one pipe to listen on -> Good Enough(tm) 
+                if (_streams.Count < 1)
+                    throw;  // no pipes is really bad
+            }
+        }
+
+        private TaskStreamPair CreatePipeInstance()
+        {
+            const PipeDirection direction = PipeDirection.InOut;
+            const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
+            const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
+            const int inbuf = 4096;
+            const int outbuf = 4096;
+            var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;
 
 
-                // TODO: "CreatePipeNative" ist only a workaround, and there are have basically two possible outcomes:
-                // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
-                // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
-                // EITHER WAY,
-                // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
+            // TODO: "CreatePipeNative" is only a workaround, and there are basically two possible outcomes:
+            // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
+            // - if 31190 gets resolved before, use instance.SetAccessControl(pipesec) instead of CreatePipeNative()
+            // EITHER WAY,
+            // - if CreatePipeNative() finally gets removed, also remove "allow unsafe code" from the project settings
 
-                try
+            NamedPipeServerStream instance;
+            try
+            {
+                var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
+                if ((handle != null) && (!handle.IsInvalid))
                 {
-                    var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf, _onlyLocalClients);
-                    if ((handle != null) && (!handle.IsInvalid))
-                    {
-                        _stream = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
-                        handle = null; // we don't own it any longer
-                    }
-                    else
-                    {
-                        handle?.Dispose();
-                        _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
-                    }
+                    instance = new NamedPipeServerStream(PipeDirection.InOut, _asyncMode, false, handle);
+                    handle = null; // we don't own it any longer
                 }
-                catch (NotImplementedException) // Mono still does not support async, fallback to sync
+                else
                 {
-                    if (_asyncMode)
-                    {
-                        options &= (~PipeOptions.Asynchronous);
-                        _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
-                        _asyncMode = false;
-                    }
-                    else
-                    {
-                        throw;
-                    }
+                    handle?.Dispose();
+                    instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
                 }
             }
+            catch (NotImplementedException) // Mono still does not support async, fallback to sync
+            {
+                if (_asyncMode)
+                {
+                    options &= (~PipeOptions.Asynchronous);
+                    instance = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
+                    _asyncMode = false;
+                }
+                else
+                {
+                    throw;
+                }
+            }
+
+            // the task gets added later
+            return new TaskStreamPair( instance, null);
         }
 
 
@@ -248,14 +322,28 @@
         {
             try
             {
-                EnsurePipeInstance();
+                EnsurePipeInstances();
 
-                await _stream.WaitForConnectionAsync(cancellationToken);
+                // fill the list and wait for any task to be completed
+                var tasks = new List<Task>();
+                for (var i = 0; i < _streams.Count; ++i)
+                {
+                    if (_streams[i].Task == null)
+                    {
+                        var pair = _streams[i];
+                        pair.Task = Task.Run(async () => await pair.Stream.WaitForConnectionAsync(cancellationToken), cancellationToken);
+                        _streams[i] = pair;
+                    }
 
-                var trans = new ServerTransport(_stream, Configuration);
-                _stream = null; // pass ownership to ServerTransport
+                    tasks.Add(_streams[i].Task);
+                }
 
-                //_isPending = false;
+                // there must be an exact mapping between task index and stream index
+                Debug.Assert(_streams.Count == tasks.Count);
+                var index = Task.WaitAny(tasks.ToArray(), cancellationToken);
+
+                var trans = new ServerTransport(_streams[index].Stream, Configuration);
+                _streams.RemoveAt(index); // pass stream ownership to ServerTransport
 
                 return trans;
             }
@@ -296,8 +384,13 @@
 
             public override void Close()
             {
-                PipeStream?.Dispose();
-                PipeStream = null;
+                if (PipeStream != null)
+                {
+                    if (PipeStream.IsConnected)
+                        PipeStream.Disconnect();
+                    PipeStream.Dispose();
+                    PipeStream = null;
+                }
             }
 
             public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
@@ -341,19 +434,23 @@
                 }
             }
 
-            public override Task FlushAsync(CancellationToken cancellationToken)
+            public override async Task FlushAsync(CancellationToken cancellationToken)
             {
-                cancellationToken.ThrowIfCancellationRequested();
-
+                await PipeStream.FlushAsync(cancellationToken);
                 ResetConsumedMessageSize();
-                return Task.CompletedTask;
             }
 
             protected override void Dispose(bool disposing)
             {
                 if (disposing)
                 {
-                    PipeStream?.Dispose();
+                    if (PipeStream != null)
+                    {
+                        if (PipeStream.IsConnected)
+                            PipeStream.Disconnect();
+                        PipeStream.Dispose();
+                        PipeStream = null;
+                    }
                 }
             }
         }
diff --git a/test/netstd/Server/TestServer.cs b/test/netstd/Server/TestServer.cs
index 27577ee..1eb5030 100644
--- a/test/netstd/Server/TestServer.cs
+++ b/test/netstd/Server/TestServer.cs
@@ -204,7 +204,6 @@
         {
             //public TServer Server { get; set; }
             private readonly int handlerID;
-            private readonly StringBuilder sb = new();
             private readonly TestLogDelegate logger;
 
             public TestHandlerAsync()
@@ -216,11 +215,12 @@
 
             public void TestConsoleLogger(string msg, params object[] values)
             {
-                sb.Clear();
+                var sb = new StringBuilder();
                 sb.AppendFormat("handler{0:D3}:", handlerID);
                 sb.AppendFormat(msg, values);
                 sb.AppendLine();
-                Console.Write(sb.ToString());
+                lock (typeof(Console))
+                    Console.Write(sb.ToString());
             }
 
             public Task testVoid(CancellationToken cancellationToken)
@@ -298,7 +298,7 @@
 
             public Task<Dictionary<int, int>> testMap(Dictionary<int, int>? thing, CancellationToken cancellationToken)
             {
-                sb.Clear();
+                var sb = new StringBuilder();
                 sb.Append("testMap({{");
                 if (thing != null)
                 {
@@ -323,7 +323,7 @@
 
             public Task<Dictionary<string, string>> testStringMap(Dictionary<string, string>? thing, CancellationToken cancellationToken)
             {
-                sb.Clear();
+                var sb = new StringBuilder();
                 sb.Append("testStringMap({{");
                 if (thing != null)
                 {
@@ -348,7 +348,7 @@
 
             public Task<HashSet<int>> testSet(HashSet<int>? thing, CancellationToken cancellationToken)
             {
-                sb.Clear();
+                var sb = new StringBuilder();
                 sb.Append("testSet({{");
                 if (thing != null)
                 {
@@ -373,7 +373,7 @@
 
             public Task<List<int>> testList(List<int>? thing, CancellationToken cancellationToken)
             {
-                sb.Clear();
+                var sb = new StringBuilder();
                 sb.Append("testList({{");
                 if (thing != null)
                 {
@@ -590,7 +590,8 @@
                     {
                         case TransportChoice.NamedPipe:
                             Debug.Assert(param.pipe != null);
-                            trans = new TNamedPipeServerTransport(param.pipe, Configuration, NamedPipeClientFlags.OnlyLocalClients);
+                            var numListen = (param.server == ServerChoice.Simple) ? 1 : 16;
+                            trans = new TNamedPipeServerTransport(param.pipe, Configuration, NamedPipeServerFlags.OnlyLocalClients, numListen);
                             break;