THRIFT-4879 general performance improvements for netstd library
Client: netstd
Patch: Jens Geyer
This closes #1808
diff --git a/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs b/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
index 7a0243a..3f30d4a 100644
--- a/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
@@ -27,16 +27,19 @@
// ReSharper disable once InconsistentNaming
public class TBinaryProtocol : TProtocol
{
- //TODO: Unit tests
- //TODO: Localization
- //TODO: pragma
-
protected const uint VersionMask = 0xffff0000;
protected const uint Version1 = 0x80010000;
protected bool StrictRead;
protected bool StrictWrite;
+ // minimize memory allocations by means of an preallocated bytes buffer
+ // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long)
+ private byte[] PreAllocatedBuffer = new byte[128];
+
+ private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
+ private static readonly TField StopField = new TField() { Type = TType.Stop };
+
public TBinaryProtocol(TTransport trans)
: this(trans, false, true)
{
@@ -131,12 +134,15 @@
return;
}
- await WriteByteAsync((sbyte) map.KeyType, cancellationToken);
- await WriteByteAsync((sbyte) map.ValueType, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)map.KeyType;
+ PreAllocatedBuffer[1] = (byte)map.ValueType;
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
+
await WriteI32Async(map.Count, cancellationToken);
}
public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
+
{
if (cancellationToken.IsCancellationRequested)
{
@@ -192,15 +198,6 @@
await WriteByteAsync(b ? (sbyte) 1 : (sbyte) 0, cancellationToken);
}
- protected internal static byte[] CreateWriteByte(sbyte b)
- {
- var bout = new byte[1];
-
- bout[0] = (byte) b;
-
- return bout;
- }
-
public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -208,20 +205,10 @@
return;
}
- var bout = CreateWriteByte(b);
- await Trans.WriteAsync(bout, 0, 1, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)b;
+
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
-
- protected internal static byte[] CreateWriteI16(short s)
- {
- var i16Out = new byte[2];
-
- i16Out[0] = (byte) (0xff & (s >> 8));
- i16Out[1] = (byte) (0xff & s);
-
- return i16Out;
- }
-
public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -229,20 +216,10 @@
return;
}
- var i16Out = CreateWriteI16(i16);
- await Trans.WriteAsync(i16Out, 0, 2, cancellationToken);
- }
+ PreAllocatedBuffer[0] = (byte)(0xff & (i16 >> 8));
+ PreAllocatedBuffer[1] = (byte)(0xff & i16);
- protected internal static byte[] CreateWriteI32(int i32)
- {
- var i32Out = new byte[4];
-
- i32Out[0] = (byte) (0xff & (i32 >> 24));
- i32Out[1] = (byte) (0xff & (i32 >> 16));
- i32Out[2] = (byte) (0xff & (i32 >> 8));
- i32Out[3] = (byte) (0xff & i32);
-
- return i32Out;
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
}
public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
@@ -252,26 +229,15 @@
return;
}
- var i32Out = CreateWriteI32(i32);
- await Trans.WriteAsync(i32Out, 0, 4, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)(0xff & (i32 >> 24));
+ PreAllocatedBuffer[1] = (byte)(0xff & (i32 >> 16));
+ PreAllocatedBuffer[2] = (byte)(0xff & (i32 >> 8));
+ PreAllocatedBuffer[3] = (byte)(0xff & i32);
+
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
}
- protected internal static byte[] CreateWriteI64(long i64)
- {
- var i64Out = new byte[8];
-
- i64Out[0] = (byte) (0xff & (i64 >> 56));
- i64Out[1] = (byte) (0xff & (i64 >> 48));
- i64Out[2] = (byte) (0xff & (i64 >> 40));
- i64Out[3] = (byte) (0xff & (i64 >> 32));
- i64Out[4] = (byte) (0xff & (i64 >> 24));
- i64Out[5] = (byte) (0xff & (i64 >> 16));
- i64Out[6] = (byte) (0xff & (i64 >> 8));
- i64Out[7] = (byte) (0xff & i64);
-
- return i64Out;
- }
-
+
public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -279,8 +245,16 @@
return;
}
- var i64Out = CreateWriteI64(i64);
- await Trans.WriteAsync(i64Out, 0, 8, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)(0xff & (i64 >> 56));
+ PreAllocatedBuffer[1] = (byte)(0xff & (i64 >> 48));
+ PreAllocatedBuffer[2] = (byte)(0xff & (i64 >> 40));
+ PreAllocatedBuffer[3] = (byte)(0xff & (i64 >> 32));
+ PreAllocatedBuffer[4] = (byte)(0xff & (i64 >> 24));
+ PreAllocatedBuffer[5] = (byte)(0xff & (i64 >> 16));
+ PreAllocatedBuffer[6] = (byte)(0xff & (i64 >> 8));
+ PreAllocatedBuffer[7] = (byte)(0xff & i64);
+
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
}
public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
@@ -293,6 +267,7 @@
await WriteI64Async(BitConverter.DoubleToInt64Bits(d), cancellationToken);
}
+
public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -304,7 +279,7 @@
await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
}
- public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -332,7 +307,7 @@
throw new TProtocolException(TProtocolException.BAD_VERSION,
"Missing version in ReadMessageBegin, old client?");
}
- message.Name = await ReadStringBodyAsync(size, cancellationToken);
+ message.Name = (size > 0) ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
message.Type = (TMessageType) await ReadByteAsync(cancellationToken);
message.SeqID = await ReadI32Async(cancellationToken);
}
@@ -347,15 +322,14 @@
}
}
- public override async Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
await Task.FromCanceled(cancellationToken);
}
- //TODO: no read from internal transport?
- return new TStruct();
+ return AnonymousStruct;
}
public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
@@ -366,24 +340,24 @@
}
}
- public override async Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<TField>(cancellationToken);
}
- var field = new TField
- {
- Type = (TType) await ReadByteAsync(cancellationToken)
- };
- if (field.Type != TType.Stop)
+ var type = (TType)await ReadByteAsync(cancellationToken);
+ if (type == TType.Stop)
{
- field.ID = await ReadI16Async(cancellationToken);
+ return StopField;
}
- return field;
+ return new TField {
+ Type = type,
+ ID = await ReadI16Async(cancellationToken)
+ };
}
public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
@@ -394,7 +368,7 @@
}
}
- public override async Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -419,7 +393,7 @@
}
}
- public override async Task<TList> ReadListBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -443,7 +417,7 @@
}
}
- public override async Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -467,7 +441,7 @@
}
}
- public override async Task<bool> ReadBoolAsync(CancellationToken cancellationToken)
+ public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -477,82 +451,78 @@
return await ReadByteAsync(cancellationToken) == 1;
}
- public override async Task<sbyte> ReadByteAsync(CancellationToken cancellationToken)
+ public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<sbyte>(cancellationToken);
}
- var bin = new byte[1];
- await Trans.ReadAllAsync(bin, 0, 1, cancellationToken); //TODO: why readall ?
- return (sbyte) bin[0];
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
+ return (sbyte)PreAllocatedBuffer[0];
}
- public override async Task<short> ReadI16Async(CancellationToken cancellationToken)
+ public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<short>(cancellationToken);
}
- var i16In = new byte[2];
- await Trans.ReadAllAsync(i16In, 0, 2, cancellationToken);
- var result = (short) (((i16In[0] & 0xff) << 8) | i16In[1] & 0xff);
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
+ var result = (short) (((PreAllocatedBuffer[0] & 0xff) << 8) | PreAllocatedBuffer[1] & 0xff);
return result;
}
- public override async Task<int> ReadI32Async(CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<int>(cancellationToken);
}
- var i32In = new byte[4];
- await Trans.ReadAllAsync(i32In, 0, 4, cancellationToken);
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
var result =
- ((i32In[0] & 0xff) << 24) |
- ((i32In[1] & 0xff) << 16) |
- ((i32In[2] & 0xff) << 8) |
- i32In[3] & 0xff;
+ ((PreAllocatedBuffer[0] & 0xff) << 24) |
+ ((PreAllocatedBuffer[1] & 0xff) << 16) |
+ ((PreAllocatedBuffer[2] & 0xff) << 8) |
+ PreAllocatedBuffer[3] & 0xff;
return result;
}
#pragma warning disable 675
- protected internal long CreateReadI64(byte[] buf)
+ protected internal long ReadI64FromPreAllocatedBuffer()
{
var result =
- ((long) (buf[0] & 0xff) << 56) |
- ((long) (buf[1] & 0xff) << 48) |
- ((long) (buf[2] & 0xff) << 40) |
- ((long) (buf[3] & 0xff) << 32) |
- ((long) (buf[4] & 0xff) << 24) |
- ((long) (buf[5] & 0xff) << 16) |
- ((long) (buf[6] & 0xff) << 8) |
- buf[7] & 0xff;
+ ((long) (PreAllocatedBuffer[0] & 0xff) << 56) |
+ ((long) (PreAllocatedBuffer[1] & 0xff) << 48) |
+ ((long) (PreAllocatedBuffer[2] & 0xff) << 40) |
+ ((long) (PreAllocatedBuffer[3] & 0xff) << 32) |
+ ((long) (PreAllocatedBuffer[4] & 0xff) << 24) |
+ ((long) (PreAllocatedBuffer[5] & 0xff) << 16) |
+ ((long) (PreAllocatedBuffer[6] & 0xff) << 8) |
+ PreAllocatedBuffer[7] & 0xff;
return result;
}
#pragma warning restore 675
- public override async Task<long> ReadI64Async(CancellationToken cancellationToken)
+ public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<long>(cancellationToken);
}
- var i64In = new byte[8];
- await Trans.ReadAllAsync(i64In, 0, 8, cancellationToken);
- return CreateReadI64(i64In);
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
+ return ReadI64FromPreAllocatedBuffer();
}
- public override async Task<double> ReadDoubleAsync(CancellationToken cancellationToken)
+ public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -563,7 +533,7 @@
return BitConverter.Int64BitsToDouble(d);
}
- public override async Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
+ public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -576,13 +546,30 @@
return buf;
}
- private async Task<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
+ public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return await Task.FromCanceled<string>(cancellationToken);
+ }
+
+ var size = await ReadI32Async(cancellationToken);
+ return size > 0 ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
+ }
+
+ private async ValueTask<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
await Task.FromCanceled<string>(cancellationToken);
}
+ if (size <= PreAllocatedBuffer.Length)
+ {
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, size, cancellationToken);
+ return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, size);
+ }
+
var buf = new byte[size];
await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
return Encoding.UTF8.GetString(buf, 0, buf.Length);