THRIFT-5481 consolidate netstd server implementation details into one common model
Client: netstd
Patch: JensG
diff --git a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
index 45e5513..d46d58a 100644
--- a/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TSimpleAsyncServer.cs
@@ -17,21 +17,24 @@
using System;
using System.Threading;
+using Thrift.Protocol;
+using Thrift.Transport;
+using Thrift.Processor;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
-using Thrift.Protocol;
-using Thrift.Processor;
-using Thrift.Transport;
+
+#pragma warning disable IDE0079 // remove unnecessary pragmas
+#pragma warning disable IDE0063 // using can be simplified, we don't
namespace Thrift.Server
{
- //TODO: unhandled exceptions, etc.
// ReSharper disable once InconsistentNaming
public class TSimpleAsyncServer : TServer
{
- private readonly int _clientWaitingDelay;
- private volatile Task _serverTask;
+ private volatile bool stop = false;
+
+ private CancellationToken ServerCancellationToken;
public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
TServerTransport serverTransport,
@@ -39,8 +42,7 @@
TTransportFactory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
- ILogger logger,
- int clientWaitingDelay = 10)
+ ILogger logger)
: base(itProcessorFactory,
serverTransport,
inputTransportFactory,
@@ -49,7 +51,6 @@
outputProtocolFactory,
logger)
{
- _clientWaitingDelay = clientWaitingDelay;
}
public TSimpleAsyncServer(ITProcessorFactory itProcessorFactory,
@@ -58,16 +59,14 @@
TTransportFactory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
- ILoggerFactory loggerFactory,
- int clientWaitingDelay = 10)
+ ILoggerFactory loggerFactory)
: this(itProcessorFactory,
serverTransport,
inputTransportFactory,
outputTransportFactory,
inputProtocolFactory,
outputProtocolFactory,
- loggerFactory.CreateLogger<TSimpleAsyncServer>(),
- clientWaitingDelay)
+ loggerFactory.CreateLogger<TSimpleAsyncServer>())
{
}
@@ -75,87 +74,87 @@
TServerTransport serverTransport,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
- ILoggerFactory loggerFactory,
- int clientWaitingDelay = 10)
+ ILoggerFactory loggerFactory)
: this(new TSingletonProcessorFactory(processor),
serverTransport,
null, // defaults to TTransportFactory()
null, // defaults to TTransportFactory()
inputProtocolFactory,
outputProtocolFactory,
- loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)),
- clientWaitingDelay)
+ loggerFactory.CreateLogger(nameof(TSimpleAsyncServer)))
{
}
public override async Task ServeAsync(CancellationToken cancellationToken)
{
+ ServerCancellationToken = cancellationToken;
try
{
- // cancelation token
- _serverTask = Task.Factory.StartNew(() => StartListening(cancellationToken), TaskCreationOptions.LongRunning);
- await _serverTask;
- }
- catch (Exception ex)
- {
- Logger.LogError(ex.ToString());
- }
- }
-
- private async Task StartListening(CancellationToken cancellationToken)
- {
- ServerTransport.Listen();
-
- Logger.LogTrace("Started listening at server");
-
- if (ServerEventHandler != null)
- {
- await ServerEventHandler.PreServeAsync(cancellationToken);
- }
-
- while (!cancellationToken.IsCancellationRequested)
- {
- if (ServerTransport.IsClientPending())
+ try
{
- Logger.LogTrace("Waiting for client connection");
+ ServerTransport.Listen();
+ }
+ catch (TTransportException ttx)
+ {
+ LogError("Error, could not listen on ServerTransport: " + ttx);
+ return;
+ }
+ //Fire the preServe server event when server is up but before any client connections
+ if (ServerEventHandler != null)
+ await ServerEventHandler.PreServeAsync(cancellationToken);
+
+ while (!(stop || ServerCancellationToken.IsCancellationRequested))
+ {
try
{
- var client = await ServerTransport.AcceptAsync(cancellationToken);
- await Task.Factory.StartNew(() => Execute(client, cancellationToken), cancellationToken);
+ using (TTransport client = await ServerTransport.AcceptAsync(cancellationToken))
+ {
+ await ExecuteAsync(client);
+ }
+ }
+ catch (TaskCanceledException)
+ {
+ stop = true;
}
catch (TTransportException ttx)
{
- Logger.LogTrace($"Transport exception: {ttx}");
-
- if (ttx.Type != TTransportException.ExceptionType.Interrupted)
+ if (!stop || ttx.Type != TTransportException.ExceptionType.Interrupted)
{
- Logger.LogError(ttx.ToString());
+ LogError(ttx.ToString());
}
+
}
}
- else
+
+ if (stop)
{
try
{
- await Task.Delay(TimeSpan.FromMilliseconds(_clientWaitingDelay), cancellationToken);
+ ServerTransport.Close();
}
- catch (TaskCanceledException) { }
+ catch (TTransportException ttx)
+ {
+ LogError("TServerTransport failed on close: " + ttx.Message);
+ }
+ stop = false;
}
+
}
-
- ServerTransport.Close();
-
- Logger.LogTrace("Completed listening at server");
+ finally
+ {
+ ServerCancellationToken = default;
+ }
}
- public override void Stop()
+ /// <summary>
+ /// Loops on processing a client forever
+ /// client will be a TTransport instance
+ /// </summary>
+ /// <param name="client"></param>
+ private async Task ExecuteAsync(TTransport client)
{
- }
-
- private async Task Execute(TTransport client, CancellationToken cancellationToken)
- {
- Logger.LogTrace("Started client request processing");
+ var cancellationToken = ServerCancellationToken;
var processor = ProcessorFactory.GetAsyncProcessor(client, this);
@@ -164,7 +163,6 @@
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
object connectionContext = null;
-
try
{
try
@@ -174,42 +172,41 @@
inputProtocol = InputProtocolFactory.GetProtocol(inputTransport);
outputProtocol = OutputProtocolFactory.GetProtocol(outputTransport);
+ //Recover event handler (if any) and fire createContext server event when a client connects
if (ServerEventHandler != null)
- {
connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
- }
- while (!cancellationToken.IsCancellationRequested)
+ //Process client requests until client disconnects
+ while (!(stop || cancellationToken.IsCancellationRequested))
{
if (!await inputTransport.PeekAsync(cancellationToken))
- {
break;
- }
+ //Fire processContext server event
+ //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+ //That is to say it may be many minutes between the event firing and the client request
+ //actually arriving or the client may hang up without ever makeing a request.
if (ServerEventHandler != null)
- {
await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
- }
+ //Process client request (blocks until transport is readable)
if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
- {
break;
- }
}
}
- catch (TTransportException ttx)
+ catch (TTransportException)
{
- Logger.LogTrace($"Transport exception: {ttx}");
+ //Usually a client disconnect, expected
}
catch (Exception x)
{
- Logger.LogError($"Error: {x}");
+ //Unexpected
+ LogError("Error: " + x);
}
+ //Fire deleteContext server event after client disconnects
if (ServerEventHandler != null)
- {
await ServerEventHandler.DeleteContextAsync(connectionContext, inputProtocol, outputProtocol, cancellationToken);
- }
}
finally
@@ -224,8 +221,12 @@
inputTransport?.Dispose();
outputTransport?.Dispose();
}
+ }
- Logger.LogTrace("Completed client request processing");
+ public override void Stop()
+ {
+ stop = true;
+ ServerTransport?.Close();
}
}
}
diff --git a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
index 46cc9d4..ba1834c 100644
--- a/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
+++ b/lib/netstd/Thrift/Server/TThreadPoolAsyncServer.cs
@@ -105,7 +105,7 @@
TTransportFactory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
- int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger= null)
+ int minThreadPoolThreads, int maxThreadPoolThreads, ILogger logger = null)
: this(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
new Configuration(minThreadPoolThreads, maxThreadPoolThreads),
@@ -245,9 +245,9 @@
connectionContext = await ServerEventHandler.CreateContextAsync(inputProtocol, outputProtocol, cancellationToken);
//Process client requests until client disconnects
- while (!stop)
+ while (!(stop || cancellationToken.IsCancellationRequested))
{
- if (! await inputTransport.PeekAsync(cancellationToken))
+ if (!await inputTransport.PeekAsync(cancellationToken))
break;
//Fire processContext server event
@@ -258,7 +258,7 @@
await ServerEventHandler.ProcessContextAsync(connectionContext, inputTransport, cancellationToken);
//Process client request (blocks until transport is readable)
- if (! await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
+ if (!await processor.ProcessAsync(inputProtocol, outputProtocol, cancellationToken))
break;
}
}