THRIFT-4723 Consolidate C#/netcore into new netstd language target
Client: netstd
Patch: Jens Geyer
This closes #1710
diff --git a/lib/netstd/Thrift/Server/TServer.cs b/lib/netstd/Thrift/Server/TServer.cs
new file mode 100644
index 0000000..3a70c07
--- /dev/null
+++ b/lib/netstd/Thrift/Server/TServer.cs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Thrift.Processor;
+
+namespace Thrift.Server
+{
+ // ReSharper disable once InconsistentNaming
+    public abstract class TServer
+    {
+        // Optional diagnostics sink; this base class accepts null and then logs nothing.
+        protected readonly ILogger Logger;
+
+        // Collaborators handed in through the constructor for use by concrete servers.
+        protected ITProcessorFactory ProcessorFactory;
+        protected TServerTransport ServerTransport;
+        protected TTransportFactory InputTransportFactory;
+        protected TTransportFactory OutputTransportFactory;
+        protected ITProtocolFactory InputProtocolFactory;
+        protected ITProtocolFactory OutputProtocolFactory;
+
+        protected TServerEventHandler ServerEventHandler; // optional; installed via SetEventHandler
+
+        /// <summary>
+        /// Stores the server's collaborators. Every factory must be non-null;
+        /// <paramref name="serverTransport"/> and <paramref name="logger"/> are stored without a check.
+        /// </summary>
+        /// <exception cref="ArgumentNullException">One of the factory arguments is null.</exception>
+        protected TServer(ITProcessorFactory processorFactory, TServerTransport serverTransport,
+            TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory,
+            ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
+            ILogger logger = null)
+        {
+            ProcessorFactory = processorFactory ?? throw new ArgumentNullException(nameof(processorFactory));
+            ServerTransport = serverTransport;
+            InputTransportFactory = inputTransportFactory ?? throw new ArgumentNullException(nameof(inputTransportFactory));
+            OutputTransportFactory = outputTransportFactory ?? throw new ArgumentNullException(nameof(outputTransportFactory));
+            InputProtocolFactory = inputProtocolFactory ?? throw new ArgumentNullException(nameof(inputProtocolFactory));
+            OutputProtocolFactory = outputProtocolFactory ?? throw new ArgumentNullException(nameof(outputProtocolFactory));
+            Logger = logger; // null is absolutely legal
+        }
+
+        /// <summary>Installs the event handler; passing null removes it.</summary>
+        public void SetEventHandler(TServerEventHandler seh) => ServerEventHandler = seh;
+
+        /// <summary>Returns the currently installed event handler, which may be null.</summary>
+        public TServerEventHandler GetEventHandler() => ServerEventHandler;
+
+        /// <summary>Logs <paramref name="msg"/> at error level when a logger is present; otherwise does nothing.</summary>
+        protected void LogError(string msg) => Logger?.LogError(msg);
+
+        /// <summary>Stops the server; the behavior is defined by each concrete server.</summary>
+        public abstract void Stop();
+
+        /// <summary>Start hook; the base implementation does nothing.</summary>
+        public virtual void Start() { }
+
+        /// <summary>
+        /// Base implementation: finishes at once unless cancellation was already requested, in which case the canceled task is awaited.
+        /// </summary>
+        public virtual async Task ServeAsync(CancellationToken cancellationToken)
+        {
+            if (!cancellationToken.IsCancellationRequested) return;
+            await Task.FromCanceled(cancellationToken);
+        }
+    }
+}
\ No newline at end of file
diff --git a/lib/netstd/Thrift/Server/TServerEventHandler.cs b/lib/netstd/Thrift/Server/TServerEventHandler.cs
new file mode 100644
index 0000000..0c31bf6
--- /dev/null
+++ b/lib/netstd/Thrift/Server/TServerEventHandler.cs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Threading;
+using System.Threading.Tasks;
+using Thrift.Protocol;
+using Thrift.Transport;
+
+namespace Thrift.Server
+{
+ //TODO: replacement by event?
+
+ /// <summary>
+ /// Interface implemented by server users to handle events from the server
+ /// </summary>
+ // ReSharper disable once InconsistentNaming
+ public interface TServerEventHandler
+ {
+ /// <summary>
+ /// Called before the server begins accepting client connections.
+ /// </summary>
+ Task PreServeAsync(CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Called when a new client has connected and is about to begin processing; the returned object is what the server later passes as serverContext.
+ /// </summary>
+ Task<object> CreateContextAsync(TProtocol input, TProtocol output, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Called when a client has finished request-handling, so that the server context can be deleted.
+ /// </summary>
+ Task DeleteContextAsync(object serverContext, TProtocol input, TProtocol output,
+ CancellationToken cancellationToken);
+
+ /// <summary>
+ /// Called each time a client is about to call the processor.
+ /// </summary>
+ Task ProcessContextAsync(object serverContext, TTransport transport, CancellationToken cancellationToken);
+ }
+}
\ No newline at end of file
diff --git a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
new file mode 100644
index 0000000..a0a3e4c
--- /dev/null
+++ b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Thrift.Protocol;
+using Thrift.Processor;
+using Thrift.Transport;
+
+namespace Thrift.Server
+{
+ //TODO: unhandled exceptions, etc.
+
+ // ReSharper disable once InconsistentNaming
+ public class TSimpleAsyncServer : TServer
+ {
+ private readonly int _clientWaitingDelay; // milliseconds to pause between polls while no client is pending
+ private volatile Task _serverTask; // task started by ServeAsync to run StartListening
+
+        /// <summary>Creates a server for one processor, using default transport factories and a logger named after this class.</summary>
+        /// <exception cref="ArgumentNullException"><paramref name="loggerFactory"/> is null.</exception>
+        public TSimpleAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport,
+            ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
+            ILoggerFactory loggerFactory, int clientWaitingDelay = 10)
+            : this(new TSingletonProcessorFactory(processor), serverTransport, new TTransportFactory(), new TTransportFactory(),
+                inputProtocolFactory, outputProtocolFactory,
+                (loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory))).CreateLogger(nameof(TSimpleAsyncServer)), clientWaitingDelay)
+        { }
+
+        /// <summary>Creates a server with explicit processor, transport and protocol factories.</summary>
+        /// <param name="clientWaitingDelay">Milliseconds to pause between polls while no client is pending.</param>
+        public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory, TServerTransport serverTransport,
+            TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory,
+            ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
+            ILogger logger, int clientWaitingDelay = 10)
+            : base(itProcessorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
+                inputProtocolFactory, outputProtocolFactory, logger)
+        { _clientWaitingDelay = clientWaitingDelay; }
+
+        /// <summary>
+        /// Runs the accept loop on a long-running task and completes only when that loop has ended; loop failures are logged, not rethrown.
+        /// </summary>
+        public override async Task ServeAsync(CancellationToken cancellationToken)
+        {
+            try
+            {
+                // StartNew over an async lambda returns Task<Task>; Unwrap so the await below observes the whole loop and its exceptions.
+                _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning).Unwrap();
+                await _serverTask;
+            }
+            catch (Exception ex) { Logger?.LogError(ex.ToString()); }
+        }
+
+        /// <summary>
+        /// Accept loop: listens on the server transport and hands each accepted client to Execute until
+        /// cancellation is requested. Once listening has started, the server transport is closed on every exit path.
+        /// </summary>
+        private async Task StartListening(CancellationToken cancellationToken)
+        {
+            ServerTransport.Listen();
+            Logger?.LogTrace("Started listening at server");
+            try
+            {
+                if (ServerEventHandler != null)
+                {
+                    await ServerEventHandler.PreServeAsync(cancellationToken);
+                }
+
+                while (!cancellationToken.IsCancellationRequested)
+                {
+                    if (!ServerTransport.IsClientPending())
+                    {
+                        // Nothing to accept yet: pause briefly instead of spinning; a cancellation simply ends the pause early.
+                        try { await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken); }
+                        catch (TaskCanceledException) { }
+                        continue;
+                    }
+
+                    Logger?.LogTrace("Waiting for client connection");
+                    try
+                    {
+                        var client = await ServerTransport.AcceptAsync(cancellationToken);
+                        // Only the outer StartNew task is awaited, so Execute keeps running while the loop accepts the next client.
+                        await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
+                    }
+                    catch (TTransportException ttx)
+                    {
+                        Logger?.LogTrace($"Transport exception: {ttx}");
+                        if (ttx.Type != TTransportException.ExceptionType.Interrupted)
+                        {
+                            Logger?.LogError(ttx.ToString());
+                        }
+                    }
+                }
+            }
+            finally { ServerTransport.Close(); }
+
+            Logger?.LogTrace("Completed listening at server");
+        }
+
+ public override void Stop() // no-op: the loops in this class end only via the CancellationToken given to ServeAsync
+ {
+ }
+
+        /// <summary>
+        /// Serves one client: wraps its transport, fires the event-handler callbacks and processes requests until PeekAsync or ProcessAsync returns false or cancellation is requested.
+        /// </summary>
+        private async Task Execute(TTransport client, CancellationToken cancellationToken)
+        {
+            Logger?.LogTrace("Started client request processing");
+            var processor = ProcessorFactory.GetAsyncProcessor(client, this);
+
+            TTransport inputTransport = null;
+            TTransport outputTransport = null;
+            TProtocol inputProtocol = null;
+            TProtocol outputProtocol = null;
+            object connectionContext = null;
+
+            try
+            {
+                try
+                {
+                    inputTransport = InputTransportFactory.GetTransport(client);
+                    outputTransport = OutputTransportFactory.GetTransport(client);
+                    inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
+                    outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
+
+                    if (ServerEventHandler != null)
+                    {
+                        connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
+                    }
+
+                    while (!cancellationToken.IsCancellationRequested)
+                    {
+                        if (!await inputTransport.PeekAsync(cancellationToken))
+                        {
+                            break;
+                        }
+
+                        if (ServerEventHandler != null)
+                        {
+                            await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
+                        }
+
+                        if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
+                        {
+                            break;
+                        }
+                    }
+                }
+                catch (TTransportException ttx)
+                {
+                    Logger?.LogTrace($"Transport exception: {ttx}");
+                }
+                catch (Exception x)
+                {
+                    Logger?.LogError($"Error: {x}");
+                }
+
+                if (ServerEventHandler != null)
+                {
+                    await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
+                }
+            }
+            finally
+            {
+                //Close transports
+                inputTransport?.Close();
+                outputTransport?.Close();
+                // disposable stuff should be disposed
+                inputProtocol?.Dispose();
+                outputProtocol?.Dispose();
+                inputTransport?.Dispose();
+                outputTransport?.Dispose();
+            }
+
+            Logger?.LogTrace("Completed client request processing");
+        }
+ }
+}
diff --git a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
new file mode 100644
index 0000000..e5c5660
--- /dev/null
+++ b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Contains some contributions under the Thrift Software License.
+ * Please see doc/old-thrift-license.txt in the Thrift distribution for
+ * details.
+ */
+
+using System;
+using System.Threading;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Thrift.Processor;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+
+namespace Thrift.Server
+{
+ /// <summary>
+ /// Server that uses C# built-in ThreadPool to spawn threads when handling requests.
+ /// </summary>
+ public class TThreadPoolAsyncServer : TServer
+ {
+ private const int DEFAULT_MIN_THREADS = -1; // use .NET ThreadPool defaults
+ private const int DEFAULT_MAX_THREADS = -1; // use .NET ThreadPool defaults
+ private volatile bool stop = false; // set by Stop(); polled by the loops in ServeAsync and Execute
+
+ private CancellationToken ServerCancellationToken; // token of the running ServeAsync call; read by Execute, reset to default when ServeAsync exits
+
+        /// <summary>
+        /// Requested .NET ThreadPool limits; the server constructor only applies values greater than zero.
+        /// </summary>
+        public struct Configuration
+        {
+            public int MinWorkerThreads;
+            public int MaxWorkerThreads;
+            public int MinIOThreads;
+            public int MaxIOThreads;
+
+            /// <summary>Uses the same minimum and the same maximum for worker and I/O threads.</summary>
+            public Configuration(int min = DEFAULT_MIN_THREADS, int max = DEFAULT_MAX_THREADS)
+                : this(min, max, min, max) { }
+
+            /// <summary>Sets the worker and I/O thread limits independently.</summary>
+            public Configuration(int minWork, int maxWork, int minIO, int maxIO)
+            {
+                MinWorkerThreads = minWork;
+                MaxWorkerThreads = maxWork;
+                MinIOThreads = minIO;
+                MaxIOThreads = maxIO;
+            }
+        }
+
+        /// <summary>Single-processor server with default transport factories, binary protocol and an unconfigured thread pool.</summary>
+        public TThreadPoolAsyncServer(ITAsyncProcessor processor, TServerTransport serverTransport, ILogger logger = null)
+            : this(new TSingletonProcessorFactory(processor), serverTransport,
+                inputTransportFactory: new TTransportFactory(), outputTransportFactory: new TTransportFactory(),
+                inputProtocolFactory: new TBinaryProtocol.Factory(), outputProtocolFactory: new TBinaryProtocol.Factory(),
+                threadConfig: new Configuration(), logger: logger)
+        { }
+
+        /// <summary>Single-processor server that uses one transport factory and one protocol factory for both directions.</summary>
+        public TThreadPoolAsyncServer(ITAsyncProcessor processor,
+            TServerTransport serverTransport,
+            TTransportFactory transportFactory,
+            ITProtocolFactory protocolFactory)
+            : this(new TSingletonProcessorFactory(processor), serverTransport,
+                inputTransportFactory: transportFactory, outputTransportFactory: transportFactory,
+                inputProtocolFactory: protocolFactory, outputProtocolFactory: protocolFactory,
+                threadConfig: new Configuration())
+        { }
+
+        /// <summary>Server for a processor factory that uses one transport factory and one protocol factory for both directions.</summary>
+        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
+            TServerTransport serverTransport,
+            TTransportFactory transportFactory,
+            ITProtocolFactory protocolFactory)
+            : this(processorFactory, serverTransport,
+                inputTransportFactory: transportFactory, outputTransportFactory: transportFactory,
+                inputProtocolFactory: protocolFactory, outputProtocolFactory: protocolFactory,
+                threadConfig: new Configuration())
+        { }
+
+        /// <summary>Server with explicit factories; the given minimum and maximum are used for both worker and I/O threads.</summary>
+        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
+            TServerTransport serverTransport,
+            TTransportFactory inputTransportFactory,
+            TTransportFactory outputTransportFactory,
+            ITProtocolFactory inputProtocolFactory,
+            ITProtocolFactory outputProtocolFactory,
+            int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
+            : this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
+                inputProtocolFactory, outputProtocolFactory,
+                threadConfig: new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
+                logger: logger)
+        { }
+
+        // Guards the ThreadPool reconfiguration below; private so that no outside code can take the same lock.
+        private static readonly object ThreadPoolConfigLock = new object();
+
+        /// <summary>
+        /// Creates the server and applies <paramref name="threadConfig"/> to the .NET ThreadPool; limits that are not positive are left unchanged.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">The ThreadPool rejected the requested limits.</exception>
+        public TThreadPoolAsyncServer(ITProcessorFactory processorFactory,
+            TServerTransport serverTransport,
+            TTransportFactory inputTransportFactory, TTransportFactory outputTransportFactory,
+            ITProtocolFactory inputProtocolFactory, ITProtocolFactory outputProtocolFactory,
+            Configuration threadConfig,
+            ILogger logger = null)
+            : base(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
+                inputProtocolFactory, outputProtocolFactory, logger)
+        {
+            lock (ThreadPoolConfigLock)
+            {
+                if ((threadConfig.MaxWorkerThreads > 0) || (threadConfig.MaxIOThreads > 0))
+                {
+                    // Start from the current limits so that only the explicitly requested values change.
+                    ThreadPool.GetMaxThreads(out int work, out int comm);
+                    if (threadConfig.MaxWorkerThreads > 0) work = threadConfig.MaxWorkerThreads;
+                    if (threadConfig.MaxIOThreads > 0) comm = threadConfig.MaxIOThreads;
+                    if (!ThreadPool.SetMaxThreads(work, comm))
+                        throw new InvalidOperationException("Error: could not SetMaxThreads in ThreadPool");
+                }
+
+                if ((threadConfig.MinWorkerThreads > 0) || (threadConfig.MinIOThreads > 0))
+                {
+                    ThreadPool.GetMinThreads(out int work, out int comm);
+                    if (threadConfig.MinWorkerThreads > 0) work = threadConfig.MinWorkerThreads;
+                    if (threadConfig.MinIOThreads > 0) comm = threadConfig.MinIOThreads;
+                    if (!ThreadPool.SetMinThreads(work, comm))
+                        throw new InvalidOperationException("Error: could not SetMinThreads in ThreadPool");
+                }
+            }
+        }
+
+
+        /// <summary>
+        /// Accepts clients until Stop() is called or cancellation is requested, and queues each accepted
+        /// client to the .NET ThreadPool. Returns early (after logging) if the transport cannot listen.
+        /// </summary>
+        public override async Task ServeAsync(CancellationToken cancellationToken)
+        {
+            ServerCancellationToken = cancellationToken;
+            try
+            {
+                try
+                {
+                    ServerTransport.Listen();
+                }
+                catch (TTransportException ttx)
+                {
+                    LogError("Error, could not listen on ServerTransport: " + ttx);
+                    return;
+                }
+
+                //Fire the preServe server event when server is up but before any client connections
+                if (ServerEventHandler != null)
+                    await ServerEventHandler.PreServeAsync(cancellationToken);
+
+                // The token is checked alongside the stop flag, so a cancellation requested between two accepts ends the loop.
+                while (!stop && !cancellationToken.IsCancellationRequested)
+                {
+                    try
+                    {
+                        TTransport client = await ServerTransport.AcceptAsync(cancellationToken);
+                        ThreadPool.QueueUserWorkItem(this.Execute, client);
+                    }
+                    catch (TTransportException ttx)
+                    {
+                        // Only an Interrupted error while stopping is silent; anything else is logged.
+                        if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
+                        {
+                            LogError(ttx.ToString());
+                        }
+                    }
+                }
+
+                // Release the listener on stop or cancellation and clear the flag so ServeAsync can be called again.
+                if (stop || cancellationToken.IsCancellationRequested)
+                {
+                    try
+                    {
+                        ServerTransport.Close();
+                    }
+                    catch (TTransportException ttx)
+                    {
+                        LogError("TServerTransport failed on close: " + ttx.Message);
+                    }
+                    stop = false;
+                }
+            }
+            finally
+            {
+                ServerCancellationToken = default(CancellationToken);
+            }
+        }
+
+        /// <summary>
+        /// Serves one client on a ThreadPool thread until PeekAsync or ProcessAsync returns false, an error occurs, or the server stops.
+        /// The asynchronous transport, processor and event-handler calls are waited on synchronously.
+        /// </summary>
+        /// <param name="threadContext">The accepted client; must be a TTransport instance.</param>
+        private void Execute(object threadContext)
+        {
+            var cancellationToken = ServerCancellationToken;
+
+            using (TTransport client = (TTransport)threadContext)
+            {
+                ITAsyncProcessor processor = ProcessorFactory.GetAsyncProcessor(client, this);
+                TTransport inputTransport = null;
+                TTransport outputTransport = null;
+                TProtocol inputProtocol = null;
+                TProtocol outputProtocol = null;
+                object connectionContext = null;
+                try
+                {
+                    try
+                    {
+                        inputTransport = InputTransportFactory.GetTransport(client);
+                        outputTransport = OutputTransportFactory.GetTransport(client);
+                        inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
+                        outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
+                        // GetAwaiter().GetResult() rethrows the task's own exception (Result/Wait wrap it in AggregateException), so the TTransportException handler below can match.
+                        //Recover event handler (if any) and fire createContext server event when a client connects
+                        if (ServerEventHandler != null)
+                            connectionContext = ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken).GetAwaiter().GetResult();
+
+                        //Process client requests until client disconnects
+                        while (!stop)
+                        {
+                            if (!inputTransport.PeekAsync(cancellationToken).GetAwaiter().GetResult())
+                                break;
+
+                            //Fire processContext server event
+                            //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+                            //That is to say it may be many minutes between the event firing and the client request
+                            //actually arriving or the client may hang up without ever making a request.
+                            if (ServerEventHandler != null)
+                                ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken).GetAwaiter().GetResult();
+                            //Process client request (blocks until transport is readable)
+                            if (!processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken).GetAwaiter().GetResult())
+                                break;
+                        }
+                    }
+                    catch (TTransportException)
+                    {
+                        //Usually a client disconnect, expected
+                    }
+                    catch (Exception x)
+                    {
+                        //Unexpected
+                        LogError("Error: " + x);
+                    }
+
+                    //Fire deleteContext server event after client disconnects
+                    if (ServerEventHandler != null)
+                        ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken).GetAwaiter().GetResult();
+
+                }
+                finally
+                {
+                    //Close transports
+                    inputTransport?.Close();
+                    outputTransport?.Close();
+
+                    // disposable stuff should be disposed
+                    inputProtocol?.Dispose();
+                    outputProtocol?.Dispose();
+                    inputTransport?.Dispose();
+                    outputTransport?.Dispose();
+                }
+            }
+        }
+
+ public override void Stop() // requests shutdown of the serve loop and of the per-client loops
+ {
+ stop = true; // volatile flag polled by the while (!stop) loops in ServeAsync and Execute
+ ServerTransport?.Close(); // null-safe; presumably interrupts a pending AcceptAsync (ServeAsync tolerates an Interrupted error while stopping) - confirm
+ }
+ }
+}