THRIFT-4879 general performance improvements for netstd library
Client: netstd
Patch: Jens Geyer
This closes #1808
diff --git a/.gitignore b/.gitignore
index fb7651e..3f584e2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -344,6 +344,10 @@
/test/netcore/Thrift
/test/netstd/**/bin
/test/netstd/**/obj
+/test/netstd/*.psess
+/test/netstd/*.vspx
+/test/netstd/*.vsp
+/test/netstd/*.diagsession
/test/netstd/Thrift
/test/php/php_ext_dir/
/test/rs/Cargo.lock
diff --git a/compiler/cpp/src/thrift/generate/t_netstd_generator.cc b/compiler/cpp/src/thrift/generate/t_netstd_generator.cc
index b76a34d..ffe51ab 100644
--- a/compiler/cpp/src/thrift/generate/t_netstd_generator.cc
+++ b/compiler/cpp/src/thrift/generate/t_netstd_generator.cc
@@ -1473,7 +1473,7 @@
out << indent() << "public override int GetHashCode() {" << endl;
indent_up();
- out << indent() << "int hashcode = 0;" << endl;
+ out << indent() << "int hashcode = 157;" << endl;
out << indent() << "unchecked {" << endl;
indent_up();
@@ -1483,24 +1483,26 @@
for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter)
{
t_type* ttype = (*f_iter)->get_type();
- out << indent() << "hashcode = (hashcode * 397) ^ ";
- if (field_is_required((*f_iter)))
+ if (!field_is_required((*f_iter)))
{
- out << "(";
+ out << indent() << "if(__isset." << normalize_name((*f_iter)->get_name()) << ")" << endl;
+ indent_up();
}
- else
- {
- out << "(!__isset." << normalize_name((*f_iter)->get_name()) << " ? 0 : ";
- }
+ out << indent() << "hashcode = (hashcode * 397) + ";
if (ttype->is_container())
{
- out << "(TCollections.GetHashCode(" << prop_name((*f_iter)) << "))";
+ out << "TCollections.GetHashCode(" << prop_name((*f_iter)) << ")";
}
else
{
- out << "(" << prop_name((*f_iter)) << ".GetHashCode())";
+ out << prop_name((*f_iter)) << ".GetHashCode()";
+ }
+ out << ";" << endl;
+
+ if (!field_is_required((*f_iter)))
+ {
+ indent_down();
}
- out << ");" << endl;
}
indent_down();
@@ -2223,7 +2225,6 @@
obj = tmp("_list");
}
- out << indent() << prefix << " = new " << type_name(ttype) << "();" << endl;
if (ttype->is_map())
{
out << indent() << "TMap " << obj << " = await iprot.ReadMapBeginAsync(cancellationToken);" << endl;
@@ -2237,6 +2238,7 @@
out << indent() << "TList " << obj << " = await iprot.ReadListBeginAsync(cancellationToken);" << endl;
}
+ out << indent() << prefix << " = new " << type_name(ttype) << "(" << obj << ".Count);" << endl;
string i = tmp("_i");
out << indent() << "for(int " << i << " = 0; " << i << " < " << obj << ".Count; ++" << i << ")" << endl
<< indent() << "{" << endl;
diff --git a/lib/netstd/Thrift/Collections/THashSet.cs b/lib/netstd/Thrift/Collections/THashSet.cs
index 011f0a0..ffab577 100644
--- a/lib/netstd/Thrift/Collections/THashSet.cs
+++ b/lib/netstd/Thrift/Collections/THashSet.cs
@@ -23,45 +23,56 @@
// ReSharper disable once InconsistentNaming
public class THashSet<T> : ICollection<T>
{
- private readonly HashSet<T> _set = new HashSet<T>();
+ private readonly HashSet<T> Items;
- public int Count => _set.Count;
+ public THashSet()
+ {
+ Items = new HashSet<T>();
+ }
+
+ public THashSet(int capacity)
+ {
+ // TODO: uncomment capacity when NET Standard also implements it
+ Items = new HashSet<T>(/*capacity*/);
+ }
+
+ public int Count => Items.Count;
public bool IsReadOnly => false;
public void Add(T item)
{
- _set.Add(item);
+ Items.Add(item);
}
public void Clear()
{
- _set.Clear();
+ Items.Clear();
}
public bool Contains(T item)
{
- return _set.Contains(item);
+ return Items.Contains(item);
}
public void CopyTo(T[] array, int arrayIndex)
{
- _set.CopyTo(array, arrayIndex);
+ Items.CopyTo(array, arrayIndex);
}
public IEnumerator GetEnumerator()
{
- return _set.GetEnumerator();
+ return Items.GetEnumerator();
}
IEnumerator<T> IEnumerable<T>.GetEnumerator()
{
- return ((IEnumerable<T>) _set).GetEnumerator();
+ return ((IEnumerable<T>) Items).GetEnumerator();
}
public bool Remove(T item)
{
- return _set.Remove(item);
+ return Items.Remove(item);
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs b/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs
index b8c1bce..f5b8d16 100644
--- a/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs
+++ b/lib/netstd/Thrift/Processor/ITAsyncProcessor.cs
@@ -25,4 +25,4 @@
{
Task<bool> ProcessAsync(TProtocol iprot, TProtocol oprot, CancellationToken cancellationToken = default(CancellationToken));
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs b/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs
index 81274be..e5aeaa6 100644
--- a/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs
+++ b/lib/netstd/Thrift/Processor/TMultiplexedProcessor.cs
@@ -129,7 +129,7 @@
_msgBegin = messageBegin;
}
- public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -140,4 +140,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs b/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
index 7a0243a..3f30d4a 100644
--- a/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TBinaryProtocol.cs
@@ -27,16 +27,19 @@
// ReSharper disable once InconsistentNaming
public class TBinaryProtocol : TProtocol
{
- //TODO: Unit tests
- //TODO: Localization
- //TODO: pragma
-
protected const uint VersionMask = 0xffff0000;
protected const uint Version1 = 0x80010000;
protected bool StrictRead;
protected bool StrictWrite;
+ // minimize memory allocations by means of an preallocated bytes buffer
+ // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long)
+ private byte[] PreAllocatedBuffer = new byte[128];
+
+ private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
+ private static readonly TField StopField = new TField() { Type = TType.Stop };
+
public TBinaryProtocol(TTransport trans)
: this(trans, false, true)
{
@@ -131,12 +134,15 @@
return;
}
- await WriteByteAsync((sbyte) map.KeyType, cancellationToken);
- await WriteByteAsync((sbyte) map.ValueType, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)map.KeyType;
+ PreAllocatedBuffer[1] = (byte)map.ValueType;
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
+
await WriteI32Async(map.Count, cancellationToken);
}
public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
+
{
if (cancellationToken.IsCancellationRequested)
{
@@ -192,15 +198,6 @@
await WriteByteAsync(b ? (sbyte) 1 : (sbyte) 0, cancellationToken);
}
- protected internal static byte[] CreateWriteByte(sbyte b)
- {
- var bout = new byte[1];
-
- bout[0] = (byte) b;
-
- return bout;
- }
-
public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -208,20 +205,10 @@
return;
}
- var bout = CreateWriteByte(b);
- await Trans.WriteAsync(bout, 0, 1, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)b;
+
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
-
- protected internal static byte[] CreateWriteI16(short s)
- {
- var i16Out = new byte[2];
-
- i16Out[0] = (byte) (0xff & (s >> 8));
- i16Out[1] = (byte) (0xff & s);
-
- return i16Out;
- }
-
public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -229,20 +216,10 @@
return;
}
- var i16Out = CreateWriteI16(i16);
- await Trans.WriteAsync(i16Out, 0, 2, cancellationToken);
- }
+ PreAllocatedBuffer[0] = (byte)(0xff & (i16 >> 8));
+ PreAllocatedBuffer[1] = (byte)(0xff & i16);
- protected internal static byte[] CreateWriteI32(int i32)
- {
- var i32Out = new byte[4];
-
- i32Out[0] = (byte) (0xff & (i32 >> 24));
- i32Out[1] = (byte) (0xff & (i32 >> 16));
- i32Out[2] = (byte) (0xff & (i32 >> 8));
- i32Out[3] = (byte) (0xff & i32);
-
- return i32Out;
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
}
public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
@@ -252,26 +229,15 @@
return;
}
- var i32Out = CreateWriteI32(i32);
- await Trans.WriteAsync(i32Out, 0, 4, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)(0xff & (i32 >> 24));
+ PreAllocatedBuffer[1] = (byte)(0xff & (i32 >> 16));
+ PreAllocatedBuffer[2] = (byte)(0xff & (i32 >> 8));
+ PreAllocatedBuffer[3] = (byte)(0xff & i32);
+
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
}
- protected internal static byte[] CreateWriteI64(long i64)
- {
- var i64Out = new byte[8];
-
- i64Out[0] = (byte) (0xff & (i64 >> 56));
- i64Out[1] = (byte) (0xff & (i64 >> 48));
- i64Out[2] = (byte) (0xff & (i64 >> 40));
- i64Out[3] = (byte) (0xff & (i64 >> 32));
- i64Out[4] = (byte) (0xff & (i64 >> 24));
- i64Out[5] = (byte) (0xff & (i64 >> 16));
- i64Out[6] = (byte) (0xff & (i64 >> 8));
- i64Out[7] = (byte) (0xff & i64);
-
- return i64Out;
- }
-
+
public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -279,8 +245,16 @@
return;
}
- var i64Out = CreateWriteI64(i64);
- await Trans.WriteAsync(i64Out, 0, 8, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)(0xff & (i64 >> 56));
+ PreAllocatedBuffer[1] = (byte)(0xff & (i64 >> 48));
+ PreAllocatedBuffer[2] = (byte)(0xff & (i64 >> 40));
+ PreAllocatedBuffer[3] = (byte)(0xff & (i64 >> 32));
+ PreAllocatedBuffer[4] = (byte)(0xff & (i64 >> 24));
+ PreAllocatedBuffer[5] = (byte)(0xff & (i64 >> 16));
+ PreAllocatedBuffer[6] = (byte)(0xff & (i64 >> 8));
+ PreAllocatedBuffer[7] = (byte)(0xff & i64);
+
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
}
public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
@@ -293,6 +267,7 @@
await WriteI64Async(BitConverter.DoubleToInt64Bits(d), cancellationToken);
}
+
public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
@@ -304,7 +279,7 @@
await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
}
- public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -332,7 +307,7 @@
throw new TProtocolException(TProtocolException.BAD_VERSION,
"Missing version in ReadMessageBegin, old client?");
}
- message.Name = await ReadStringBodyAsync(size, cancellationToken);
+ message.Name = (size > 0) ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
message.Type = (TMessageType) await ReadByteAsync(cancellationToken);
message.SeqID = await ReadI32Async(cancellationToken);
}
@@ -347,15 +322,14 @@
}
}
- public override async Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
await Task.FromCanceled(cancellationToken);
}
- //TODO: no read from internal transport?
- return new TStruct();
+ return AnonymousStruct;
}
public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
@@ -366,24 +340,24 @@
}
}
- public override async Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<TField>(cancellationToken);
}
- var field = new TField
- {
- Type = (TType) await ReadByteAsync(cancellationToken)
- };
- if (field.Type != TType.Stop)
+ var type = (TType)await ReadByteAsync(cancellationToken);
+ if (type == TType.Stop)
{
- field.ID = await ReadI16Async(cancellationToken);
+ return StopField;
}
- return field;
+ return new TField {
+ Type = type,
+ ID = await ReadI16Async(cancellationToken)
+ };
}
public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
@@ -394,7 +368,7 @@
}
}
- public override async Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -419,7 +393,7 @@
}
}
- public override async Task<TList> ReadListBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -443,7 +417,7 @@
}
}
- public override async Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -467,7 +441,7 @@
}
}
- public override async Task<bool> ReadBoolAsync(CancellationToken cancellationToken)
+ public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -477,82 +451,78 @@
return await ReadByteAsync(cancellationToken) == 1;
}
- public override async Task<sbyte> ReadByteAsync(CancellationToken cancellationToken)
+ public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<sbyte>(cancellationToken);
}
- var bin = new byte[1];
- await Trans.ReadAllAsync(bin, 0, 1, cancellationToken); //TODO: why readall ?
- return (sbyte) bin[0];
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
+ return (sbyte)PreAllocatedBuffer[0];
}
- public override async Task<short> ReadI16Async(CancellationToken cancellationToken)
+ public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<short>(cancellationToken);
}
- var i16In = new byte[2];
- await Trans.ReadAllAsync(i16In, 0, 2, cancellationToken);
- var result = (short) (((i16In[0] & 0xff) << 8) | i16In[1] & 0xff);
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
+ var result = (short) (((PreAllocatedBuffer[0] & 0xff) << 8) | PreAllocatedBuffer[1] & 0xff);
return result;
}
- public override async Task<int> ReadI32Async(CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<int>(cancellationToken);
}
- var i32In = new byte[4];
- await Trans.ReadAllAsync(i32In, 0, 4, cancellationToken);
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
var result =
- ((i32In[0] & 0xff) << 24) |
- ((i32In[1] & 0xff) << 16) |
- ((i32In[2] & 0xff) << 8) |
- i32In[3] & 0xff;
+ ((PreAllocatedBuffer[0] & 0xff) << 24) |
+ ((PreAllocatedBuffer[1] & 0xff) << 16) |
+ ((PreAllocatedBuffer[2] & 0xff) << 8) |
+ PreAllocatedBuffer[3] & 0xff;
return result;
}
#pragma warning disable 675
- protected internal long CreateReadI64(byte[] buf)
+ protected internal long ReadI64FromPreAllocatedBuffer()
{
var result =
- ((long) (buf[0] & 0xff) << 56) |
- ((long) (buf[1] & 0xff) << 48) |
- ((long) (buf[2] & 0xff) << 40) |
- ((long) (buf[3] & 0xff) << 32) |
- ((long) (buf[4] & 0xff) << 24) |
- ((long) (buf[5] & 0xff) << 16) |
- ((long) (buf[6] & 0xff) << 8) |
- buf[7] & 0xff;
+ ((long) (PreAllocatedBuffer[0] & 0xff) << 56) |
+ ((long) (PreAllocatedBuffer[1] & 0xff) << 48) |
+ ((long) (PreAllocatedBuffer[2] & 0xff) << 40) |
+ ((long) (PreAllocatedBuffer[3] & 0xff) << 32) |
+ ((long) (PreAllocatedBuffer[4] & 0xff) << 24) |
+ ((long) (PreAllocatedBuffer[5] & 0xff) << 16) |
+ ((long) (PreAllocatedBuffer[6] & 0xff) << 8) |
+ PreAllocatedBuffer[7] & 0xff;
return result;
}
#pragma warning restore 675
- public override async Task<long> ReadI64Async(CancellationToken cancellationToken)
+ public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<long>(cancellationToken);
}
- var i64In = new byte[8];
- await Trans.ReadAllAsync(i64In, 0, 8, cancellationToken);
- return CreateReadI64(i64In);
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
+ return ReadI64FromPreAllocatedBuffer();
}
- public override async Task<double> ReadDoubleAsync(CancellationToken cancellationToken)
+ public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -563,7 +533,7 @@
return BitConverter.Int64BitsToDouble(d);
}
- public override async Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
+ public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -576,13 +546,30 @@
return buf;
}
- private async Task<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
+ public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return await Task.FromCanceled<string>(cancellationToken);
+ }
+
+ var size = await ReadI32Async(cancellationToken);
+ return size > 0 ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
+ }
+
+ private async ValueTask<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
await Task.FromCanceled<string>(cancellationToken);
}
+ if (size <= PreAllocatedBuffer.Length)
+ {
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, size, cancellationToken);
+ return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, size);
+ }
+
var buf = new byte[size];
await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
return Encoding.UTF8.GetString(buf, 0, buf.Length);
diff --git a/lib/netstd/Thrift/Protocol/TCompactProtocol.cs b/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
index e6c5dbd..c26633a 100644
--- a/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TCompactProtocol.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -37,10 +38,13 @@
private const byte TypeBits = 0x07; // 0000 0111
private const int TypeShiftAmount = 5;
private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
- private static readonly TField Tstop = new TField(string.Empty, TType.Stop, 0);
+ private static readonly TField StopField = new TField(string.Empty, TType.Stop, 0);
+
+ private const byte NoTypeOverride = 0xFF;
// ReSharper disable once InconsistentNaming
private static readonly byte[] TTypeToCompactType = new byte[16];
+ private static readonly TType[] CompactTypeToTType = new TType[13];
/// <summary>
/// Used to keep track of the last field for the current and previous structs, so we can do the delta stuff.
@@ -59,6 +63,26 @@
private short _lastFieldId;
+ // minimize memory allocations by means of an preallocated bytes buffer
+ // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long)
+ private byte[] PreAllocatedBuffer = new byte[128];
+
+ private struct VarInt
+ {
+ public byte[] bytes;
+ public int count;
+ }
+
+ // minimize memory allocations by means of an preallocated VarInt buffer
+ private VarInt PreAllocatedVarInt = new VarInt()
+ {
+ bytes = new byte[10], // see Int64ToVarInt()
+ count = 0
+ };
+
+
+
+
public TCompactProtocol(TTransport trans)
: base(trans)
{
@@ -74,6 +98,20 @@
TTypeToCompactType[(int) TType.Set] = Types.Set;
TTypeToCompactType[(int) TType.Map] = Types.Map;
TTypeToCompactType[(int) TType.Struct] = Types.Struct;
+
+ CompactTypeToTType[Types.Stop] = TType.Stop;
+ CompactTypeToTType[Types.BooleanTrue] = TType.Bool;
+ CompactTypeToTType[Types.BooleanFalse] = TType.Bool;
+ CompactTypeToTType[Types.Byte] = TType.Byte;
+ CompactTypeToTType[Types.I16] = TType.I16;
+ CompactTypeToTType[Types.I32] = TType.I32;
+ CompactTypeToTType[Types.I64] = TType.I64;
+ CompactTypeToTType[Types.Double] = TType.Double;
+ CompactTypeToTType[Types.Binary] = TType.String;
+ CompactTypeToTType[Types.List] = TType.List;
+ CompactTypeToTType[Types.Set] = TType.Set;
+ CompactTypeToTType[Types.Map] = TType.Map;
+ CompactTypeToTType[Types.Struct] = TType.Struct;
}
public void Reset()
@@ -84,19 +122,12 @@
public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
{
- if (cancellationToken.IsCancellationRequested)
- {
- return;
- }
+ PreAllocatedBuffer[0] = ProtocolId;
+ PreAllocatedBuffer[1] = (byte)((Version & VersionMask) | (((uint)message.Type << TypeShiftAmount) & TypeMask));
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
- await Trans.WriteAsync(new[] {ProtocolId}, cancellationToken);
- await
- Trans.WriteAsync(
- new[] {(byte) ((Version & VersionMask) | (((uint) message.Type << TypeShiftAmount) & TypeMask))},
- cancellationToken);
-
- var bufferTuple = CreateWriteVarInt32((uint) message.SeqID);
- await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
+ Int32ToVarInt((uint) message.SeqID, ref PreAllocatedVarInt);
+ await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
await WriteStringAsync(message.Name, cancellationToken);
}
@@ -135,26 +166,31 @@
_lastFieldId = _lastField.Pop();
}
- private async Task WriteFieldBeginInternalAsync(TField field, byte typeOverride,
- CancellationToken cancellationToken)
+ private async Task WriteFieldBeginInternalAsync(TField field, byte fieldType, CancellationToken cancellationToken)
{
- // if there's a exType override, use that.
- var typeToWrite = typeOverride == 0xFF ? GetCompactType(field.Type) : typeOverride;
+ // if there's a exType override passed in, use that. Otherwise ask GetCompactType().
+ if (fieldType == NoTypeOverride)
+ fieldType = GetCompactType(field.Type);
+
// check if we can use delta encoding for the field id
- if ((field.ID > _lastFieldId) && (field.ID - _lastFieldId <= 15))
+ if (field.ID > _lastFieldId)
{
- var b = (byte) (((field.ID - _lastFieldId) << 4) | typeToWrite);
- // Write them together
- await Trans.WriteAsync(new[] {b}, cancellationToken);
- }
- else
- {
- // Write them separate
- await Trans.WriteAsync(new[] {typeToWrite}, cancellationToken);
- await WriteI16Async(field.ID, cancellationToken);
+ var delta = field.ID - _lastFieldId;
+ if (delta <= 15)
+ {
+ // Write them together
+ PreAllocatedBuffer[0] = (byte)((delta << 4) | fieldType);
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
+ _lastFieldId = field.ID;
+ return;
+ }
}
+ // Write them separate
+ PreAllocatedBuffer[0] = fieldType;
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
+ await WriteI16Async(field.ID, cancellationToken);
_lastFieldId = field.ID;
}
@@ -166,7 +202,7 @@
}
else
{
- await WriteFieldBeginInternalAsync(field, 0xFF, cancellationToken);
+ await WriteFieldBeginInternalAsync(field, NoTypeOverride, cancellationToken);
}
}
@@ -185,7 +221,8 @@
return;
}
- await Trans.WriteAsync(new[] {Types.Stop}, cancellationToken);
+ PreAllocatedBuffer[0] = Types.Stop;
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken)
@@ -202,14 +239,16 @@
if (size <= 14)
{
- await Trans.WriteAsync(new[] {(byte) ((size << 4) | GetCompactType(elemType))}, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)((size << 4) | GetCompactType(elemType));
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
else
{
- await Trans.WriteAsync(new[] {(byte) (0xf0 | GetCompactType(elemType))}, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)(0xf0 | GetCompactType(elemType));
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
- var bufferTuple = CreateWriteVarInt32((uint) size);
- await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
+ Int32ToVarInt((uint) size, ref PreAllocatedVarInt);
+ await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
}
}
@@ -261,15 +300,15 @@
if (_booleanField != null)
{
// we haven't written the field header yet
- await
- WriteFieldBeginInternalAsync(_booleanField.Value, b ? Types.BooleanTrue : Types.BooleanFalse,
- cancellationToken);
+ var type = b ? Types.BooleanTrue : Types.BooleanFalse;
+ await WriteFieldBeginInternalAsync(_booleanField.Value, type, cancellationToken);
_booleanField = null;
}
else
{
- // we're not part of a field, so just Write the value.
- await Trans.WriteAsync(new[] {b ? Types.BooleanTrue : Types.BooleanFalse}, cancellationToken);
+ // we're not part of a field, so just write the value.
+ PreAllocatedBuffer[0] = b ? Types.BooleanTrue : Types.BooleanFalse;
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
}
@@ -280,7 +319,8 @@
return;
}
- await Trans.WriteAsync(new[] {(byte) b}, cancellationToken);
+ PreAllocatedBuffer[0] = (byte)b;
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
@@ -290,29 +330,27 @@
return;
}
- var bufferTuple = CreateWriteVarInt32(IntToZigzag(i16));
- await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
+ Int32ToVarInt(IntToZigzag(i16), ref PreAllocatedVarInt);
+ await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
}
- protected internal Tuple<byte[], int> CreateWriteVarInt32(uint n)
+ private static void Int32ToVarInt(uint n, ref VarInt varint)
{
- // Write an i32 as a varint.Results in 1 - 5 bytes on the wire.
- var i32Buf = new byte[5];
- var idx = 0;
+ // Write an i32 as a varint. Results in 1 - 5 bytes on the wire.
+ varint.count = 0;
+ Debug.Assert(varint.bytes.Length >= 5);
while (true)
{
if ((n & ~0x7F) == 0)
{
- i32Buf[idx++] = (byte) n;
+ varint.bytes[varint.count++] = (byte)n;
break;
}
- i32Buf[idx++] = (byte) ((n & 0x7F) | 0x80);
+ varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
n >>= 7;
}
-
- return new Tuple<byte[], int>(i32Buf, idx);
}
public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
@@ -322,28 +360,26 @@
return;
}
- var bufferTuple = CreateWriteVarInt32(IntToZigzag(i32));
- await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
+ Int32ToVarInt(IntToZigzag(i32), ref PreAllocatedVarInt);
+ await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
}
- protected internal Tuple<byte[], int> CreateWriteVarInt64(ulong n)
+ static private void Int64ToVarInt(ulong n, ref VarInt varint)
{
// Write an i64 as a varint. Results in 1-10 bytes on the wire.
- var buf = new byte[10];
- var idx = 0;
+ varint.count = 0;
+ Debug.Assert(varint.bytes.Length >= 10);
while (true)
{
- if ((n & ~(ulong) 0x7FL) == 0)
+ if ((n & ~(ulong)0x7FL) == 0)
{
- buf[idx++] = (byte) n;
+ varint.bytes[varint.count++] = (byte)n;
break;
}
- buf[idx++] = (byte) ((n & 0x7F) | 0x80);
+ varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
n >>= 7;
}
-
- return new Tuple<byte[], int>(buf, idx);
}
public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
@@ -353,8 +389,8 @@
return;
}
- var bufferTuple = CreateWriteVarInt64(LongToZigzag(i64));
- await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
+ Int64ToVarInt(LongToZigzag(i64), ref PreAllocatedVarInt);
+ await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
}
public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
@@ -364,9 +400,8 @@
return;
}
- var data = new byte[8];
- FixedLongToBytes(BitConverter.DoubleToInt64Bits(d), data, 0);
- await Trans.WriteAsync(data, cancellationToken);
+ FixedLongToBytes(BitConverter.DoubleToInt64Bits(d), PreAllocatedBuffer, 0);
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
}
public override async Task WriteStringAsync(string str, CancellationToken cancellationToken)
@@ -378,8 +413,8 @@
var bytes = Encoding.UTF8.GetBytes(str);
- var bufferTuple = CreateWriteVarInt32((uint) bytes.Length);
- await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
+ Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt);
+ await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
}
@@ -390,8 +425,8 @@
return;
}
- var bufferTuple = CreateWriteVarInt32((uint) bytes.Length);
- await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
+ Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt);
+ await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
}
@@ -401,19 +436,19 @@
{
return;
}
-
+
if (map.Count == 0)
{
- await Trans.WriteAsync(new[] {(byte) 0}, cancellationToken);
+ PreAllocatedBuffer[0] = 0;
+ await Trans.WriteAsync( PreAllocatedBuffer, 0, 1, cancellationToken);
}
else
{
- var bufferTuple = CreateWriteVarInt32((uint) map.Count);
- await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
- await
- Trans.WriteAsync(
- new[] {(byte) ((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType))},
- cancellationToken);
+ Int32ToVarInt((uint) map.Count, ref PreAllocatedVarInt);
+ await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
+
+ PreAllocatedBuffer[0] = (byte)((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType));
+ await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
}
@@ -425,7 +460,7 @@
}
}
- public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -461,7 +496,7 @@
}
}
- public override async Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -492,19 +527,23 @@
_lastFieldId = _lastField.Pop();
}
- public override async Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
{
// Read a field header off the wire.
var type = (byte) await ReadByteAsync(cancellationToken);
+
// if it's a stop, then we can return immediately, as the struct is over.
if (type == Types.Stop)
{
- return Tstop;
+ return StopField;
}
- short fieldId;
+
// mask off the 4 MSB of the exType header. it could contain a field id delta.
var modifier = (short) ((type & 0xf0) >> 4);
+ var compactType = (byte)(type & 0x0f);
+
+ short fieldId;
if (modifier == 0)
{
fieldId = await ReadI16Async(cancellationToken);
@@ -514,11 +553,13 @@
fieldId = (short) (_lastFieldId + modifier);
}
- var field = new TField(string.Empty, GetTType((byte) (type & 0x0f)), fieldId);
+ var ttype = GetTType(compactType);
+ var field = new TField(string.Empty, ttype, fieldId);
+
// if this happens to be a boolean field, the value is encoded in the exType
- if (IsBoolType(type))
+ if( ttype == TType.Bool)
{
- _boolValue = (byte) (type & 0x0f) == Types.BooleanTrue;
+ _boolValue = (compactType == Types.BooleanTrue);
}
// push the new field onto the field stack so we can keep the deltas going.
@@ -534,7 +575,7 @@
}
}
- public override async Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -560,7 +601,7 @@
}
}
- public override async Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
{
/*
Read a set header off the wire. If the set size is 0-14, the size will
@@ -572,13 +613,8 @@
return new TSet(await ReadListBeginAsync(cancellationToken));
}
- public override async Task<bool> ReadBoolAsync(CancellationToken cancellationToken)
+ public override ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
{
- if (cancellationToken.IsCancellationRequested)
- {
- return await Task.FromCanceled<bool>(cancellationToken);
- }
-
/*
Read a boolean off the wire. If this is a boolean field, the value should
already have been Read during ReadFieldBegin, so we'll just consume the
@@ -589,26 +625,27 @@
{
var result = _boolValue.Value;
_boolValue = null;
- return result;
+ return new ValueTask<bool>(result);
}
- return await ReadByteAsync(cancellationToken) == Types.BooleanTrue;
- }
+ return InternalCall();
- public override async Task<sbyte> ReadByteAsync(CancellationToken cancellationToken)
- {
- if (cancellationToken.IsCancellationRequested)
+ async ValueTask<bool> InternalCall()
{
- return await Task.FromCanceled<sbyte>(cancellationToken);
+ var data = await ReadByteAsync(cancellationToken);
+ return (data == Types.BooleanTrue);
}
-
- // Read a single byte off the wire. Nothing interesting here.
- var buf = new byte[1];
- await Trans.ReadAllAsync(buf, 0, 1, cancellationToken);
- return (sbyte) buf[0];
}
- public override async Task<short> ReadI16Async(CancellationToken cancellationToken)
+
+ public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
+ {
+ // Read a single byte off the wire. Nothing interesting here.
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
+ return (sbyte)PreAllocatedBuffer[0];
+ }
+
+ public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -618,7 +655,7 @@
return (short) ZigzagToInt(await ReadVarInt32Async(cancellationToken));
}
- public override async Task<int> ReadI32Async(CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -628,7 +665,7 @@
return ZigzagToInt(await ReadVarInt32Async(cancellationToken));
}
- public override async Task<long> ReadI64Async(CancellationToken cancellationToken)
+ public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -638,60 +675,55 @@
return ZigzagToLong(await ReadVarInt64Async(cancellationToken));
}
- public override async Task<double> ReadDoubleAsync(CancellationToken cancellationToken)
+ public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return await Task.FromCanceled<double>(cancellationToken);
}
- var longBits = new byte[8];
- await Trans.ReadAllAsync(longBits, 0, 8, cancellationToken);
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
- return BitConverter.Int64BitsToDouble(BytesToLong(longBits));
+ return BitConverter.Int64BitsToDouble(BytesToLong(PreAllocatedBuffer));
}
- public override async Task<string> ReadStringAsync(CancellationToken cancellationToken)
+ public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
{
- if (cancellationToken.IsCancellationRequested)
- {
- await Task.FromCanceled<string>(cancellationToken);
- }
-
- // Reads a byte[] (via ReadBinary), and then UTF-8 decodes it.
+ // read length
var length = (int) await ReadVarInt32Async(cancellationToken);
-
if (length == 0)
{
return string.Empty;
}
- var buf = new byte[length];
- await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
-
- return Encoding.UTF8.GetString(buf);
- }
-
- public override async Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
- {
- if (cancellationToken.IsCancellationRequested)
+ // read and decode data
+ if (length < PreAllocatedBuffer.Length)
{
- return await Task.FromCanceled<byte[]>(cancellationToken);
+ await Trans.ReadAllAsync(PreAllocatedBuffer, 0, length, cancellationToken);
+ return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, length);
}
- // Read a byte[] from the wire.
+ var buf = new byte[length];
+ await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
+ return Encoding.UTF8.GetString(buf, 0, length);
+ }
+
+ public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
+ {
+ // read length
var length = (int) await ReadVarInt32Async(cancellationToken);
if (length == 0)
{
return new byte[0];
}
+ // read data
var buf = new byte[length];
await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
return buf;
}
- public override async Task<TList> ReadListBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -739,7 +771,7 @@
}
- private async Task<uint> ReadVarInt32Async(CancellationToken cancellationToken)
+ private async ValueTask<uint> ReadVarInt32Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -768,7 +800,7 @@
return result;
}
- private async Task<ulong> ReadVarInt64Async(CancellationToken cancellationToken)
+ private async ValueTask<ulong> ReadVarInt64Async(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -825,45 +857,10 @@
(bytes[0] & 0xffL);
}
- private static bool IsBoolType(byte b)
- {
- var lowerNibble = b & 0x0f;
- return (lowerNibble == Types.BooleanTrue) || (lowerNibble == Types.BooleanFalse);
- }
-
private static TType GetTType(byte type)
{
// Given a TCompactProtocol.Types constant, convert it to its corresponding TType value.
- switch ((byte) (type & 0x0f))
- {
- case Types.Stop:
- return TType.Stop;
- case Types.BooleanFalse:
- case Types.BooleanTrue:
- return TType.Bool;
- case Types.Byte:
- return TType.Byte;
- case Types.I16:
- return TType.I16;
- case Types.I32:
- return TType.I32;
- case Types.I64:
- return TType.I64;
- case Types.Double:
- return TType.Double;
- case Types.Binary:
- return TType.String;
- case Types.List:
- return TType.List;
- case Types.Set:
- return TType.Set;
- case Types.Map:
- return TType.Map;
- case Types.Struct:
- return TType.Struct;
- default:
- throw new TProtocolException($"Don't know what exType: {(byte) (type & 0x0f)}");
- }
+ return CompactTypeToTType[type & 0x0f];
}
private static ulong LongToZigzag(long n)
diff --git a/lib/netstd/Thrift/Protocol/TJSONProtocol.cs b/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
index 1298052..464bd62 100644
--- a/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TJSONProtocol.cs
@@ -396,7 +396,7 @@
/// Read in a JSON string, unescaping as appropriate.. Skip Reading from the
/// context if skipContext is true.
/// </summary>
- private async Task<byte[]> ReadJsonStringAsync(bool skipContext, CancellationToken cancellationToken)
+ private async ValueTask<byte[]> ReadJsonStringAsync(bool skipContext, CancellationToken cancellationToken)
{
using (var buffer = new MemoryStream())
{
@@ -487,7 +487,7 @@
/// Read in a sequence of characters that are all valid in JSON numbers. Does
/// not do a complete regex check to validate that this is actually a number.
/// </summary>
- private async Task<string> ReadJsonNumericCharsAsync(CancellationToken cancellationToken)
+ private async ValueTask<string> ReadJsonNumericCharsAsync(CancellationToken cancellationToken)
{
var strbld = new StringBuilder();
while (true)
@@ -514,7 +514,7 @@
/// <summary>
/// Read in a JSON number. If the context dictates, Read in enclosing quotes.
/// </summary>
- private async Task<long> ReadJsonIntegerAsync(CancellationToken cancellationToken)
+ private async ValueTask<long> ReadJsonIntegerAsync(CancellationToken cancellationToken)
{
await Context.ReadConditionalDelimiterAsync(cancellationToken);
if (Context.EscapeNumbers())
@@ -542,7 +542,7 @@
/// Read in a JSON double value. Throw if the value is not wrapped in quotes
/// when expected or if wrapped in quotes when not expected.
/// </summary>
- private async Task<double> ReadJsonDoubleAsync(CancellationToken cancellationToken)
+ private async ValueTask<double> ReadJsonDoubleAsync(CancellationToken cancellationToken)
{
await Context.ReadConditionalDelimiterAsync(cancellationToken);
if (await Reader.PeekAsync(cancellationToken) == TJSONProtocolConstants.Quote[0])
@@ -578,7 +578,7 @@
/// <summary>
/// Read in a JSON string containing base-64 encoded data and decode it.
/// </summary>
- private async Task<byte[]> ReadJsonBase64Async(CancellationToken cancellationToken)
+ private async ValueTask<byte[]> ReadJsonBase64Async(CancellationToken cancellationToken)
{
var b = await ReadJsonStringAsync(false, cancellationToken);
var len = b.Length;
@@ -642,7 +642,7 @@
PopContext();
}
- public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
var message = new TMessage();
await ReadJsonArrayStartAsync(cancellationToken);
@@ -663,7 +663,7 @@
await ReadJsonArrayEndAsync(cancellationToken);
}
- public override async Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
{
await ReadJsonObjectStartAsync(cancellationToken);
return new TStruct();
@@ -674,7 +674,7 @@
await ReadJsonObjectEndAsync(cancellationToken);
}
- public override async Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
{
var field = new TField();
var ch = await Reader.PeekAsync(cancellationToken);
@@ -696,7 +696,7 @@
await ReadJsonObjectEndAsync(cancellationToken);
}
- public override async Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
{
var map = new TMap();
await ReadJsonArrayStartAsync(cancellationToken);
@@ -713,7 +713,7 @@
await ReadJsonArrayEndAsync(cancellationToken);
}
- public override async Task<TList> ReadListBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
{
var list = new TList();
await ReadJsonArrayStartAsync(cancellationToken);
@@ -727,7 +727,7 @@
await ReadJsonArrayEndAsync(cancellationToken);
}
- public override async Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
{
var set = new TSet();
await ReadJsonArrayStartAsync(cancellationToken);
@@ -741,43 +741,43 @@
await ReadJsonArrayEndAsync(cancellationToken);
}
- public override async Task<bool> ReadBoolAsync(CancellationToken cancellationToken)
+ public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
{
return await ReadJsonIntegerAsync(cancellationToken) != 0;
}
- public override async Task<sbyte> ReadByteAsync(CancellationToken cancellationToken)
+ public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
{
return (sbyte) await ReadJsonIntegerAsync(cancellationToken);
}
- public override async Task<short> ReadI16Async(CancellationToken cancellationToken)
+ public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
{
return (short) await ReadJsonIntegerAsync(cancellationToken);
}
- public override async Task<int> ReadI32Async(CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
{
return (int) await ReadJsonIntegerAsync(cancellationToken);
}
- public override async Task<long> ReadI64Async(CancellationToken cancellationToken)
+ public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
{
return await ReadJsonIntegerAsync(cancellationToken);
}
- public override async Task<double> ReadDoubleAsync(CancellationToken cancellationToken)
+ public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
{
return await ReadJsonDoubleAsync(cancellationToken);
}
- public override async Task<string> ReadStringAsync(CancellationToken cancellationToken)
+ public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
{
var buf = await ReadJsonStringAsync(false, cancellationToken);
return Utf8Encoding.GetString(buf, 0, buf.Length);
}
- public override async Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
+ public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
{
return await ReadJsonBase64Async(cancellationToken);
}
@@ -938,7 +938,7 @@
/// Return and consume the next byte to be Read, either taking it from the
/// data buffer if present or getting it from the transport otherwise.
/// </summary>
- public async Task<byte> ReadAsync(CancellationToken cancellationToken)
+ public async ValueTask<byte> ReadAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -961,7 +961,7 @@
/// Return the next byte to be Read without consuming, filling the data
/// buffer if it has not been filled alReady.
/// </summary>
- public async Task<byte> PeekAsync(CancellationToken cancellationToken)
+ public async ValueTask<byte> PeekAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
@@ -972,8 +972,8 @@
{
// find more easy way to avoid exception on reading primitive types
await Proto.Trans.ReadAllAsync(_data, 0, 1, cancellationToken);
+ _hasData = true;
}
- _hasData = true;
return _data[0];
}
}
diff --git a/lib/netstd/Thrift/Protocol/TProtocol.cs b/lib/netstd/Thrift/Protocol/TProtocol.cs
index 1bc91eb..75edb11 100644
--- a/lib/netstd/Thrift/Protocol/TProtocol.cs
+++ b/lib/netstd/Thrift/Protocol/TProtocol.cs
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation(ASF) under one
+// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
@@ -229,12 +229,12 @@
public abstract Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken);
- public virtual async Task<TMessage> ReadMessageBeginAsync()
+ public virtual async ValueTask<TMessage> ReadMessageBeginAsync()
{
return await ReadMessageBeginAsync(CancellationToken.None);
}
- public abstract Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken);
public virtual async Task ReadMessageEndAsync()
{
@@ -243,12 +243,12 @@
public abstract Task ReadMessageEndAsync(CancellationToken cancellationToken);
- public virtual async Task<TStruct> ReadStructBeginAsync()
+ public virtual async ValueTask<TStruct> ReadStructBeginAsync()
{
return await ReadStructBeginAsync(CancellationToken.None);
}
- public abstract Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken);
public virtual async Task ReadStructEndAsync()
{
@@ -257,12 +257,12 @@
public abstract Task ReadStructEndAsync(CancellationToken cancellationToken);
- public virtual async Task<TField> ReadFieldBeginAsync()
+ public virtual async ValueTask<TField> ReadFieldBeginAsync()
{
return await ReadFieldBeginAsync(CancellationToken.None);
}
- public abstract Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken);
public virtual async Task ReadFieldEndAsync()
{
@@ -271,12 +271,12 @@
public abstract Task ReadFieldEndAsync(CancellationToken cancellationToken);
- public virtual async Task<TMap> ReadMapBeginAsync()
+ public virtual async ValueTask<TMap> ReadMapBeginAsync()
{
return await ReadMapBeginAsync(CancellationToken.None);
}
- public abstract Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken);
public virtual async Task ReadMapEndAsync()
{
@@ -285,12 +285,12 @@
public abstract Task ReadMapEndAsync(CancellationToken cancellationToken);
- public virtual async Task<TList> ReadListBeginAsync()
+ public virtual async ValueTask<TList> ReadListBeginAsync()
{
return await ReadListBeginAsync(CancellationToken.None);
}
- public abstract Task<TList> ReadListBeginAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken);
public virtual async Task ReadListEndAsync()
{
@@ -299,12 +299,12 @@
public abstract Task ReadListEndAsync(CancellationToken cancellationToken);
- public virtual async Task<TSet> ReadSetBeginAsync()
+ public virtual async ValueTask<TSet> ReadSetBeginAsync()
{
return await ReadSetBeginAsync(CancellationToken.None);
}
- public abstract Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken);
public virtual async Task ReadSetEndAsync()
{
@@ -313,64 +313,64 @@
public abstract Task ReadSetEndAsync(CancellationToken cancellationToken);
- public virtual async Task<bool> ReadBoolAsync()
+ public virtual async ValueTask<bool> ReadBoolAsync()
{
return await ReadBoolAsync(CancellationToken.None);
}
- public abstract Task<bool> ReadBoolAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken);
- public virtual async Task<sbyte> ReadByteAsync()
+ public virtual async ValueTask<sbyte> ReadByteAsync()
{
return await ReadByteAsync(CancellationToken.None);
}
- public abstract Task<sbyte> ReadByteAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken);
- public virtual async Task<short> ReadI16Async()
+ public virtual async ValueTask<short> ReadI16Async()
{
return await ReadI16Async(CancellationToken.None);
}
- public abstract Task<short> ReadI16Async(CancellationToken cancellationToken);
+ public abstract ValueTask<short> ReadI16Async(CancellationToken cancellationToken);
- public virtual async Task<int> ReadI32Async()
+ public virtual async ValueTask<int> ReadI32Async()
{
return await ReadI32Async(CancellationToken.None);
}
- public abstract Task<int> ReadI32Async(CancellationToken cancellationToken);
+ public abstract ValueTask<int> ReadI32Async(CancellationToken cancellationToken);
- public virtual async Task<long> ReadI64Async()
+ public virtual async ValueTask<long> ReadI64Async()
{
return await ReadI64Async(CancellationToken.None);
}
- public abstract Task<long> ReadI64Async(CancellationToken cancellationToken);
+ public abstract ValueTask<long> ReadI64Async(CancellationToken cancellationToken);
- public virtual async Task<double> ReadDoubleAsync()
+ public virtual async ValueTask<double> ReadDoubleAsync()
{
return await ReadDoubleAsync(CancellationToken.None);
}
- public abstract Task<double> ReadDoubleAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken);
- public virtual async Task<string> ReadStringAsync()
+ public virtual async ValueTask<string> ReadStringAsync()
{
return await ReadStringAsync(CancellationToken.None);
}
- public virtual async Task<string> ReadStringAsync(CancellationToken cancellationToken)
+ public virtual async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
{
var buf = await ReadBinaryAsync(cancellationToken);
return Encoding.UTF8.GetString(buf, 0, buf.Length);
}
- public virtual async Task<byte[]> ReadBinaryAsync()
+ public virtual async ValueTask<byte[]> ReadBinaryAsync()
{
return await ReadBinaryAsync(CancellationToken.None);
}
- public abstract Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken);
+ public abstract ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken);
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs b/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
index c8a433d..845c827 100644
--- a/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
+++ b/lib/netstd/Thrift/Protocol/TProtocolDecorator.cs
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation(ASF) under one
+// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
@@ -144,7 +144,7 @@
await _wrappedProtocol.WriteBinaryAsync(bytes, cancellationToken);
}
- public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadMessageBeginAsync(cancellationToken);
}
@@ -154,7 +154,7 @@
await _wrappedProtocol.ReadMessageEndAsync(cancellationToken);
}
- public override async Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadStructBeginAsync(cancellationToken);
}
@@ -164,7 +164,7 @@
await _wrappedProtocol.ReadStructEndAsync(cancellationToken);
}
- public override async Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadFieldBeginAsync(cancellationToken);
}
@@ -174,7 +174,7 @@
await _wrappedProtocol.ReadFieldEndAsync(cancellationToken);
}
- public override async Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadMapBeginAsync(cancellationToken);
}
@@ -184,7 +184,7 @@
await _wrappedProtocol.ReadMapEndAsync(cancellationToken);
}
- public override async Task<TList> ReadListBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadListBeginAsync(cancellationToken);
}
@@ -194,7 +194,7 @@
await _wrappedProtocol.ReadListEndAsync(cancellationToken);
}
- public override async Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
+ public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadSetBeginAsync(cancellationToken);
}
@@ -204,44 +204,44 @@
await _wrappedProtocol.ReadSetEndAsync(cancellationToken);
}
- public override async Task<bool> ReadBoolAsync(CancellationToken cancellationToken)
+ public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadBoolAsync(cancellationToken);
}
- public override async Task<sbyte> ReadByteAsync(CancellationToken cancellationToken)
+ public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadByteAsync(cancellationToken);
}
- public override async Task<short> ReadI16Async(CancellationToken cancellationToken)
+ public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadI16Async(cancellationToken);
}
- public override async Task<int> ReadI32Async(CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadI32Async(cancellationToken);
}
- public override async Task<long> ReadI64Async(CancellationToken cancellationToken)
+ public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadI64Async(cancellationToken);
}
- public override async Task<double> ReadDoubleAsync(CancellationToken cancellationToken)
+ public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadDoubleAsync(cancellationToken);
}
- public override async Task<string> ReadStringAsync(CancellationToken cancellationToken)
+ public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadStringAsync(cancellationToken);
}
- public override async Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
+ public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
{
return await _wrappedProtocol.ReadBinaryAsync(cancellationToken);
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Server/TServerEventHandler.cs b/lib/netstd/Thrift/Server/TServerEventHandler.cs
index 0c31bf6..69314ef 100644
--- a/lib/netstd/Thrift/Server/TServerEventHandler.cs
+++ b/lib/netstd/Thrift/Server/TServerEventHandler.cs
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation(ASF) under one
+// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
@@ -51,4 +51,4 @@
/// </summary>
Task ProcessContextAsync(object serverContext, TTransport transport, CancellationToken cancellationToken);
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/TApplicationException.cs b/lib/netstd/Thrift/TApplicationException.cs
index 9c86898..67ac2f8 100644
--- a/lib/netstd/Thrift/TApplicationException.cs
+++ b/lib/netstd/Thrift/TApplicationException.cs
@@ -61,7 +61,7 @@
Type = type;
}
- public static async Task<TApplicationException> ReadAsync(TProtocol inputProtocol, CancellationToken cancellationToken)
+ public static async ValueTask<TApplicationException> ReadAsync(TProtocol inputProtocol, CancellationToken cancellationToken)
{
string message = null;
var type = ExceptionType.Unknown;
diff --git a/lib/netstd/Thrift/Thrift.csproj b/lib/netstd/Thrift/Thrift.csproj
index d093803..70d9df3 100644
--- a/lib/netstd/Thrift/Thrift.csproj
+++ b/lib/netstd/Thrift/Thrift.csproj
@@ -49,6 +49,7 @@
<PackageReference Include="System.Net.NameResolution" Version="[4.3,)" />
<PackageReference Include="System.Net.Requests" Version="[4.3,)" />
<PackageReference Include="System.Net.Security" Version="4.3.2" />
+ <PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.2" />
</ItemGroup>
</Project>
diff --git a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
index 627c93d..5d7f1de 100644
--- a/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/THttpTransport.cs
@@ -102,8 +102,7 @@
}
}
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
diff --git a/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs b/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
index 75529d1..25895c2 100644
--- a/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TMemoryBufferTransport.cs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+using System;
+using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -24,17 +26,59 @@
// ReSharper disable once InconsistentNaming
public class TMemoryBufferTransport : TTransport
{
- private readonly MemoryStream _byteStream;
- private bool _isDisposed;
+ private bool IsDisposed;
+ private byte[] Bytes;
+ private int _bytesUsed;
public TMemoryBufferTransport()
{
- _byteStream = new MemoryStream();
+ Bytes = new byte[2048]; // default size
+ }
+
+ public TMemoryBufferTransport(int initialCapacity)
+ {
+ Bytes = new byte[initialCapacity]; // default size
}
public TMemoryBufferTransport(byte[] buf)
{
- _byteStream = new MemoryStream(buf);
+ Bytes = (byte[])buf.Clone();
+ _bytesUsed = Bytes.Length;
+ }
+
+ public int Position { get; set; }
+
+ public int Capacity
+ {
+ get
+ {
+ Debug.Assert(_bytesUsed <= Bytes.Length);
+ return Bytes.Length;
+ }
+ set
+ {
+ Array.Resize(ref Bytes, value);
+ _bytesUsed = value;
+ }
+ }
+
+ public int Length
+ {
+ get {
+ Debug.Assert(_bytesUsed <= Bytes.Length);
+ return _bytesUsed;
+ }
+ set {
+ if ((Bytes.Length < value) || (Bytes.Length > (10 * value)))
+ Array.Resize(ref Bytes, Math.Max(2048, (int)(value * 1.25)));
+ _bytesUsed = value;
+ }
+ }
+
+ public void SetLength(int value)
+ {
+ Length = value;
+ Position = Math.Min(Position, value);
}
public override bool IsOpen => true;
@@ -52,20 +96,49 @@
/** do nothing **/
}
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public void Seek(int delta, SeekOrigin origin)
{
- return await _byteStream.ReadAsync(buffer, offset, length, cancellationToken);
+ int newPos;
+ switch (origin)
+ {
+ case SeekOrigin.Begin:
+ newPos = delta;
+ break;
+ case SeekOrigin.Current:
+ newPos = Position + delta;
+ break;
+ case SeekOrigin.End:
+ newPos = _bytesUsed + delta;
+ break;
+ default:
+ throw new ArgumentException(nameof(origin));
+ }
+
+ if ((0 > newPos) || (newPos > _bytesUsed))
+ throw new ArgumentException(nameof(origin));
+ Position = newPos;
}
- public override async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
+ public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- await _byteStream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);
+ var count = Math.Min(Length - Position, length);
+ Buffer.BlockCopy(Bytes, Position, buffer, offset, count);
+ Position += count;
+ return new ValueTask<int>(count);
}
- public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
+ public override Task WriteAsync(byte[] buffer, CancellationToken cancellationToken)
{
- await _byteStream.WriteAsync(buffer, offset, length, cancellationToken);
+ return WriteAsync(buffer, 0, buffer.Length, cancellationToken);
+ }
+
+ public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ {
+ var free = Length - Position;
+ Length = Length + count - free;
+ Buffer.BlockCopy(buffer, offset, Bytes, Position, count);
+ Position += count;
+ return Task.CompletedTask;
}
public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -78,20 +151,29 @@
public byte[] GetBuffer()
{
- return _byteStream.ToArray();
+ var retval = new byte[Length];
+ Buffer.BlockCopy(Bytes, 0, retval, 0, Length);
+ return retval;
}
+ internal bool TryGetBuffer(out ArraySegment<byte> bufSegment)
+ {
+ bufSegment = new ArraySegment<byte>(Bytes, 0, _bytesUsed);
+ return true;
+ }
+
+
// IDisposable
protected override void Dispose(bool disposing)
{
- if (!_isDisposed)
+ if (!IsDisposed)
{
if (disposing)
{
- _byteStream?.Dispose();
+ // nothing to do
}
}
- _isDisposed = true;
+ IsDisposed = true;
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
index b78c791..2f96a6a 100644
--- a/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TNamedPipeTransport.cs
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation(ASF) under one
+// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
@@ -61,8 +61,7 @@
}
}
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
if (_client == null)
{
@@ -95,4 +94,4 @@
_client.Dispose();
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs b/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
index 9b03533..d8574d6 100644
--- a/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
+++ b/lib/netstd/Thrift/Transport/Client/TStreamTransport.cs
@@ -65,8 +65,7 @@
}
}
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
if (InputStream == null)
{
@@ -107,4 +106,4 @@
_isDisposed = true;
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
index 8147d67..31a052a 100644
--- a/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TNamedPipeServerTransport.cs
@@ -210,7 +210,7 @@
#endregion
- protected override async Task<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
+ protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
{
try
{
@@ -261,8 +261,7 @@
_stream?.Dispose();
}
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
if (_stream == null)
{
@@ -272,8 +271,7 @@
return await _stream.ReadAsync(buffer, offset, length, cancellationToken);
}
- public override async Task WriteAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
if (_stream == null)
{
diff --git a/lib/netstd/Thrift/Transport/Server/TServerSocketTransport.cs b/lib/netstd/Thrift/Transport/Server/TServerSocketTransport.cs
index 0f90841..86d82e3 100644
--- a/lib/netstd/Thrift/Transport/Server/TServerSocketTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TServerSocketTransport.cs
@@ -74,7 +74,7 @@
return _server.Pending();
}
- protected override async Task<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
+ protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
diff --git a/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs b/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs
index 5070919..1286805 100644
--- a/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs
+++ b/lib/netstd/Thrift/Transport/Server/TTlsServerSocketTransport.cs
@@ -99,7 +99,7 @@
return _server.Pending();
}
- protected override async Task<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
+ protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
diff --git a/lib/netstd/Thrift/Transport/TBufferedTransport.cs b/lib/netstd/Thrift/Transport/TBufferedTransport.cs
index c648f5c..e4fdd3a 100644
--- a/lib/netstd/Thrift/Transport/TBufferedTransport.cs
+++ b/lib/netstd/Thrift/Transport/TBufferedTransport.cs
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation(ASF) under one
+// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
@@ -16,6 +16,7 @@
// under the License.
using System;
+using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -25,11 +26,11 @@
// ReSharper disable once InconsistentNaming
public class TBufferedTransport : TTransport
{
- private readonly int _bufSize;
- private readonly MemoryStream _inputBuffer = new MemoryStream(0);
- private readonly MemoryStream _outputBuffer = new MemoryStream(0);
- private readonly TTransport _transport;
- private bool _isDisposed;
+ private readonly int DesiredBufferSize;
+ private readonly Client.TMemoryBufferTransport ReadBuffer = new Client.TMemoryBufferTransport(1024);
+ private readonly Client.TMemoryBufferTransport WriteBuffer = new Client.TMemoryBufferTransport(1024);
+ private readonly TTransport InnerTransport;
+ private bool IsDisposed;
public class Factory : TTransportFactory
{
@@ -47,8 +48,13 @@
throw new ArgumentOutOfRangeException(nameof(bufSize), "Buffer size must be a positive number.");
}
- _transport = transport ?? throw new ArgumentNullException(nameof(transport));
- _bufSize = bufSize;
+ InnerTransport = transport ?? throw new ArgumentNullException(nameof(transport));
+ DesiredBufferSize = bufSize;
+
+ if (DesiredBufferSize != ReadBuffer.Capacity)
+ ReadBuffer.Capacity = DesiredBufferSize;
+ if (DesiredBufferSize != WriteBuffer.Capacity)
+ WriteBuffer.Capacity = DesiredBufferSize;
}
public TTransport UnderlyingTransport
@@ -57,32 +63,29 @@
{
CheckNotDisposed();
- return _transport;
+ return InnerTransport;
}
}
- public override bool IsOpen => !_isDisposed && _transport.IsOpen;
+ public override bool IsOpen => !IsDisposed && InnerTransport.IsOpen;
public override async Task OpenAsync(CancellationToken cancellationToken)
{
CheckNotDisposed();
- await _transport.OpenAsync(cancellationToken);
+ await InnerTransport.OpenAsync(cancellationToken);
}
public override void Close()
{
CheckNotDisposed();
- _transport.Close();
+ InnerTransport.Close();
}
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
- //TODO: investigate how it should work correctly
CheckNotDisposed();
-
ValidateBufferArgs(buffer, offset, length);
if (!IsOpen)
@@ -90,39 +93,36 @@
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- if (_inputBuffer.Capacity < _bufSize)
+
+ // do we have something buffered?
+ var count = ReadBuffer.Length - ReadBuffer.Position;
+ if (count > 0)
{
- _inputBuffer.Capacity = _bufSize;
+ return await ReadBuffer.ReadAsync(buffer, offset, length, cancellationToken);
}
- var got = await _inputBuffer.ReadAsync(buffer, offset, length, cancellationToken);
- if (got > 0)
+ // does the request even fit into the buffer?
+ // Note we test for >= instead of > to avoid nonsense buffering
+ if (length >= ReadBuffer.Capacity)
{
- return got;
+ return await InnerTransport.ReadAsync(buffer, offset, length, cancellationToken);
}
- _inputBuffer.Seek(0, SeekOrigin.Begin);
- _inputBuffer.SetLength(_inputBuffer.Capacity);
-
+ // buffer a new chunk of bytes from the underlying transport
+ ReadBuffer.Length = ReadBuffer.Capacity;
ArraySegment<byte> bufSegment;
- _inputBuffer.TryGetBuffer(out bufSegment);
+ ReadBuffer.TryGetBuffer(out bufSegment);
+ ReadBuffer.Length = await InnerTransport.ReadAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken);
+ ReadBuffer.Position = 0;
- // investigate
- var filled = await _transport.ReadAsync(bufSegment.Array, 0, (int) _inputBuffer.Length, cancellationToken);
- _inputBuffer.SetLength(filled);
-
- if (filled == 0)
- {
- return 0;
- }
-
- return await ReadAsync(buffer, offset, length, cancellationToken);
+ // deliver the bytes
+ return await ReadBuffer.ReadAsync(buffer, offset, length, cancellationToken);
}
+
public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
CheckNotDisposed();
-
ValidateBufferArgs(buffer, offset, length);
if (!IsOpen)
@@ -130,41 +130,26 @@
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- // Relative offset from "off" argument
- var writtenCount = 0;
- if (_outputBuffer.Length > 0)
+ // enough space left in buffer?
+ var free = WriteBuffer.Capacity - WriteBuffer.Length;
+ if (length > free)
{
- var capa = (int) (_outputBuffer.Capacity - _outputBuffer.Length);
- var writeSize = capa <= length ? capa : length;
- await _outputBuffer.WriteAsync(buffer, offset, writeSize, cancellationToken);
-
- writtenCount += writeSize;
- if (writeSize == capa)
- {
- //ArraySegment<byte> bufSegment;
- //_outputBuffer.TryGetBuffer(out bufSegment);
- var data = _outputBuffer.ToArray();
- //await _transport.WriteAsync(bufSegment.Array, cancellationToken);
- await _transport.WriteAsync(data, cancellationToken);
- _outputBuffer.SetLength(0);
- }
+ ArraySegment<byte> bufSegment;
+ WriteBuffer.TryGetBuffer(out bufSegment);
+ await InnerTransport.WriteAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken);
+ WriteBuffer.SetLength(0);
}
- while (length - writtenCount >= _bufSize)
+ // do the data even fit into the buffer?
+ // Note we test for < instead of <= to avoid nonsense buffering
+ if (length < WriteBuffer.Capacity)
{
- await _transport.WriteAsync(buffer, offset + writtenCount, _bufSize, cancellationToken);
- writtenCount += _bufSize;
+ await WriteBuffer.WriteAsync(buffer, offset, length, cancellationToken);
+ return;
}
- var remain = length - writtenCount;
- if (remain > 0)
- {
- if (_outputBuffer.Capacity < _bufSize)
- {
- _outputBuffer.Capacity = _bufSize;
- }
- await _outputBuffer.WriteAsync(buffer, offset + writtenCount, remain, cancellationToken);
- }
+ // write thru
+ await InnerTransport.WriteAsync(buffer, offset, length, cancellationToken);
}
public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -176,38 +161,38 @@
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- if (_outputBuffer.Length > 0)
+ if (WriteBuffer.Length > 0)
{
- var data = _outputBuffer.ToArray();
-
- await _transport.WriteAsync(data /*bufSegment.Array*/, cancellationToken);
- _outputBuffer.SetLength(0);
+ ArraySegment<byte> bufSegment;
+ WriteBuffer.TryGetBuffer(out bufSegment);
+ await InnerTransport.WriteAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken);
+ WriteBuffer.SetLength(0);
}
- await _transport.FlushAsync(cancellationToken);
+ await InnerTransport.FlushAsync(cancellationToken);
}
private void CheckNotDisposed()
{
- if (_isDisposed)
+ if (IsDisposed)
{
- throw new ObjectDisposedException(nameof(_transport));
+ throw new ObjectDisposedException(nameof(InnerTransport));
}
}
// IDisposable
protected override void Dispose(bool disposing)
{
- if (!_isDisposed)
+ if (!IsDisposed)
{
if (disposing)
{
- _inputBuffer?.Dispose();
- _outputBuffer?.Dispose();
- _transport?.Dispose();
+ ReadBuffer?.Dispose();
+ WriteBuffer?.Dispose();
+ InnerTransport?.Dispose();
}
}
- _isDisposed = true;
+ IsDisposed = true;
}
}
-}
\ No newline at end of file
+}
diff --git a/lib/netstd/Thrift/Transport/TFramedTransport.cs b/lib/netstd/Thrift/Transport/TFramedTransport.cs
index fe7793e..de6df72 100644
--- a/lib/netstd/Thrift/Transport/TFramedTransport.cs
+++ b/lib/netstd/Thrift/Transport/TFramedTransport.cs
@@ -26,12 +26,12 @@
public class TFramedTransport : TTransport
{
private const int HeaderSize = 4;
- private readonly byte[] _headerBuf = new byte[HeaderSize];
- private readonly MemoryStream _readBuffer = new MemoryStream(1024);
- private readonly TTransport _transport;
- private readonly MemoryStream _writeBuffer = new MemoryStream(1024);
+ private readonly byte[] HeaderBuf = new byte[HeaderSize];
+ private readonly Client.TMemoryBufferTransport ReadBuffer = new Client.TMemoryBufferTransport();
+ private readonly Client.TMemoryBufferTransport WriteBuffer = new Client.TMemoryBufferTransport();
+ private readonly TTransport InnerTransport;
- private bool _isDisposed;
+ private bool IsDisposed;
public class Factory : TTransportFactory
{
@@ -43,32 +43,30 @@
public TFramedTransport(TTransport transport)
{
- _transport = transport ?? throw new ArgumentNullException(nameof(transport));
+ InnerTransport = transport ?? throw new ArgumentNullException(nameof(transport));
InitWriteBuffer();
}
- public override bool IsOpen => !_isDisposed && _transport.IsOpen;
+ public override bool IsOpen => !IsDisposed && InnerTransport.IsOpen;
public override async Task OpenAsync(CancellationToken cancellationToken)
{
CheckNotDisposed();
- await _transport.OpenAsync(cancellationToken);
+ await InnerTransport.OpenAsync(cancellationToken);
}
public override void Close()
{
CheckNotDisposed();
- _transport.Close();
+ InnerTransport.Close();
}
- public override async Task<int> ReadAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
CheckNotDisposed();
-
ValidateBufferArgs(buffer, offset, length);
if (!IsOpen)
@@ -76,39 +74,31 @@
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- var got = await _readBuffer.ReadAsync(buffer, offset, length, cancellationToken);
- if (got > 0)
+ // Read another frame of data if we run out of bytes
+ if (ReadBuffer.Position >= ReadBuffer.Length)
{
- return got;
+ await ReadFrameAsync(cancellationToken);
}
- // Read another frame of data
- await ReadFrameAsync(cancellationToken);
-
- return await _readBuffer.ReadAsync(buffer, offset, length, cancellationToken);
+ return await ReadBuffer.ReadAsync(buffer, offset, length, cancellationToken);
}
- private async Task ReadFrameAsync(CancellationToken cancellationToken)
+ private async ValueTask ReadFrameAsync(CancellationToken cancellationToken)
{
- await _transport.ReadAllAsync(_headerBuf, 0, HeaderSize, cancellationToken);
+ await InnerTransport.ReadAllAsync(HeaderBuf, 0, HeaderSize, cancellationToken);
+ var size = DecodeFrameSize(HeaderBuf);
- var size = DecodeFrameSize(_headerBuf);
-
- _readBuffer.SetLength(size);
- _readBuffer.Seek(0, SeekOrigin.Begin);
+ ReadBuffer.SetLength(size);
+ ReadBuffer.Seek(0, SeekOrigin.Begin);
ArraySegment<byte> bufSegment;
- _readBuffer.TryGetBuffer(out bufSegment);
-
- var buff = bufSegment.Array;
-
- await _transport.ReadAllAsync(buff, 0, size, cancellationToken);
+ ReadBuffer.TryGetBuffer(out bufSegment);
+ await InnerTransport.ReadAllAsync(bufSegment.Array, 0, size, cancellationToken);
}
public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
CheckNotDisposed();
-
ValidateBufferArgs(buffer, offset, length);
if (!IsOpen)
@@ -116,12 +106,12 @@
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- if (_writeBuffer.Length + length > int.MaxValue)
+ if (WriteBuffer.Length > (int.MaxValue - length))
{
await FlushAsync(cancellationToken);
}
- await _writeBuffer.WriteAsync(buffer, offset, length, cancellationToken);
+ await WriteBuffer.WriteAsync(buffer, offset, length, cancellationToken);
}
public override async Task FlushAsync(CancellationToken cancellationToken)
@@ -133,34 +123,31 @@
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
}
- //ArraySegment<byte> bufSegment;
- //_writeBuffer.TryGetBuffer(out bufSegment);
- //var buf = bufSegment.Array;
- var buf = _writeBuffer.ToArray();
+ ArraySegment<byte> bufSegment;
+ WriteBuffer.TryGetBuffer(out bufSegment);
- //var len = (int)_writeBuffer.Length;
- var dataLen = (int) _writeBuffer.Length - HeaderSize;
+ int dataLen = bufSegment.Count - HeaderSize;
if (dataLen < 0)
{
throw new InvalidOperationException(); // logic error actually
}
// Inject message header into the reserved buffer space
- EncodeFrameSize(dataLen, buf);
+ EncodeFrameSize(dataLen, bufSegment.Array);
// Send the entire message at once
- await _transport.WriteAsync(buf, cancellationToken);
+ await InnerTransport.WriteAsync(bufSegment.Array, 0, bufSegment.Count, cancellationToken);
InitWriteBuffer();
- await _transport.FlushAsync(cancellationToken);
+ await InnerTransport.FlushAsync(cancellationToken);
}
private void InitWriteBuffer()
{
// Reserve space for message header to be put right before sending it out
- _writeBuffer.SetLength(HeaderSize);
- _writeBuffer.Seek(0, SeekOrigin.End);
+ WriteBuffer.SetLength(HeaderSize);
+ WriteBuffer.Seek(0, SeekOrigin.End);
}
private static void EncodeFrameSize(int frameSize, byte[] buf)
@@ -183,25 +170,25 @@
private void CheckNotDisposed()
{
- if (_isDisposed)
+ if (IsDisposed)
{
- throw new ObjectDisposedException("TFramedTransport");
+ throw new ObjectDisposedException(this.GetType().Name);
}
}
// IDisposable
protected override void Dispose(bool disposing)
{
- if (!_isDisposed)
+ if (!IsDisposed)
{
if (disposing)
{
- _readBuffer?.Dispose();
- _writeBuffer?.Dispose();
- _transport?.Dispose();
+ ReadBuffer?.Dispose();
+ WriteBuffer?.Dispose();
+ InnerTransport?.Dispose();
}
}
- _isDisposed = true;
+ IsDisposed = true;
}
}
}
diff --git a/lib/netstd/Thrift/Transport/TServerTransport.cs b/lib/netstd/Thrift/Transport/TServerTransport.cs
index e25c0c5..74c54cd 100644
--- a/lib/netstd/Thrift/Transport/TServerTransport.cs
+++ b/lib/netstd/Thrift/Transport/TServerTransport.cs
@@ -27,19 +27,19 @@
public abstract void Close();
public abstract bool IsClientPending();
- protected virtual async Task<TTransport> AcceptImplementationAsync()
+ protected virtual async ValueTask<TTransport> AcceptImplementationAsync()
{
return await AcceptImplementationAsync(CancellationToken.None);
}
- protected abstract Task<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken);
+ protected abstract ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken);
- public async Task<TTransport> AcceptAsync()
+ public async ValueTask<TTransport> AcceptAsync()
{
return await AcceptAsync(CancellationToken.None);
}
- public async Task<TTransport> AcceptAsync(CancellationToken cancellationToken)
+ public async ValueTask<TTransport> AcceptAsync(CancellationToken cancellationToken)
{
var transport = await AcceptImplementationAsync(cancellationToken);
diff --git a/lib/netstd/Thrift/Transport/TTransport.cs b/lib/netstd/Thrift/Transport/TTransport.cs
index d5c8186..fd802ec 100644
--- a/lib/netstd/Thrift/Transport/TTransport.cs
+++ b/lib/netstd/Thrift/Transport/TTransport.cs
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation(ASF) under one
+// Licensed to the Apache Software Foundation(ASF) under one
// or more contributor license agreements.See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.The ASF licenses this file
@@ -16,6 +16,7 @@
// under the License.
using System;
+using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -37,7 +38,7 @@
GC.SuppressFinalize(this);
}
- public async Task<bool> PeekAsync(CancellationToken cancellationToken)
+ public async ValueTask<bool> PeekAsync(CancellationToken cancellationToken)
{
//If we already have a byte read but not consumed, do nothing.
if (_hasPeekByte)
@@ -85,69 +86,80 @@
throw new ArgumentNullException(nameof(buffer));
}
+#if DEBUG // let it fail with OutOfRange in RELEASE mode
if (offset < 0)
{
- throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset is smaller than zero.");
+ throw new ArgumentOutOfRangeException(nameof(offset), "Buffer offset must be >= 0");
}
if (length < 0)
{
- throw new ArgumentOutOfRangeException(nameof(length), "Buffer length is smaller than zero.");
+ throw new ArgumentOutOfRangeException(nameof(length), "Buffer length must be >= 0");
}
if (offset + length > buffer.Length)
{
- throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data.");
+ throw new ArgumentOutOfRangeException(nameof(buffer), "Not enough data");
}
+#endif
}
- public virtual async Task<int> ReadAsync(byte[] buffer, int offset, int length)
+ public virtual async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length)
{
return await ReadAsync(buffer, offset, length, CancellationToken.None);
}
- public abstract Task<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);
+ public abstract ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken);
- public virtual async Task<int> ReadAllAsync(byte[] buffer, int offset, int length)
+ public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length)
{
return await ReadAllAsync(buffer, offset, length, CancellationToken.None);
}
- public virtual async Task<int> ReadAllAsync(byte[] buffer, int offset, int length,
- CancellationToken cancellationToken)
+ public virtual async ValueTask<int> ReadAllAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
{
ValidateBufferArgs(buffer, offset, length);
if (cancellationToken.IsCancellationRequested)
- {
return await Task.FromCanceled<int>(cancellationToken);
- }
- var retrieved = 0;
+ if (length <= 0)
+ return 0;
- //If we previously peeked a byte, we need to use that first.
+ // If we previously peeked a byte, we need to use that first.
+ var totalBytes = 0;
if (_hasPeekByte)
{
- buffer[offset + retrieved++] = _peekBuffer[0];
+ buffer[offset++] = _peekBuffer[0];
_hasPeekByte = false;
+ if (1 == length)
+ {
+ Debug.Assert(totalBytes == 1); // what else?
+ return 1; // we're done
+ }
+ ++totalBytes;
}
- while (retrieved < length)
+ var remaining = length - totalBytes;
+ Debug.Assert(remaining > 0); // any other possible cases should have been handled already
+ while (true)
{
- if (cancellationToken.IsCancellationRequested)
+ var numBytes = await ReadAsync(buffer, offset, remaining, cancellationToken);
+ totalBytes += numBytes;
+ if (totalBytes >= length)
{
- return await Task.FromCanceled<int>(cancellationToken);
+ return totalBytes; // we're done
}
- var returnedCount = await ReadAsync(buffer, offset + retrieved, length - retrieved, cancellationToken);
- if (returnedCount <= 0)
+ if (numBytes <= 0)
{
throw new TTransportException(TTransportException.ExceptionType.EndOfFile,
"Cannot read, Remote side has closed");
}
- retrieved += returnedCount;
+
+ remaining -= numBytes;
+ offset += numBytes;
}
- return retrieved;
}
public virtual async Task WriteAsync(byte[] buffer)
@@ -176,4 +188,4 @@
protected abstract void Dispose(bool disposing);
}
-}
\ No newline at end of file
+}
diff --git a/test/netstd/Client/Performance/PerformanceTests.cs b/test/netstd/Client/Performance/PerformanceTests.cs
new file mode 100644
index 0000000..041d12e
--- /dev/null
+++ b/test/netstd/Client/Performance/PerformanceTests.cs
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using ThriftTest;
+using Thrift.Collections;
+using Thrift.Protocol;
+using System.Threading;
+using Thrift.Transport.Client;
+using System.Threading.Tasks;
+using System.Diagnostics;
+using Thrift.Transport;
+
+namespace Client.Tests
+{
+ public class PerformanceTests
+ {
+ private CancellationTokenSource Cancel;
+ private CrazyNesting Testdata;
+ private TMemoryBufferTransport MemBuffer;
+ private TTransport Transport;
+ private LayeredChoice Layered;
+
+ internal static int Execute()
+ {
+ var instance = new PerformanceTests();
+ instance.ProtocolPeformanceTestAsync().Wait();
+
+ // debug only
+ if (Debugger.IsAttached)
+ {
+ Console.Write("Hit ENTER ...");
+ Console.ReadKey();
+ }
+
+ return 0;
+ }
+
+ private async Task ProtocolPeformanceTestAsync()
+ {
+ Console.WriteLine("Setting up for ProtocolPeformanceTestAsync ...");
+ Cancel = new CancellationTokenSource();
+ Testdata = TestDataFactory.CreateCrazyNesting();
+
+ foreach (var layered in Enum.GetValues(typeof(LayeredChoice)))
+ {
+ Layered = (LayeredChoice)layered;
+
+ await RunTestAsync(async (bool b) => { return await GenericProtocolFactory<TBinaryProtocol>(b); });
+ await RunTestAsync(async (bool b) => { return await GenericProtocolFactory<TCompactProtocol>(b); });
+ //await RunTestAsync(async (bool b) => { return await GenericProtocolFactory<TJsonProtocol>(b); });
+ }
+ }
+
+ private Task<TProtocol> GenericProtocolFactory<T>(bool forWrite)
+ where T : TProtocol
+ {
+ var oldTrans = Transport;
+ try
+ {
+ // read happens after write here, so let's take over the written bytes
+ if (forWrite)
+ MemBuffer = new TMemoryBufferTransport();
+ else
+ MemBuffer = new TMemoryBufferTransport(MemBuffer.GetBuffer());
+
+ // layered transports anyone?
+ switch (Layered)
+ {
+ case LayeredChoice.None:
+ Transport = MemBuffer;
+ break;
+ case LayeredChoice.Framed:
+ Transport = new TFramedTransport(MemBuffer);
+ break;
+ case LayeredChoice.Buffered:
+ Transport = new TBufferedTransport(MemBuffer);
+ break;
+ default:
+ Debug.Assert(false);
+ break;
+ }
+
+ if (!Transport.IsOpen)
+ Transport.OpenAsync().Wait();
+
+ var instance = (T)Activator.CreateInstance(typeof(T), Transport);
+ return Task.FromResult<TProtocol>(instance);
+ }
+ finally
+ {
+ if (oldTrans is IDisposable)
+ (oldTrans as IDisposable).Dispose();
+ }
+ }
+
+ private string GetProtocolTransportName(TProtocol proto)
+ {
+ var name = Transport.GetType().Name;
+ if (name.Equals(MemBuffer.GetType().Name))
+ name = string.Empty;
+ else
+ name = " + " + name;
+
+ name = proto.GetType().Name + name;
+ return name;
+ }
+
+
+ private async Task RunTestAsync(Func<bool, Task<TProtocol>> factory)
+ {
+ var stop = new Stopwatch();
+
+ var proto = await factory(true);
+ stop.Start();
+ await Testdata.WriteAsync(proto, Cancel.Token);
+ await Transport.FlushAsync(Cancel.Token);
+ stop.Stop();
+ Console.WriteLine("RunTestAsync({0}): write = {1} msec",
+ GetProtocolTransportName(proto),
+ stop.ElapsedMilliseconds);
+
+ var restored = new CrazyNesting();
+ proto = await factory(false);
+ stop.Start();
+ await restored.ReadAsync(proto, Cancel.Token);
+ stop.Stop();
+ Console.WriteLine("RunTestAsync({0}): read = {1} msec",
+ GetProtocolTransportName(proto),
+ stop.ElapsedMilliseconds);
+ }
+
+ }
+}
diff --git a/test/netstd/Client/Performance/TestDataFactory.cs b/test/netstd/Client/Performance/TestDataFactory.cs
new file mode 100644
index 0000000..9896285
--- /dev/null
+++ b/test/netstd/Client/Performance/TestDataFactory.cs
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation(ASF) under one
+// or more contributor license agreements.See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using ThriftTest;
+using Thrift.Collections;
+
+namespace Client.Tests
+{
+
+ static class TestDataFactory
+ {
+ public static CrazyNesting CreateCrazyNesting(int count = 10)
+ {
+ if (count <= 0)
+ return null;
+
+ return new CrazyNesting()
+ {
+ Binary_field = CreateBytesArray(count),
+ List_field = CreateListField(count),
+ Set_field = CreateSetField(count),
+ String_field = string.Format("data level {0}", count)
+ };
+ }
+
+ private static THashSet<Insanity> CreateSetField(int count)
+ {
+ var retval = new THashSet<Insanity>();
+ for (var i = 0; i < count; ++i)
+ retval.Add(CreateInsanity(count));
+ return retval;
+ }
+
+ private static Insanity CreateInsanity(int count)
+ {
+ return new Insanity()
+ {
+ UserMap = CreateUserMap(count),
+ Xtructs = CreateXtructs(count)
+ };
+ }
+
+ private static List<Xtruct> CreateXtructs(int count)
+ {
+ var retval = new List<Xtruct>();
+ for (var i = 0; i < count; ++i)
+ retval.Add(CreateXtruct(count));
+ return retval;
+ }
+
+ private static Xtruct CreateXtruct(int count)
+ {
+ return new Xtruct()
+ {
+ Byte_thing = (sbyte)(count % 128),
+ I32_thing = count,
+ I64_thing = count,
+ String_thing = string.Format("data level {0}", count)
+ };
+ }
+
+ private static Dictionary<Numberz, long> CreateUserMap(int count)
+ {
+ var retval = new Dictionary<Numberz, long>();
+ retval.Add(Numberz.ONE, count);
+ retval.Add(Numberz.TWO, count);
+ retval.Add(Numberz.THREE, count);
+ retval.Add(Numberz.FIVE, count);
+ retval.Add(Numberz.SIX, count);
+ retval.Add(Numberz.EIGHT, count);
+ return retval;
+ }
+
+ private static List<Dictionary<THashSet<int>, Dictionary<int, THashSet<List<Dictionary<Insanity, string>>>>>> CreateListField(int count)
+ {
+ var retval = new List<Dictionary<THashSet<int>, Dictionary<int, THashSet<List<Dictionary<Insanity, string>>>>>>();
+ for (var i = 0; i < count; ++i)
+ retval.Add(CreateListFieldData(count));
+ return retval;
+ }
+
+ private static Dictionary<THashSet<int>, Dictionary<int, THashSet<List<Dictionary<Insanity, string>>>>> CreateListFieldData(int count)
+ {
+ var retval = new Dictionary<THashSet<int>, Dictionary<int, THashSet<List<Dictionary<Insanity, string>>>>>();
+ for (var i = 0; i < count; ++i)
+ retval.Add( CreateIntHashSet(count), CreateListFieldDataDict(count));
+ return retval;
+ }
+
+ private static THashSet<int> CreateIntHashSet(int count)
+ {
+ var retval = new THashSet<int>();
+ for (var i = 0; i < count; ++i)
+ retval.Add(i);
+ return retval;
+ }
+
+ private static Dictionary<int, THashSet<List<Dictionary<Insanity, string>>>> CreateListFieldDataDict(int count)
+ {
+ var retval = new Dictionary<int, THashSet<List<Dictionary<Insanity, string>>>>();
+ for (var i = 0; i < count; ++i)
+ retval.Add(i, CreateListFieldDataDictValue(count));
+ return retval;
+ }
+
+ private static THashSet<List<Dictionary<Insanity, string>>> CreateListFieldDataDictValue(int count)
+ {
+ var retval = new THashSet<List<Dictionary<Insanity, string>>>();
+ for (var i = 0; i < count; ++i)
+ retval.Add( CreateListFieldDataDictValueList(count));
+ return retval;
+ }
+
+ private static List<Dictionary<Insanity, string>> CreateListFieldDataDictValueList(int count)
+ {
+ var retval = new List<Dictionary<Insanity, string>>();
+ for (var i = 0; i < count; ++i)
+ retval.Add(CreateListFieldDataDictValueListDict(count));
+ return retval;
+ }
+
+ private static Dictionary<Insanity, string> CreateListFieldDataDictValueListDict(int count)
+ {
+ var retval = new Dictionary<Insanity, string>();
+ retval.Add(CreateInsanity(count), string.Format("data level {0}", count));
+ return retval;
+ }
+
+ private static byte[] CreateBytesArray(int count)
+ {
+ var retval = new byte[count];
+ for (var i = 0; i < count; ++i)
+ retval[i] = (byte)(i % 0xFF);
+ return retval;
+ }
+ }
+}
diff --git a/test/netstd/Client/Program.cs b/test/netstd/Client/Program.cs
index 8d973c4..62933e6 100644
--- a/test/netstd/Client/Program.cs
+++ b/test/netstd/Client/Program.cs
@@ -48,6 +48,8 @@
{
case "client":
return TestClient.Execute(subArgs);
+ case "performance":
+ return Tests.PerformanceTests.Execute();
case "--help":
PrintHelp();
return 0;
@@ -61,7 +63,8 @@
private static void PrintHelp()
{
Console.WriteLine("Usage:");
- Console.WriteLine(" Client client [options]'");
+ Console.WriteLine(" Client client [options]");
+ Console.WriteLine(" Client performance");
Console.WriteLine(" Client --help");
Console.WriteLine("");