THRIFT-4839: Remove embedded buffering/framed options from TCP transports
Client: netstd
Patch: Kyle Smith + minor changes by Jens Geyer
This closes #1770
diff --git a/tutorial/netstd/Client/Client.csproj b/tutorial/netstd/Client/Client.csproj
index 70eae15..a1470a9 100644
--- a/tutorial/netstd/Client/Client.csproj
+++ b/tutorial/netstd/Client/Client.csproj
@@ -30,6 +30,10 @@
</PropertyGroup>
<ItemGroup>
+ <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
+ </ItemGroup>
+
+ <ItemGroup>
<ProjectReference Include="..\Interfaces\Interfaces.csproj" />
<ProjectReference Include="..\..\..\lib\netstd\Thrift\Thrift.csproj" />
</ItemGroup>
diff --git a/tutorial/netstd/Client/Program.cs b/tutorial/netstd/Client/Program.cs
index 4b68cee..f9509fa 100644
--- a/tutorial/netstd/Client/Program.cs
+++ b/tutorial/netstd/Client/Program.cs
@@ -32,6 +32,7 @@
using tutorial;
using shared;
using Microsoft.Extensions.DependencyInjection;
+using System.Diagnostics;
namespace Client
{
@@ -47,17 +48,20 @@
Client.exe -help
will diplay help information
- Client.exe -tr:<transport> -pr:<protocol> -mc:<numClients>
+ Client.exe -tr:<transport> -bf:<buffering> -pr:<protocol> -mc:<numClients>
will run client with specified arguments (tcp transport and binary protocol by default) and with 1 client
Options:
-tr (transport):
tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
- tcpbuffered - buffered transport over tcp will be used (host - ""localhost"", port - 9090)
namedpipe - namedpipe transport will be used (pipe address - "".test"")
http - http transport will be used (address - ""http://localhost:9090"")
tcptls - tcp tls transport will be used (host - ""localhost"", port - 9090)
- framed - tcp framed transport will be used (host - ""localhost"", port - 9090)
+
+ -bf (buffering):
+ none - (default) no buffering will be used
+ buffered - buffered transport will be used
+ framed - framed transport will be used
-pr (protocol):
binary - (default) binary protocol will be used
@@ -139,29 +143,57 @@
private static TTransport GetTransport(string[] args)
{
- var transport = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':')?[1];
+ TTransport transport = new TSocketTransport(IPAddress.Loopback, 9090);
- Transport selectedTransport;
- if (Enum.TryParse(transport, true, out selectedTransport))
+ // construct endpoint transport from "-tr:<transport>"; the plain TCP transport above stays in effect when the argument is absent or cannot be parsed
+ var transportArg = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':')?[1];
+ if (Enum.TryParse(transportArg, true, out Transport selectedTransport))
{
switch (selectedTransport)
{
case Transport.Tcp:
- return new TSocketTransport(IPAddress.Loopback, 9090);
+ transport = new TSocketTransport(IPAddress.Loopback, 9090);
+ break;
+
case Transport.NamedPipe:
- return new TNamedPipeTransport(".test");
+ transport = new TNamedPipeTransport(".test");
+ break;
+
case Transport.Http:
- return new THttpTransport(new Uri("http://localhost:9090"), null);
- case Transport.TcpBuffered:
- return new TBufferedTransport(new TSocketTransport(IPAddress.Loopback, 9090));
+ transport = new THttpTransport(new Uri("http://localhost:9090"), null);
+ break;
+
case Transport.TcpTls:
- return new TTlsSocketTransport(IPAddress.Loopback, 9090, GetCertificate(), CertValidator, LocalCertificateSelectionCallback);
- case Transport.Framed:
- return new TFramedTransport(new TSocketTransport(IPAddress.Loopback, 9090));
+ transport = new TTlsSocketTransport(IPAddress.Loopback, 9090, GetCertificate(), CertValidator, LocalCertificateSelectionCallback);
+ break;
+
+ default:
+ Debug.Assert(false, "unhandled case");
+ break;
}
}
- return new TSocketTransport(IPAddress.Loopback, 9090);
+ // optionally wrap the endpoint transport in the layered transport selected via "-bf:<buffering>"
+ var bufferingArg = args.FirstOrDefault(x => x.StartsWith("-bf"))?.Split(':')?[1];
+ if (Enum.TryParse<Buffering>(bufferingArg, true, out var selectedBuffering)) // ignoreCase: the help text documents lowercase values, same as for -tr
+ {
+ switch (selectedBuffering)
+ {
+ case Buffering.Buffered:
+ transport = new TBufferedTransport(transport);
+ break;
+
+ case Buffering.Framed:
+ transport = new TFramedTransport(transport);
+ break;
+
+ default: // layered transport(s) are optional
+ Debug.Assert(selectedBuffering == Buffering.None, "unhandled case");
+ break;
+ }
+ }
+
+ return transport;
}
private static int GetNumberOfClients(string[] args)
@@ -231,6 +263,9 @@
case Protocol.Multiplexed:
// it returns BinaryProtocol to avoid making wrapped protocol as public in TProtocolDecorator (in RunClientAsync it will be wrapped into Multiplexed protocol)
return new Tuple<Protocol, TProtocol>(selectedProtocol, new TBinaryProtocol(transport));
+ default:
+ Debug.Assert(false, "unhandled case");
+ break;
}
}
@@ -363,5 +398,12 @@
Json,
Multiplexed
}
+
+ // Optional layered transport that GetTransport wraps around the endpoint transport (-bf option).
+ private enum Buffering
+ {
+ None = 0,
+ Buffered = 1,
+ Framed = 2
+ }
}
}
diff --git a/tutorial/netstd/Server/Program.cs b/tutorial/netstd/Server/Program.cs
index c5e26d1..25e7dae 100644
--- a/tutorial/netstd/Server/Program.cs
+++ b/tutorial/netstd/Server/Program.cs
@@ -36,6 +36,7 @@
using tutorial;
using shared;
using Thrift.Processor;
+using System.Diagnostics;
namespace Server
{
@@ -85,17 +86,20 @@
Server.exe -help
will diplay help information
- Server.exe -tr:<transport> -pr:<protocol>
- will run server with specified arguments (tcp transport and binary protocol by default)
+ Server.exe -tr:<transport> -bf:<buffering> -pr:<protocol>
+ will run server with specified arguments (tcp transport, no buffering, and binary protocol by default)
Options:
-tr (transport):
tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
- tcpbuffered - tcp buffered transport will be used (host - ""localhost"", port - 9090)
namedpipe - namedpipe transport will be used (pipe address - "".test"")
http - http transport will be used (http address - ""localhost:9090"")
tcptls - tcp transport with tls will be used (host - ""localhost"", port - 9090)
- framed - tcp framed transport will be used (host - ""localhost"", port - 9090)
+
+ -bf (buffering):
+ none - (default) no buffering will be used
+ buffered - buffered transport will be used
+ framed - framed transport will be used
-pr (protocol):
binary - (default) binary protocol will be used
@@ -111,6 +115,7 @@
private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
{
var selectedTransport = GetTransport(args);
+ var selectedBuffering = GetBuffering(args);
var selectedProtocol = GetProtocol(args);
if (selectedTransport == Transport.Http)
@@ -119,7 +124,7 @@
}
else
{
- await RunSelectedConfigurationAsync(selectedTransport, selectedProtocol, cancellationToken);
+ await RunSelectedConfigurationAsync(selectedTransport, selectedBuffering, selectedProtocol, cancellationToken);
}
}
@@ -132,6 +137,15 @@
return selectedProtocol;
}
+ // Reads the optional "-bf:<buffering>" argument; yields Buffering.None when it is absent or cannot be parsed.
+ private static Buffering GetBuffering(string[] args)
+ {
+ var buffering = args.FirstOrDefault(x => x.StartsWith("-bf"))?.Split(':')?[1];
+ // ignoreCase: the help text documents lowercase values ("buffered", "framed")
+ Enum.TryParse<Buffering>(buffering, true, out var selectedBuffering);
+ return selectedBuffering;
+ }
+
private static Transport GetTransport(string[] args)
{
var transport = args.FirstOrDefault(x => x.StartsWith("-tr"))?.Split(':')?[1];
@@ -141,10 +155,9 @@
return selectedTransport;
}
- private static async Task RunSelectedConfigurationAsync(Transport transport, Protocol protocol, CancellationToken cancellationToken)
+ private static async Task RunSelectedConfigurationAsync(Transport transport, Buffering buffering, Protocol protocol, CancellationToken cancellationToken)
{
var handler = new CalculatorAsyncHandler();
- ITAsyncProcessor processor = null;
TServerTransport serverTransport = null;
switch (transport)
@@ -152,23 +165,36 @@
case Transport.Tcp:
serverTransport = new TServerSocketTransport(9090);
break;
- case Transport.TcpBuffered:
- serverTransport = new TServerSocketTransport(port: 9090, clientTimeout: 10000, buffering: Buffering.BufferedTransport);
- break;
case Transport.NamedPipe:
serverTransport = new TNamedPipeServerTransport(".test");
break;
case Transport.TcpTls:
- serverTransport = new TTlsServerSocketTransport(9090, GetCertificate(), Buffering.None, ClientCertValidator, LocalCertificateSelectionCallback);
- break;
- case Transport.Framed:
- serverTransport = new TServerFramedTransport(9090);
+ serverTransport = new TTlsServerSocketTransport(9090, GetCertificate(), ClientCertValidator, LocalCertificateSelectionCallback);
break;
}
- TProtocolFactory inputProtocolFactory;
- TProtocolFactory outputProtocolFactory;
+ TTransportFactory inputTransportFactory = null;
+ TTransportFactory outputTransportFactory = null;
+ switch (buffering)
+ {
+ case Buffering.Buffered:
+ inputTransportFactory = new TBufferedTransport.Factory();
+ outputTransportFactory = new TBufferedTransport.Factory();
+ break;
+ case Buffering.Framed:
+ inputTransportFactory = new TFramedTransport.Factory();
+ outputTransportFactory = new TFramedTransport.Factory();
+ break;
+
+ default: // layered transport(s) are optional
+ Debug.Assert(buffering == Buffering.None, "unhandled case");
+ break;
+ }
+
+ TProtocolFactory inputProtocolFactory = null;
+ TProtocolFactory outputProtocolFactory = null;
+ ITAsyncProcessor processor = null;
switch (protocol)
{
case Protocol.Binary:
@@ -210,15 +236,25 @@
throw new ArgumentOutOfRangeException(nameof(protocol), protocol, null);
}
+
try
{
Logger.LogInformation(
$"Selected TAsyncServer with {serverTransport} transport, {processor} processor and {inputProtocolFactory} protocol factories");
- var fabric = ServiceCollection.BuildServiceProvider().GetService<ILoggerFactory>();
- var server = new TSimpleAsyncServer(processor, serverTransport, inputProtocolFactory, outputProtocolFactory, fabric);
+ var loggerFactory = ServiceCollection.BuildServiceProvider().GetService<ILoggerFactory>();
+
+ var server = new TSimpleAsyncServer(
+ itProcessorFactory: new TSingletonProcessorFactory(processor),
+ serverTransport: serverTransport,
+ inputTransportFactory: inputTransportFactory,
+ outputTransportFactory: outputTransportFactory,
+ inputProtocolFactory: inputProtocolFactory,
+ outputProtocolFactory: outputProtocolFactory,
+ logger: loggerFactory.CreateLogger<TSimpleAsyncServer>());
Logger.LogInformation("Starting the server...");
+
await server.ServeAsync(cancellationToken);
}
catch (Exception x)
@@ -266,11 +302,16 @@
private enum Transport
{
Tcp,
- TcpBuffered,
NamedPipe,
Http,
TcpTls,
- Framed
+ }
+
+ // Layered transport selected with -bf. None is the zero value, which is what GetBuffering returns when the argument cannot be parsed.
+ private enum Buffering
+ {
+ None = 0,
+ Buffered = 1,
+ Framed = 2,
}
private enum Protocol