THRIFT-4723 Consolidate C#/netcore into new netstd language target
Client: netstd
Patch: Jens Geyer

This closes #1710
diff --git a/lib/netstd/Thrift/Server/TServer.cs b/lib/netstd/Thrift/Server/TServer.cs
new file mode 100644
index 0000000..3a70c07
--- /dev/null
+++ b/lib/netstd/Thrift/Server/TServer.cs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+// 
+//     http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Thrift.Processor;
+
+namespace Thrift.Server
+{
+    /// <summary>
+    ///     Common base for Thrift servers: stores the processor, transport and protocol
+    ///     factories plus the optional logger and event handler that subclasses use.
+    /// </summary>
+    // ReSharper disable once InconsistentNaming
+    public abstract class TServer
+    {
+        // Optional diagnostics sink; the constructor accepts null for it.
+        protected readonly ILogger Logger;
+
+        protected ITProcessorFactory ProcessorFactory;
+        protected TServerTransport ServerTransport;
+        protected TTransportFactory InputTransportFactory;
+        protected TTransportFactory OutputTransportFactory;
+        protected ITProtocolFactory InputProtocolFactory;
+        protected ITProtocolFactory OutputProtocolFactory;
+
+        // Optional callbacks; stays null until SetEventHandler is called.
+        protected TServerEventHandler ServerEventHandler;
+
+        /// <summary>
+        ///     Stores the collaborators a concrete server needs.
+        /// </summary>
+        /// <exception cref="ArgumentNullException">
+        ///     The processor factory, a transport factory or a protocol factory is null.
+        ///     The server transport and the logger are not validated.
+        /// </exception>
+        protected TServer(ITProcessorFactory processorFactory, TServerTransport serverTransport,
+            TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory,
+            ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
+            ILogger logger = null)
+        {
+            if (processorFactory == null)
+                throw new ArgumentNullException(nameof(processorFactory));
+            if (inputTransportFactory == null)
+                throw new ArgumentNullException(nameof(inputTransportFactory));
+            if (outputTransportFactory == null)
+                throw new ArgumentNullException(nameof(outputTransportFactory));
+            if (inputProtocolFactory == null)
+                throw new ArgumentNullException(nameof(inputProtocolFactory));
+            if (outputProtocolFactory == null)
+                throw new ArgumentNullException(nameof(outputProtocolFactory));
+
+            ProcessorFactory = processorFactory;
+            ServerTransport = serverTransport;
+            InputTransportFactory = inputTransportFactory;
+            OutputTransportFactory = outputTransportFactory;
+            InputProtocolFactory = inputProtocolFactory;
+            OutputProtocolFactory = outputProtocolFactory;
+            Logger = logger; // null is absolutely legal
+        }
+
+        /// <summary>Installs the event handler; passing null removes it.</summary>
+        public void SetEventHandler(TServerEventHandler seh) => ServerEventHandler = seh;
+
+        /// <summary>Returns the currently installed event handler, or null.</summary>
+        public TServerEventHandler GetEventHandler() => ServerEventHandler;
+
+        /// <summary>
+        ///     Writes an error message to the logger; does nothing when no logger was supplied.
+        ///     Legacy convenience, considered deprecated in favour of using ILogger directly.
+        /// </summary>
+        protected void LogError(string msg)
+        {
+            Logger?.LogError(msg);
+        }
+
+        /// <summary>Stops the server; each concrete server defines what that means.</summary>
+        public abstract void Stop();
+
+        /// <summary>Start hook; the base implementation does nothing.</summary>
+        public virtual void Start()
+        {
+            // do nothing
+        }
+
+        /// <summary>
+        ///     Base serve routine: performs no serving work. Completes at once unless the token is
+        ///     already cancelled, in which case it awaits a task cancelled with that token.
+        /// </summary>
+        public virtual async Task ServeAsync(CancellationToken cancellationToken)
+        {
+            if (!cancellationToken.IsCancellationRequested)
+            {
+                return;
+            }
+
+            await Task.FromCanceled(cancellationToken);
+        }
+    }
+}
\ No newline at end of file
diff --git a/lib/netstd/Thrift/Server/TServerEventHandler.cs b/lib/netstd/Thrift/Server/TServerEventHandler.cs
new file mode 100644
index 0000000..0c31bf6
--- /dev/null
+++ b/lib/netstd/Thrift/Server/TServerEventHandler.cs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+// 
+//     http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using System.Threading.Tasks;
+using Thrift.Protocol;
+using Thrift.Transport;
+
+namespace Thrift.Server
+{
+    //TODO: replacement by event?
+
+    /// <summary>
+    ///     Interface implemented by server users to handle events from the server.
+    /// </summary>
+    // ReSharper disable once InconsistentNaming
+    public interface TServerEventHandler
+    {
+        /// <summary>
+        ///     Called once, after the server transport is listening and before any client is accepted.
+        /// </summary>
+        Task PreServeAsync(CancellationToken cancellationToken);
+
+        /// <summary>
+        ///     Called when a new client has connected and is about to begin processing; the result is passed back to the other callbacks as serverContext.
+        /// </summary>
+        Task<object> CreateContextAsync(TProtocol input, TProtocol output, CancellationToken cancellationToken);
+
+        /// <summary>
+        ///     Called when a client has finished request handling, to delete the server context (which may be null if none was created).
+        /// </summary>
+        Task DeleteContextAsync(object serverContext, TProtocol input, TProtocol output,
+            CancellationToken cancellationToken);
+
+        /// <summary>
+        ///     Called each time a client is about to call the processor.
+        /// </summary>
+        Task ProcessContextAsync(object serverContext, TTransport transport, CancellationToken cancellationToken);
+    }
+}
\ No newline at end of file
diff --git a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
new file mode 100644
index 0000000..a0a3e4c
--- /dev/null
+++ b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+// 
+//     http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Thrift.Protocol;
+using Thrift.Processor;
+using Thrift.Transport;
+
+namespace Thrift.Server
+{
+    //TODO: unhandled exceptions, etc.
+
+    /// <summary>
+    ///     Server that polls the server transport for pending clients and hands each accepted
+    ///     connection to its own task.
+    /// </summary>
+    /// <remarks>
+    ///     A null logger is tolerated: every logging call uses the null-conditional operator.
+    /// </remarks>
+    // ReSharper disable once InconsistentNaming
+    public class TSimpleAsyncServer : TServer
+    {
+        // Pause, in milliseconds, between polls while no client is pending.
+        private readonly int _clientWaitingDelay;
+
+        // Task created by the most recent ServeAsync call.
+        private volatile Task _serverTask;
+
+        /// <summary>
+        ///     Convenience constructor: wraps the processor in a singleton factory, uses plain
+        ///     transport factories and asks loggerFactory for a logger named after this class.
+        ///     A null loggerFactory yields a server without logging.
+        /// </summary>
+        public TSimpleAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport,
+            ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
+            ILoggerFactory loggerFactory, int clientWaitingDelay = 10)
+            : this(new TSingletonProcessorFactory(processor), serverTransport,
+                new TTransportFactory(), new TTransportFactory(),
+                inputProtocolFactory, outputProtocolFactory,
+                loggerFactory?.CreateLogger(nameof(TSimpleAsyncServer)), clientWaitingDelay)
+        {
+        }
+
+        /// <summary>
+        ///     Full constructor: passes the factories and logger to the base class and records
+        ///     clientWaitingDelay, the pause in milliseconds between polls for a pending client.
+        /// </summary>
+        public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, TServerTransport serverTransport,
+            TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory,
+            ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
+            ILogger logger, int clientWaitingDelay = 10)
+            : base(itProcessorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
+                inputProtocolFactory, outputProtocolFactory, logger)
+        {
+            _clientWaitingDelay = clientWaitingDelay;
+        }
+
+        /// <summary>
+        ///     Starts the listening loop on a long-running task and logs, rather than rethrows,
+        ///     any exception seen while awaiting that task.
+        /// </summary>
+        /// <remarks>
+        ///     NOTE(review): StartNew receives an async delegate, so the awaited task is the outer
+        ///     wrapper. It looks like it completes once StartListening reaches its first
+        ///     incomplete await, letting this method return while the loop keeps running. Confirm
+        ///     whether callers rely on that before awaiting the unwrapped task instead.
+        /// </remarks>
+        public override async Task ServeAsync(CancellationToken cancellationToken)
+        {
+            try
+            {
+                _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
+                await _serverTask;
+            }
+            catch (Exception ex)
+            {
+                Logger?.LogError(ex.ToString());
+            }
+        }
+
+        /// <summary>
+        ///     Listens on the server transport, fires the pre-serve event and accepts clients until
+        ///     cancellation is requested. The transport is closed however the loop ends.
+        /// </summary>
+        private async Task StartListening(CancellationToken cancellationToken)
+        {
+            ServerTransport.Listen();
+
+            try
+            {
+                Logger?.LogTrace("Started listening at server");
+
+                if (ServerEventHandler != null)
+                {
+                    await ServerEventHandler.PreServeAsync(cancellationToken);
+                }
+
+                while (!cancellationToken.IsCancellationRequested)
+                {
+                    if (ServerTransport.IsClientPending())
+                    {
+                        Logger?.LogTrace("Waiting for client connection");
+
+                        try
+                        {
+                            var client = await ServerTransport.AcceptAsync(cancellationToken);
+
+                            // Hand the connection to its own task so the loop can accept again.
+                            await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
+                        }
+                        catch (TTransportException ttx)
+                        {
+                            Logger?.LogTrace($"Transport exception: {ttx}");
+
+                            // An interrupted accept is traced but not reported as an error.
+                            if (ttx.Type != TTransportException.ExceptionType.Interrupted)
+                            {
+                                Logger?.LogError(ttx.ToString());
+                            }
+                        }
+                    }
+                    else
+                    {
+                        try
+                        {
+                            await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
+                        }
+                        catch (TaskCanceledException)
+                        {
+                            // Cancelled while idle; the loop condition ends the loop.
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                // Release the listening transport even when the loop ends with an exception.
+                ServerTransport.Close();
+            }
+
+            Logger?.LogTrace("Completed listening at server");
+        }
+
+        /// <summary>
+        ///     Does nothing; the listening loop ends when the token given to ServeAsync is cancelled.
+        /// </summary>
+        public override void Stop()
+        {
+        }
+
+        /// <summary>
+        ///     Serves one client connection: builds its transports and protocols, fires the
+        ///     event-handler callbacks and processes requests until PeekAsync or the processor
+        ///     returns false, an exception occurs or cancellation is requested.
+        /// </summary>
+        private async Task Execute(TTransport client, CancellationToken cancellationToken)
+        {
+            Logger?.LogTrace("Started client request processing");
+
+            var processor = ProcessorFactory.GetAsyncProcessor(client, this);
+
+            TTransport inputTransport = null;
+            TTransport outputTransport = null;
+            TProtocol inputProtocol = null;
+            TProtocol outputProtocol = null;
+            object connectionContext = null;
+
+            try
+            {
+                try
+                {
+                    inputTransport = InputTransportFactory.GetTransport(client);
+                    outputTransport = OutputTransportFactory.GetTransport(client);
+                    inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
+                    outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
+
+                    if (ServerEventHandler != null)
+                    {
+                        connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
+                    }
+
+                    while (!cancellationToken.IsCancellationRequested)
+                    {
+                        if (!await inputTransport.PeekAsync(cancellationToken))
+                        {
+                            break;
+                        }
+
+                        if (ServerEventHandler != null)
+                        {
+                            await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
+                        }
+
+                        if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
+                        {
+                            break;
+                        }
+                    }
+                }
+                catch (TTransportException ttx)
+                {
+                    Logger?.LogTrace($"Transport exception: {ttx}");
+                }
+                catch (Exception x)
+                {
+                    Logger?.LogError($"Error: {x}");
+                }
+
+                // Runs after the request loop whether it ended normally or with a logged exception.
+                if (ServerEventHandler != null)
+                {
+                    await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
+                }
+            }
+            finally
+            {
+                //Close transports
+                inputTransport?.Close();
+                outputTransport?.Close();
+
+                // disposable stuff should be disposed
+                inputProtocol?.Dispose();
+                outputProtocol?.Dispose();
+                inputTransport?.Dispose();
+                outputTransport?.Dispose();
+            }
+
+            Logger?.LogTrace("Completed client request processing");
+        }
+    }
+}
diff --git a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
new file mode 100644
index 0000000..e5c5660
--- /dev/null
+++ b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Contains some contributions under the Thrift Software License.
+ * Please see doc/old-thrift-license.txt in the Thrift distribution for
+ * details.
+ */
+
+using System;
+using System.Threading;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Thrift.Processor;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Thrift.Server
+{
+    /// <summary>
+    /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
+    /// </summary>
+    public class TThreadPoolAsyncServer : TServer
+    {
+        private const int DEFAULT_MIN_THREADS = -1;  // use .NET ThreadPool defaults
+        private const int DEFAULT_MAX_THREADS = -1;  // use .NET ThreadPool defaults
+
+        // Serializes ThreadPool reconfiguration across instances. A private object is used so
+        // that code outside this class cannot contend on the same monitor.
+        private static readonly object ThreadPoolConfigLock = new object();
+
+        // Set by Stop(); ends the accept loop and the per-client request loops.
+        private volatile bool stop = false;
+
+        // Token of the ServeAsync call in progress; Execute copies it when a work item starts.
+        private CancellationToken ServerCancellationToken;
+
+        /// <summary>
+        /// ThreadPool limits for the server. The server constructor applies only values greater
+        /// than zero; anything else leaves the corresponding ThreadPool setting unchanged.
+        /// </summary>
+        public struct Configuration
+        {
+            public int MinWorkerThreads;
+            public int MaxWorkerThreads;
+            public int MinIOThreads;
+            public int MaxIOThreads;
+
+            /// <summary>
+            /// Uses the same minimum and maximum for worker and I/O threads.
+            /// </summary>
+            public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
+            {
+                MinWorkerThreads = min;
+                MaxWorkerThreads = max;
+                MinIOThreads = min;
+                MaxIOThreads = max;
+            }
+
+            /// <summary>
+            /// Sets worker and I/O thread limits independently.
+            /// </summary>
+            public Configuration(int minWork, int maxWork, int minIO, int maxIO)
+            {
+                MinWorkerThreads = minWork;
+                MaxWorkerThreads = maxWork;
+                MinIOThreads = minIO;
+                MaxIOThreads = maxIO;
+            }
+        }
+
+        /// <summary>
+        /// Single processor, plain transport factories, binary protocol; ThreadPool left untouched.
+        /// </summary>
+        public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null)
+            : this(new TSingletonProcessorFactory(processor), serverTransport,
+             new TTransportFactory(), new TTransportFactory(),
+             new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+             new Configuration(), logger)
+        {
+        }
+
+        /// <summary>
+        /// Single processor; the same transport and protocol factory serve input and output.
+        /// </summary>
+        public TThreadPoolAsyncServer(ITAsyncProcessor processor,
+         TServerTransport serverTransport,
+         TTransportFactory transportFactory,
+         ITProtocolFactory protocolFactory)
+            : this(new TSingletonProcessorFactory(processor), serverTransport,
+               transportFactory, transportFactory,
+               protocolFactory, protocolFactory,
+               new Configuration())
+        {
+        }
+
+        /// <summary>
+        /// Processor factory; the same transport and protocol factory serve input and output.
+        /// </summary>
+        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
+                     TServerTransport serverTransport,
+                     TTransportFactory transportFactory,
+                     ITProtocolFactory protocolFactory)
+            : this(processorFactory, serverTransport,
+             transportFactory, transportFactory,
+             protocolFactory, protocolFactory,
+             new Configuration())
+        {
+        }
+
+        /// <summary>
+        /// Separate input/output factories; the given minimum and maximum apply to both worker
+        /// and I/O threads.
+        /// </summary>
+        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
+                     TServerTransport serverTransport,
+                     TTransportFactory inputTransportFactory,
+                     TTransportFactory outputTransportFactory,
+                     ITProtocolFactory inputProtocolFactory,
+                     ITProtocolFactory outputProtocolFactory,
+                     int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
+            : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
+             inputProtocolFactory, outputProtocolFactory,
+             new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
+             logger)
+        {
+        }
+
+        /// <summary>
+        /// Full constructor. Applies threadConfig to the ThreadPool: each positive value
+        /// replaces the current setting, every other value keeps it.
+        /// </summary>
+        /// <exception cref="Exception">SetMaxThreads or SetMinThreads returned false.</exception>
+        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
+                     TServerTransport serverTransport,
+                     TTransportFactory inputTransportFactory,
+                     TTransportFactory outputTransportFactory,
+                     ITProtocolFactory inputProtocolFactory,
+                     ITProtocolFactory outputProtocolFactory,
+                     Configuration threadConfig,
+                     ILogger logger = null)
+            : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
+            inputProtocolFactory, outputProtocolFactory, logger)
+        {
+            lock (ThreadPoolConfigLock)
+            {
+                if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
+                {
+                    int work, comm;
+                    ThreadPool.GetMaxThreads(out work, out comm);
+                    if (threadConfig.MaxWorkerThreads > 0)
+                        work = threadConfig.MaxWorkerThreads;
+                    if (threadConfig.MaxIOThreads > 0)
+                        comm = threadConfig.MaxIOThreads;
+                    if (!ThreadPool.SetMaxThreads(work, comm))
+                        throw new Exception("Error: could not SetMaxThreads in ThreadPool");
+                }
+
+                if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
+                {
+                    int work, comm;
+                    ThreadPool.GetMinThreads(out work, out comm);
+                    if (threadConfig.MinWorkerThreads > 0)
+                        work = threadConfig.MinWorkerThreads;
+                    if (threadConfig.MinIOThreads > 0)
+                        comm = threadConfig.MinIOThreads;
+                    if (!ThreadPool.SetMinThreads(work, comm))
+                        throw new Exception("Error: could not SetMinThreads in ThreadPool");
+                }
+            }
+        }
+
+        /// <summary>
+        /// Use new ThreadPool thread for each new client connection.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): the accept loop is ended only by Stop(); the cancellation token is merely
+        /// passed on to AcceptAsync and the event handler. Confirm how AcceptAsync reacts to it.
+        /// </remarks>
+        public override async Task ServeAsync(CancellationToken cancellationToken)
+        {
+            ServerCancellationToken = cancellationToken;
+            try
+            {
+                try
+                {
+                    ServerTransport.Listen();
+                }
+                catch (TTransportException ttx)
+                {
+                    LogError("Error, could not listen on ServerTransport: " + ttx);
+                    return;
+                }
+
+                //Fire the preServe server event when server is up but before any client connections
+                if (ServerEventHandler != null)
+                    await ServerEventHandler.PreServeAsync(cancellationToken);
+
+                while (!stop)
+                {
+                    try
+                    {
+                        TTransport client = await ServerTransport.AcceptAsync(cancellationToken);
+                        ThreadPool.QueueUserWorkItem(this.Execute, client);
+                    }
+                    catch (TTransportException ttx)
+                    {
+                        // An interrupted accept after Stop() is not logged; anything else is.
+                        if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
+                        {
+                            LogError(ttx.ToString());
+                        }
+                    }
+                }
+
+                if (stop)
+                {
+                    try
+                    {
+                        ServerTransport.Close();
+                    }
+                    catch (TTransportException ttx)
+                    {
+                        LogError("TServerTransport failed on close: " + ttx.Message);
+                    }
+                    // Reset the flag (presumably so the server can serve again; confirm).
+                    stop = false;
+                }
+            }
+            finally
+            {
+                ServerCancellationToken = default(CancellationToken);
+            }
+        }
+
+        /// <summary>
+        /// Loops on processing a client forever
+        /// threadContext will be a TTransport instance
+        /// </summary>
+        /// <remarks>
+        /// Tasks are waited on through GetAwaiter().GetResult() so that a failure surfaces as
+        /// the original exception instead of an AggregateException; with Result/Wait() the
+        /// TTransportException handler below could never match a failed task.
+        /// </remarks>
+        /// <param name="threadContext">The client TTransport, passed as object by the ThreadPool callback.</param>
+        private void Execute(object threadContext)
+        {
+            var cancellationToken = ServerCancellationToken;
+
+            using (TTransport client = (TTransport)threadContext)
+            {
+                ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this);
+                TTransport inputTransport = null;
+                TTransport outputTransport = null;
+                TProtocol inputProtocol = null;
+                TProtocol outputProtocol = null;
+                object connectionContext = null;
+                try
+                {
+                    try
+                    {
+                        inputTransport = InputTransportFactory.GetTransport(client);
+                        outputTransport = OutputTransportFactory.GetTransport(client);
+                        inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
+                        outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
+
+                        //Recover event handler (if any) and fire createContext server event when a client connects
+                        if (ServerEventHandler != null)
+                            connectionContext = ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken).GetAwaiter().GetResult();
+
+                        //Process client requests until client disconnects
+                        while (!stop)
+                        {
+                            if (!inputTransport.PeekAsync(cancellationToken).GetAwaiter().GetResult())
+                                break;
+
+                            //Fire processContext server event
+                            //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+                            //That is to say it may be many minutes between the event firing and the client request
+                            //actually arriving or the client may hang up without ever making a request.
+                            if (ServerEventHandler != null)
+                                ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).GetAwaiter().GetResult();
+                            //Process client request (blocks until transport is readable)
+                            if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).GetAwaiter().GetResult())
+                                break;
+                        }
+                    }
+                    catch (TTransportException)
+                    {
+                        //Usually a client disconnect, expected
+                    }
+                    catch (Exception x)
+                    {
+                        //Unexpected
+                        LogError("Error: " + x);
+                    }
+
+                    //Fire deleteContext server event after client disconnects
+                    if (ServerEventHandler != null)
+                        ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken).GetAwaiter().GetResult();
+                }
+                finally
+                {
+                    //Close transports
+                    inputTransport?.Close();
+                    outputTransport?.Close();
+
+                    // disposable stuff should be disposed
+                    inputProtocol?.Dispose();
+                    outputProtocol?.Dispose();
+                    inputTransport?.Dispose();
+                    outputTransport?.Dispose();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Requests shutdown: raises the stop flag and closes the server transport, if any.
+        /// </summary>
+        public override void Stop()
+        {
+            stop = true;
+            ServerTransport?.Close();
+        }
+    }
+}