blob: bb531f42eda1a2dfd1960d435ce515a6165b26eb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18using System;
Mikel Blanchard4b66a9d2020-03-05 00:46:21 +010019using System.Buffers;
zembord9d958a32019-11-21 13:11:44 +030020using System.Buffers.Binary;
Jens Geyeraa0c8b32019-01-28 23:27:45 +010021using System.Collections.Generic;
Jens Geyer5a17b132019-05-26 15:53:37 +020022using System.Diagnostics;
Jens Geyeraa0c8b32019-01-28 23:27:45 +010023using System.Text;
24using System.Threading;
25using System.Threading.Tasks;
26using Thrift.Protocol.Entities;
27using Thrift.Transport;
28
29namespace Thrift.Protocol
30{
31 //TODO: implementation of TProtocol
32
33 // ReSharper disable once InconsistentNaming
34 public class TCompactProtocol : TProtocol
35 {
/// <summary>First byte of a message header; written by WriteMessageBeginAsync and verified by ReadMessageBeginAsync.</summary>
private const byte ProtocolId = 0x82;

/// <summary>Protocol version carried in the low bits of the second message-header byte.</summary>
private const byte Version = 0x01;

/// <summary>Selects the version bits of the second header byte (0001 1111).</summary>
private const byte VersionMask = 0x1F;

/// <summary>Selects the message-type bits of the second header byte when writing (1110 0000).</summary>
private const byte TypeMask = 0xE0;

/// <summary>Selects the message-type bits once they have been shifted down when reading (0000 0111).</summary>
private const byte TypeBits = 0x07;

/// <summary>Bit offset of the message type inside the second header byte.</summary>
private const int TypeShiftAmount = 5;

/// <summary>Shared value returned by ReadStructBeginAsync.</summary>
private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);

/// <summary>Shared value returned by ReadFieldBeginAsync when the stop marker is read.</summary>
private static readonly TField StopField = new TField(string.Empty, TType.Stop, 0);

/// <summary>Sentinel passed to WriteFieldBeginInternalAsync meaning "derive the wire type from the field".</summary>
private const byte NoTypeOverride = 0xFF;

/// <summary>Lookup from TType (used as index) to the compact wire type code.</summary>
// ReSharper disable once InconsistentNaming
private static readonly byte[] TTypeToCompactType = new byte[16];

/// <summary>Lookup from compact wire type code (used as index) to TType.</summary>
private static readonly TType[] CompactTypeToTType = new TType[13];

/// <summary>
///     Field ids saved by the enclosing structs; pushed on struct begin and popped on struct end so
///     that field-id deltas are computed per struct.
/// </summary>
private readonly Stack<short> _lastField = new Stack<short>(15);

/// <summary>
///     A boolean field whose header has been deferred by WriteFieldBeginAsync; WriteBoolAsync emits it
///     once the value is known.
/// </summary>
private TField? _booleanField;

/// <summary>
///     Boolean value decoded from a field header by ReadFieldBeginAsync, waiting to be consumed by
///     ReadBoolAsync.
/// </summary>
private bool? _boolValue;

/// <summary>Id of the most recently read or written field of the current struct.</summary>
private short _lastFieldId;

// Reusable scratch buffer that avoids per-call allocations.
// The size of 128 is arbitrary; it must be at least sizeof(long).
private readonly byte[] PreAllocatedBuffer = new byte[128];
Jens Geyer5a17b132019-05-26 15:53:37 +020071
/// <summary>
///     Holder for one encoded varint: <c>bytes</c> is the storage and <c>count</c> is the number of
///     leading bytes that are valid.
/// </summary>
private struct VarInt
{
    public byte[] bytes;
    public int count;
}

// Reusable varint scratch space that avoids per-call allocations.
// Ten bytes is the size Int64ToVarInt() asserts as its minimum.
private VarInt PreAllocatedVarInt = new VarInt { bytes = new byte[10], count = 0 };
84
85
86
87
/// <summary>
///     Creates a compact protocol bound to <paramref name="trans"/> and fills the two type
///     translation tables.
/// </summary>
/// <param name="trans">Transport handed to the base protocol.</param>
public TCompactProtocol(TTransport trans)
    : base(trans)
{
    // NOTE(review): both tables are static yet are refilled by every instance; the values written
    // are always the same.

    // Wire type code -> TType. Both boolean wire codes map to TType.Bool.
    CompactTypeToTType[Types.Stop] = TType.Stop;
    CompactTypeToTType[Types.BooleanTrue] = TType.Bool;
    CompactTypeToTType[Types.BooleanFalse] = TType.Bool;
    CompactTypeToTType[Types.Byte] = TType.Byte;
    CompactTypeToTType[Types.I16] = TType.I16;
    CompactTypeToTType[Types.I32] = TType.I32;
    CompactTypeToTType[Types.I64] = TType.I64;
    CompactTypeToTType[Types.Double] = TType.Double;
    CompactTypeToTType[Types.Binary] = TType.String;
    CompactTypeToTType[Types.List] = TType.List;
    CompactTypeToTType[Types.Set] = TType.Set;
    CompactTypeToTType[Types.Map] = TType.Map;
    CompactTypeToTType[Types.Struct] = TType.Struct;

    // TType -> wire type code. Bool is registered as BooleanTrue.
    TTypeToCompactType[(int) TType.Stop] = Types.Stop;
    TTypeToCompactType[(int) TType.Bool] = Types.BooleanTrue;
    TTypeToCompactType[(int) TType.Byte] = Types.Byte;
    TTypeToCompactType[(int) TType.I16] = Types.I16;
    TTypeToCompactType[(int) TType.I32] = Types.I32;
    TTypeToCompactType[(int) TType.I64] = Types.I64;
    TTypeToCompactType[(int) TType.Double] = Types.Double;
    TTypeToCompactType[(int) TType.String] = Types.Binary;
    TTypeToCompactType[(int) TType.List] = Types.List;
    TTypeToCompactType[(int) TType.Set] = Types.Set;
    TTypeToCompactType[(int) TType.Map] = Types.Map;
    TTypeToCompactType[(int) TType.Struct] = Types.Struct;
}
118
/// <summary>
///     Forgets all field-id tracking: the current field id goes back to zero and the stack of
///     enclosing-struct field ids is emptied.
/// </summary>
public void Reset()
{
    _lastFieldId = 0;
    _lastField.Clear();
}
124
/// <summary>
///     Writes a message header: the protocol id byte, a byte combining version and message type,
///     the sequence id as a varint, and finally the message name as a string.
/// </summary>
/// <param name="message">Message whose type, sequence id and name are written.</param>
/// <param name="cancellationToken">Token forwarded to the transport writes.</param>
public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
{
    // The message type lives in the upper bits of the second byte, the version in the lower bits.
    uint typeBits = ((uint)message.Type << TypeShiftAmount) & TypeMask;

    var header = PreAllocatedBuffer;
    header[0] = ProtocolId;
    header[1] = (byte)(typeBits | (Version & VersionMask));
    await Trans.WriteAsync(header, 0, 2, cancellationToken);

    Int32ToVarInt((uint) message.SeqID, ref PreAllocatedVarInt);
    await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);

    await WriteStringAsync(message.Name, cancellationToken);
}
136
/// <summary>
///     Writes nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task WriteMessageEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
144
/// <summary>
///     Begins a struct. Nothing is put on the wire; the current field id is saved on the field
///     stack and reset to zero so that field-id deltas restart for the new struct.
/// </summary>
/// <param name="struct">Not used by this implementation.</param>
/// <param name="cancellationToken">Token that, when already cancelled, aborts before any state changes.</param>
public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        _lastField.Push(_lastFieldId);
        _lastFieldId = 0;
        return;
    }

    await Task.FromCanceled(cancellationToken);
}
160
/// <summary>
///     Ends a struct by restoring the field id that was saved when the struct began.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, aborts before any state changes.</param>
public override async Task WriteStructEndAsync(CancellationToken cancellationToken)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        _lastFieldId = _lastField.Pop();
        return;
    }

    await Task.FromCanceled(cancellationToken);
}
170
/// <summary>
///     Writes a field header. When the field id is 1 to 15 larger than the previous one, the delta
///     and the type share a single byte; otherwise the type byte is followed by the full field id.
/// </summary>
/// <param name="field">Field whose id (and, without override, type) is written.</param>
/// <param name="fieldType">Wire type to use, or <c>NoTypeOverride</c> to derive it from the field.</param>
/// <param name="cancellationToken">Token forwarded to the writes.</param>
private async Task WriteFieldBeginInternalAsync(TField field, byte fieldType, CancellationToken cancellationToken)
{
    // An explicit override wins; otherwise ask GetCompactType().
    var wireType = fieldType == NoTypeOverride ? GetCompactType(field.Type) : fieldType;
    var header = PreAllocatedBuffer;

    var delta = field.ID - _lastFieldId;
    if (delta > 0 && delta <= 15)
    {
        // Short form: delta in the high nibble, type in the low nibble.
        header[0] = (byte)((delta << 4) | wireType);
        await Trans.WriteAsync(header, 0, 1, cancellationToken);
    }
    else
    {
        // Long form: type byte, then the field id on its own.
        header[0] = wireType;
        await Trans.WriteAsync(header, 0, 1, cancellationToken);
        await WriteI16Async(field.ID, cancellationToken);
    }

    _lastFieldId = field.ID;
}
198
/// <summary>
///     Begins a field. Boolean fields are only remembered here, because their header is written
///     later by WriteBoolAsync; every other field gets its header written immediately.
/// </summary>
/// <param name="field">Field being started.</param>
/// <param name="cancellationToken">Token forwarded to the header write.</param>
public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
{
    if (field.Type != TType.Bool)
    {
        await WriteFieldBeginInternalAsync(field, NoTypeOverride, cancellationToken);
        return;
    }

    _booleanField = field;
}
210
/// <summary>
///     Writes nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task WriteFieldEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
218
/// <summary>
///     Writes the single stop byte that terminates a struct's field list.
/// </summary>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
{
    // Returning silently here would leave the struct unterminated on the wire while the caller
    // believes the write succeeded, so cancellation is reported as an exception instead.
    cancellationToken.ThrowIfCancellationRequested();

    PreAllocatedBuffer[0] = Types.Stop;
    await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
229
/// <summary>
///     Writes the header shared by lists and sets. Sizes up to 14 are packed with the element type
///     into one byte; larger sizes write 0xF in the high nibble followed by the size as a varint.
/// </summary>
/// <param name="elemType">Element type of the collection.</param>
/// <param name="size">Number of elements.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken)
{
    // Skipping the header silently would corrupt the stream, so cancellation must be visible
    // to the caller.
    cancellationToken.ThrowIfCancellationRequested();

    if (size <= 14)
    {
        PreAllocatedBuffer[0] = (byte)((size << 4) | GetCompactType(elemType));
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
    }
    else
    {
        PreAllocatedBuffer[0] = (byte)(0xf0 | GetCompactType(elemType));
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);

        Int32ToVarInt((uint) size, ref PreAllocatedVarInt);
        await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
    }
}
256
/// <summary>
///     Writes a list header by delegating to WriteCollectionBeginAsync.
/// </summary>
/// <param name="list">Supplies the element type and element count.</param>
/// <param name="cancellationToken">Token forwarded to the collection writer.</param>
public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
{
    var elementType = list.ElementType;
    var elementCount = list.Count;
    await WriteCollectionBeginAsync(elementType, elementCount, cancellationToken);
}
261
/// <summary>
///     Writes nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task WriteListEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
269
/// <summary>
///     Writes a set header by delegating to WriteCollectionBeginAsync.
/// </summary>
/// <param name="set">Supplies the element type and element count.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the collection writer.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
{
    // A silent return would drop the set header from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    await WriteCollectionBeginAsync(set.ElementType, set.Count, cancellationToken);
}
279
/// <summary>
///     Writes nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task WriteSetEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
287
/// <summary>
///     Writes a boolean. If a boolean field header is pending (see WriteFieldBeginAsync), the value
///     is folded into that header's type code and the header is written now; otherwise the value
///     is written as a single byte.
/// </summary>
/// <param name="b">Value to write.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the writes.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
{
    // A silent return would lose the value (and a pending field header) without the caller
    // noticing, so cancellation is reported as an exception instead.
    cancellationToken.ThrowIfCancellationRequested();

    if (_booleanField != null)
    {
        // The field header has not been written yet; the value selects its type code.
        var type = b ? Types.BooleanTrue : Types.BooleanFalse;
        await WriteFieldBeginInternalAsync(_booleanField.Value, type, cancellationToken);
        _booleanField = null;
    }
    else
    {
        // Not part of a field, so just write the value.
        PreAllocatedBuffer[0] = b ? Types.BooleanTrue : Types.BooleanFalse;
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
    }
}
316
/// <summary>
///     Writes one byte to the transport unchanged.
/// </summary>
/// <param name="b">Value to write.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
{
    // A silent return would drop the byte from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    PreAllocatedBuffer[0] = (byte)b;
    await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
}
327
/// <summary>
///     Writes a 16-bit integer as a zigzag-encoded varint.
/// </summary>
/// <param name="i16">Value to write.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
{
    // A silent return would drop the value from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    Int32ToVarInt(IntToZigzag(i16), ref PreAllocatedVarInt);
    await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
}
338
/// <summary>
///     Encodes <paramref name="n"/> as a varint into <paramref name="varint"/>: seven payload bits
///     per byte, least significant group first, with the high bit set on every byte but the last.
///     Produces 1 to 5 bytes.
/// </summary>
/// <param name="n">Value to encode.</param>
/// <param name="varint">Receives the encoded bytes and their count.</param>
private static void Int32ToVarInt(uint n, ref VarInt varint)
{
    varint.count = 0;
    Debug.Assert(varint.bytes.Length >= 5);

    // Emit continuation bytes while more than seven bits remain.
    for (; n > 0x7F; n >>= 7)
    {
        varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
    }

    // Final byte: high bit clear.
    varint.bytes[varint.count++] = (byte)n;
}
357
/// <summary>
///     Writes a 32-bit integer as a zigzag-encoded varint.
/// </summary>
/// <param name="i32">Value to write.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
{
    // A silent return would drop the value from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    Int32ToVarInt(IntToZigzag(i32), ref PreAllocatedVarInt);
    await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
}
368
/// <summary>
///     Encodes <paramref name="n"/> as a varint into <paramref name="varint"/>: seven payload bits
///     per byte, least significant group first, with the high bit set on every byte but the last.
///     Produces 1 to 10 bytes.
/// </summary>
/// <param name="n">Value to encode.</param>
/// <param name="varint">Receives the encoded bytes and their count.</param>
private static void Int64ToVarInt(ulong n, ref VarInt varint)
{
    varint.count = 0;
    Debug.Assert(varint.bytes.Length >= 10);

    // Emit continuation bytes while more than seven bits remain.
    for (; n > 0x7FUL; n >>= 7)
    {
        varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
    }

    // Final byte: high bit clear.
    varint.bytes[varint.count++] = (byte)n;
}
386
/// <summary>
///     Writes a 64-bit integer as a zigzag-encoded varint.
/// </summary>
/// <param name="i64">Value to write.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
{
    // A silent return would drop the value from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    Int64ToVarInt(LongToZigzag(i64), ref PreAllocatedVarInt);
    await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
}
397
/// <summary>
///     Writes a double as the eight bytes of its 64-bit pattern in little-endian order.
/// </summary>
/// <param name="d">Value to write.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
{
    // A silent return would drop the value from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    BinaryPrimitives.WriteInt64LittleEndian(PreAllocatedBuffer, BitConverter.DoubleToInt64Bits(d));
    await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
}
407
/// <summary>
///     Writes a string as its UTF-8 byte length (varint) followed by the UTF-8 bytes. The encoding
///     buffer is rented from the shared array pool and always returned.
/// </summary>
/// <param name="str">Text to write.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteStringAsync(string str, CancellationToken cancellationToken)
{
    // A silent return would drop the string from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    var buf = ArrayPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(str));
    try
    {
        var numberOfBytes = Encoding.UTF8.GetBytes(str, 0, str.Length, buf, 0);

        Int32ToVarInt((uint)numberOfBytes, ref PreAllocatedVarInt);
        await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
        await Trans.WriteAsync(buf, 0, numberOfBytes, cancellationToken);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buf);
    }
}
429
/// <summary>
///     Writes a byte array as its length (varint) followed by the bytes themselves.
/// </summary>
/// <param name="bytes">Data to write.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
{
    // A silent return would drop the data from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt);
    await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
    await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
}
441
/// <summary>
///     Writes a map header. An empty map is a single zero byte; otherwise the element count is
///     written as a varint followed by one byte holding the key type (high nibble) and value type
///     (low nibble).
/// </summary>
/// <param name="map">Supplies the count, key type and value type.</param>
/// <param name="cancellationToken">Token checked before writing and forwarded to the transport.</param>
/// <exception cref="OperationCanceledException">Cancellation was requested before the write.</exception>
public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
{
    // A silent return would drop the map header from the stream without the caller noticing.
    cancellationToken.ThrowIfCancellationRequested();

    if (map.Count == 0)
    {
        PreAllocatedBuffer[0] = 0;
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
    }
    else
    {
        Int32ToVarInt((uint) map.Count, ref PreAllocatedVarInt);
        await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);

        PreAllocatedBuffer[0] = (byte)((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType));
        await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
    }
}
463
/// <summary>
///     Writes nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task WriteMapEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
471
/// <summary>
///     Reads a message header: validates the protocol id and version, then decodes the message
///     type, the sequence id (varint) and the message name.
/// </summary>
/// <param name="cancellationToken">Token checked up front and forwarded to the reads.</param>
/// <returns>The decoded message descriptor.</returns>
/// <exception cref="TProtocolException">The protocol id or the version does not match.</exception>
public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<TMessage>(cancellationToken);
    }

    var protocolId = (byte) await ReadByteAsync(cancellationToken);
    if (protocolId != ProtocolId)
    {
        throw new TProtocolException($"Expected protocol id {ProtocolId:X} but got {protocolId:X}");
    }

    // Second byte: version in the low bits, message type in the high bits.
    var header = (byte) await ReadByteAsync(cancellationToken);

    var version = (byte) (header & VersionMask);
    if (version != Version)
    {
        throw new TProtocolException($"Expected version {Version} but got {version}");
    }

    var messageType = (TMessageType) (byte) ((header >> TypeShiftAmount) & TypeBits);
    var seqid = (int) await ReadVarInt32Async(cancellationToken);
    var name = await ReadStringAsync(cancellationToken);

    return new TMessage(name, messageType, seqid);
}
499
/// <summary>
///     Reads nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task ReadMessageEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
507
/// <summary>
///     Begins reading a struct. Nothing is consumed from the wire; the current field id is saved
///     on the field stack and reset to zero so that field-id deltas restart for the new struct.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, aborts before any state changes.</param>
/// <returns>The shared anonymous struct descriptor.</returns>
public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        _lastField.Push(_lastFieldId);
        _lastFieldId = 0;
        return AnonymousStruct;
    }

    return await Task.FromCanceled<TStruct>(cancellationToken);
}
522
/// <summary>
///     Ends reading a struct. Nothing is consumed from the wire; the field id saved when the
///     struct began is popped from the field stack and becomes current again.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, aborts before any state changes.</param>
public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
{
    if (!cancellationToken.IsCancellationRequested)
    {
        _lastFieldId = _lastField.Pop();
        return;
    }

    await Task.FromCanceled(cancellationToken);
}
538
/// <summary>
///     Reads a field header. A stop byte ends the struct. Otherwise the low nibble is the wire
///     type and the high nibble is either a field-id delta (non-zero) or a signal that the full
///     field id follows (zero). For boolean fields the value is taken from the wire type and
///     stored for the next ReadBoolAsync call.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the reads.</param>
/// <returns>The decoded field descriptor, or the shared stop field.</returns>
public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
{
    var header = (byte) await ReadByteAsync(cancellationToken);
    if (header == Types.Stop)
    {
        // The struct is over; nothing else to decode.
        return StopField;
    }

    var delta = (short) (header >> 4);
    var compactType = (byte) (header & 0x0f);

    // Zero delta: the field id is written in full after the header byte.
    var fieldId = delta == 0
        ? await ReadI16Async(cancellationToken)
        : (short) (_lastFieldId + delta);

    var ttype = GetTType(compactType);
    var field = new TField(string.Empty, ttype, fieldId);

    if (ttype == TType.Bool)
    {
        // The boolean's value is encoded in the wire type itself.
        _boolValue = compactType == Types.BooleanTrue;
    }

    // Remember this id so the next header's delta can be resolved.
    _lastFieldId = field.ID;
    return field;
}
578
/// <summary>
///     Reads nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task ReadFieldEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
586
/// <summary>
///     Reads a map header: the element count as a varint and, only when the count is non-zero,
///     one byte holding the key type (high nibble) and value type (low nibble). For an empty map
///     both types are decoded from a zero byte, so they are not the map's real types.
/// </summary>
/// <param name="cancellationToken">Token checked up front and forwarded to the reads.</param>
/// <returns>The decoded map descriptor.</returns>
public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled<TMap>(cancellationToken);
    }

    var size = (int) await ReadVarInt32Async(cancellationToken);

    byte keyAndValueType = 0;
    if (size != 0)
    {
        keyAndValueType = (byte) await ReadByteAsync(cancellationToken);
    }

    var keyType = GetTType((byte) (keyAndValueType >> 4));
    var valueType = GetTType((byte) (keyAndValueType & 0xf));

    var map = new TMap(keyType, valueType, size);
    CheckReadBytesAvailable(map);
    return map;
}
606
/// <summary>
///     Reads nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task ReadMapEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
614
/// <summary>
///     Reads a set header. The header is read with ReadListBeginAsync and the resulting list
///     descriptor is converted into a set descriptor.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the list-header read.</param>
/// <returns>The decoded set descriptor.</returns>
public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
{
    var listHeader = await ReadListBeginAsync(cancellationToken);
    return new TSet(listHeader);
}
626
/// <summary>
///     Reads a boolean. If ReadFieldBeginAsync already captured the value from a field header,
///     that stored value is consumed and returned without touching the wire; otherwise one byte
///     is read and compared against the "true" type code.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the byte read when one is needed.</param>
/// <returns>The boolean value.</returns>
public override ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
{
    if (!_boolValue.HasValue)
    {
        return ReadFromWireAsync();
    }

    var pending = _boolValue.Value;
    _boolValue = null;
    return new ValueTask<bool>(pending);

    async ValueTask<bool> ReadFromWireAsync()
    {
        var raw = await ReadByteAsync(cancellationToken);
        return raw == Types.BooleanTrue;
    }
}
650
Jens Geyer5a17b132019-05-26 15:53:37 +0200651
/// <summary>
///     Reads exactly one byte from the transport into the scratch buffer and returns it.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the transport read.</param>
/// <returns>The byte, reinterpreted as signed.</returns>
public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
{
    var scratch = PreAllocatedBuffer;
    await Trans.ReadAllAsync(scratch, 0, 1, cancellationToken);
    return (sbyte)scratch[0];
}
658
/// <summary>
///     Reads a zigzag-encoded varint and returns it narrowed to 16 bits.
/// </summary>
/// <param name="cancellationToken">Token checked up front and forwarded to the varint read.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<short>(cancellationToken);
    }

    var zigzag = await ReadVarInt32Async(cancellationToken);
    return (short) ZigzagToInt(zigzag);
}
668
/// <summary>
///     Reads a zigzag-encoded varint and returns the decoded 32-bit integer.
/// </summary>
/// <param name="cancellationToken">Token checked up front and forwarded to the varint read.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<int>(cancellationToken);
    }

    var zigzag = await ReadVarInt32Async(cancellationToken);
    return ZigzagToInt(zigzag);
}
678
/// <summary>
///     Reads a zigzag-encoded 64-bit varint and returns the decoded integer.
/// </summary>
/// <param name="cancellationToken">Token checked up front and forwarded to the varint read.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<long>(cancellationToken);
    }

    var zigzag = await ReadVarInt64Async(cancellationToken);
    return ZigzagToLong(zigzag);
}
688
/// <summary>
///     Reads eight bytes and interprets them as the little-endian 64-bit pattern of a double.
/// </summary>
/// <param name="cancellationToken">Token checked up front and forwarded to the transport read.</param>
/// <returns>The decoded value.</returns>
public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<double>(cancellationToken);
    }

    await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);

    var bits = BinaryPrimitives.ReadInt64LittleEndian(PreAllocatedBuffer);
    return BitConverter.Int64BitsToDouble(bits);
}
700
/// <summary>
///     Reads a string: a varint byte length followed by that many UTF-8 bytes. Payloads smaller
///     than the scratch buffer are decoded from it directly; larger ones use a buffer rented from
///     the shared array pool, after the transport has confirmed that many bytes can be read.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the reads.</param>
/// <returns>The decoded text; the empty string for a zero length.</returns>
/// <exception cref="TProtocolException">The encoded length is negative once interpreted as an int.</exception>
public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
{
    // read length
    var length = (int) await ReadVarInt32Async(cancellationToken);
    if (length == 0)
    {
        return string.Empty;
    }

    // A varint above int.MaxValue turns negative in the cast; reject it here rather than
    // passing a negative count on to the transport.
    if (length < 0)
    {
        throw new TProtocolException($"Negative string length: {length}");
    }

    // read and decode data
    if (length < PreAllocatedBuffer.Length)
    {
        await Trans.ReadAllAsync(PreAllocatedBuffer, 0, length, cancellationToken);
        return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, length);
    }

    Transport.CheckReadBytesAvailable(length);

    var buf = ArrayPool<byte>.Shared.Rent(length);
    try
    {
        await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
        return Encoding.UTF8.GetString(buf, 0, length);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buf);
    }
}
730
/// <summary>
///     Reads a byte array: a varint length followed by that many bytes. The transport is asked to
///     confirm the bytes can be read before the result array is allocated.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the reads.</param>
/// <returns>A new array holding the data; an empty array for a zero length.</returns>
/// <exception cref="TProtocolException">The encoded length is negative once interpreted as an int.</exception>
public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
{
    // read length
    var length = (int) await ReadVarInt32Async(cancellationToken);
    if (length == 0)
    {
        return Array.Empty<byte>();
    }

    // A varint above int.MaxValue turns negative in the cast; reject it here rather than
    // letting it reach the array allocation below.
    if (length < 0)
    {
        throw new TProtocolException($"Negative binary length: {length}");
    }

    // read data
    Transport.CheckReadBytesAvailable(length);
    var buf = new byte[length];
    await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
    return buf;
}
746
/// <summary>
///     Reads a list header. The high nibble of the first byte is the size when it is 0 to 14;
///     the value 15 means the real size follows as a varint. The low nibble is the element type.
/// </summary>
/// <param name="cancellationToken">Token checked up front and forwarded to the reads.</param>
/// <returns>The decoded list descriptor.</returns>
public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        await Task.FromCanceled<TList>(cancellationToken);
    }

    var header = (byte) await ReadByteAsync(cancellationToken);

    var count = header >> 4;
    if (count == 0x0f)
    {
        // Escape value: the true size is stored separately.
        count = (int) await ReadVarInt32Async(cancellationToken);
    }

    // GetTType only looks at the low nibble, so the whole header byte can be passed.
    var list = new TList(GetTType(header), count);
    CheckReadBytesAvailable(list);
    return list;
}
773
/// <summary>
///     Reads nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task ReadListEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
781
/// <summary>
///     Reads nothing; only reports cancellation.
/// </summary>
/// <param name="cancellationToken">Token that, when already cancelled, yields a cancelled task.</param>
/// <returns>A cancelled task if cancellation was requested, otherwise a completed task.</returns>
public override Task ReadSetEndAsync(CancellationToken cancellationToken)
{
    return cancellationToken.IsCancellationRequested
        ? Task.FromCanceled(cancellationToken)
        : Task.CompletedTask;
}
789
/// <summary>
///     Looks up the compact wire type code registered for <paramref name="ttype"/>.
/// </summary>
/// <param name="ttype">Thrift type to translate.</param>
/// <returns>The entry of the TType-to-compact-type table at that index.</returns>
private static byte GetCompactType(TType ttype) => TTypeToCompactType[(int) ttype];
795
796
/// <summary>
///     Reads a varint from the wire into a 32-bit value. Each byte contributes its low seven
///     bits, least significant group first; a set high bit means another byte follows.
/// </summary>
/// <param name="cancellationToken">Token checked up front and forwarded to the byte reads.</param>
/// <returns>The accumulated unsigned value.</returns>
private async ValueTask<uint> ReadVarInt32Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        return await Task.FromCanceled<uint>(cancellationToken);
    }

    uint value = 0;
    for (var shift = 0; ; shift += 7)
    {
        var current = (byte) await ReadByteAsync(cancellationToken);
        value |= (uint) (current & 0x7f) << shift;

        // High bit clear: this was the last byte.
        if ((current & 0x80) == 0)
        {
            return value;
        }
    }
}
825
/// <summary>
/// Reads a 64-bit unsigned varint from the wire. Each byte contributes
/// its low seven bits; a set high bit means another byte follows.
/// </summary>
/// <param name="cancellationToken">Token checked before reading and forwarded to every byte read.</param>
/// <returns>The accumulated value.</returns>
private async ValueTask<ulong> ReadVarInt64Async(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        // The canceled task is typed as ulong to match this method's result
        // type (it was previously typed as uint and widened implicitly).
        return await Task.FromCanceled<ulong>(cancellationToken);
    }

    // NOTE(review): nothing here limits the number of continuation bytes.
    // C# masks the shift count of a ulong shift to six bits, so an encoding
    // longer than ten bytes would wrap around instead of failing - consider
    // rejecting such input.
    var shift = 0;
    ulong result = 0;
    while (true)
    {
        var b = (byte) await ReadByteAsync(cancellationToken);
        result |= (ulong) (b & 0x7f) << shift;
        if ((b & 0x80) != 0x80)
        {
            // High bit clear: this was the final byte of the varint.
            break;
        }
        shift += 7;
    }

    return result;
}
853
/// <summary>
/// Decodes a zigzag-encoded 32-bit value back into a signed integer.
/// </summary>
/// <param name="n">The zigzag-encoded value.</param>
/// <returns>The decoded signed integer.</returns>
private static int ZigzagToInt(uint n)
{
    // The lowest bit carries the sign, the remaining bits the magnitude.
    var magnitude = (int) (n >> 1);
    // 0 when the sign bit is clear, -1 (all ones) when it is set.
    var signMask = -(int) (n & 1);
    return magnitude ^ signMask;
}
858
/// <summary>
/// Decodes a zigzag-encoded 64-bit value back into a signed long.
/// </summary>
/// <param name="n">The zigzag-encoded value.</param>
/// <returns>The decoded signed long.</returns>
private static long ZigzagToLong(ulong n)
{
    // The lowest bit carries the sign, the remaining bits the magnitude.
    var magnitude = (long) (n >> 1);
    // 0 when the sign bit is clear, -1 (all ones) when it is set.
    var signMask = -(long) (n & 1);
    return magnitude ^ signMask;
}
863
/// <summary>
/// Looks up the <see cref="TType"/> for a compact-protocol type code.
/// </summary>
/// <param name="type">A byte whose low four bits hold the compact type code; the upper bits are ignored.</param>
/// <returns>The entry of <c>CompactTypeToTType</c> at that code.</returns>
private static TType GetTType(byte type)
{
    var compactType = type & 0x0f;
    return CompactTypeToTType[compactType];
}
869
/// <summary>
/// Zigzag-encodes a signed long so that values of small magnitude,
/// negative ones included, become small unsigned numbers suitable for a varint.
/// </summary>
/// <param name="n">The signed value to encode.</param>
/// <returns>The zigzag-encoded value.</returns>
private static ulong LongToZigzag(long n)
{
    // Magnitude moved up by one bit to make room for the sign.
    var doubled = (ulong) (n << 1);
    // Arithmetic shift: all zeros for non-negative n, all ones for negative n.
    var signFill = (ulong) (n >> 63);
    return doubled ^ signFill;
}
875
/// <summary>
/// Zigzag-encodes a signed int so that values of small magnitude,
/// negative ones included, become small unsigned numbers suitable for a varint.
/// </summary>
/// <param name="n">The signed value to encode.</param>
/// <returns>The zigzag-encoded value.</returns>
private static uint IntToZigzag(int n)
{
    // Magnitude moved up by one bit to make room for the sign.
    var doubled = (uint) (n << 1);
    // Arithmetic shift: all zeros for non-negative n, all ones for negative n.
    var signFill = (uint) (n >> 31);
    return doubled ^ signFill;
}
881
/// <summary>
/// Returns the minimum number of bytes a value of the given type occupies on the wire.
/// </summary>
/// <param name="type">The type to look up.</param>
/// <returns>The minimum serialized size in bytes.</returns>
/// <exception cref="TTransportException">The type code is not one of the handled values.</exception>
public override int GetMinSerializedSize(TType type)
{
    switch (type)
    {
        // Nothing on the wire; a struct may be empty.
        case TType.Stop:
        case TType.Void:
        case TType.Struct:
            return 0;

        // Doubles are always written as a fixed eight bytes.
        case TType.Double:
            return 8;

        // Single-byte values.
        case TType.Bool:
        case TType.Byte:
            return sizeof(byte);

        // Zigzag-encoded integers need at least one byte.
        case TType.I16:
        case TType.I32:
        case TType.I64:
            return sizeof(byte);

        // At least the string length.
        case TType.String:
            return sizeof(byte);

        // At least the element count.
        case TType.Map:
        case TType.Set:
        case TType.List:
            return sizeof(byte);

        default:
            throw new TTransportException(TTransportException.ExceptionType.Unknown, "unrecognized type code");
    }
}
903
/// <summary>
/// Protocol factory that produces <see cref="TCompactProtocol"/> instances.
/// </summary>
public class Factory : TProtocolFactory
{
    /// <summary>
    /// Creates a new <see cref="TCompactProtocol"/> constructed with the supplied transport.
    /// </summary>
    /// <param name="trans">The transport passed to the protocol constructor.</param>
    /// <returns>The newly created protocol instance.</returns>
    public override TProtocol GetProtocol(TTransport trans) => new TCompactProtocol(trans);
}
911
/// <summary>
///     The type codes used on the wire by the compact protocol.
/// </summary>
private static class Types
{
    // End-of-struct marker.
    public const byte Stop = 0x00;

    // Boolean codes (one per value).
    public const byte BooleanTrue = 0x01;
    public const byte BooleanFalse = 0x02;

    // Numeric codes.
    public const byte Byte = 0x03;
    public const byte I16 = 0x04;
    public const byte I32 = 0x05;
    public const byte I64 = 0x06;
    public const byte Double = 0x07;

    // Binary data.
    public const byte Binary = 0x08;

    // Containers and structs.
    public const byte List = 0x09;
    public const byte Set = 0x0A;
    public const byte Map = 0x0B;
    public const byte Struct = 0x0C;
}
931 }
Jens Geyer421444f2019-03-20 22:13:25 +0100932}