blob: 921507c48066c62e49be06e0159c5e0c3930a5e7 [file] [log] [blame]
Jens Geyer421444f2019-03-20 22:13:25 +01001// Licensed to the Apache Software Foundation(ASF) under one
Jens Geyeraa0c8b32019-01-28 23:27:45 +01002// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
zembord9d958a32019-11-21 13:11:44 +030019using System.Buffers.Binary;
Jens Geyeraa0c8b32019-01-28 23:27:45 +010020using System.Collections.Generic;
Jens Geyer5a17b132019-05-26 15:53:37 +020021using System.Diagnostics;
Jens Geyeraa0c8b32019-01-28 23:27:45 +010022using System.Text;
23using System.Threading;
24using System.Threading.Tasks;
25using Thrift.Protocol.Entities;
26using Thrift.Transport;
27
28namespace Thrift.Protocol
29{
30 //TODO: implementation of TProtocol
31
32 // ReSharper disable once InconsistentNaming
33 public class TCompactProtocol : TProtocol
34 {
35 private const byte ProtocolId = 0x82;
36 private const byte Version = 1;
37 private const byte VersionMask = 0x1f; // 0001 1111
38 private const byte TypeMask = 0xE0; // 1110 0000
39 private const byte TypeBits = 0x07; // 0000 0111
40 private const int TypeShiftAmount = 5;
41 private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
Jens Geyer5a17b132019-05-26 15:53:37 +020042 private static readonly TField StopField = new TField(string.Empty, TType.Stop, 0);
43
44 private const byte NoTypeOverride = 0xFF;
Jens Geyeraa0c8b32019-01-28 23:27:45 +010045
46 // ReSharper disable once InconsistentNaming
47 private static readonly byte[] TTypeToCompactType = new byte[16];
Jens Geyer5a17b132019-05-26 15:53:37 +020048 private static readonly TType[] CompactTypeToTType = new TType[13];
Jens Geyeraa0c8b32019-01-28 23:27:45 +010049
50 /// <summary>
51 /// Used to keep track of the last field for the current and previous structs, so we can do the delta stuff.
52 /// </summary>
53 private readonly Stack<short> _lastField = new Stack<short>(15);
54
55 /// <summary>
56 /// If we encounter a boolean field begin, save the TField here so it can have the value incorporated.
57 /// </summary>
58 private TField? _booleanField;
59
60 /// <summary>
61 /// If we Read a field header, and it's a boolean field, save the boolean value here so that ReadBool can use it.
62 /// </summary>
63 private bool? _boolValue;
64
65 private short _lastFieldId;
66
Jens Geyer5a17b132019-05-26 15:53:37 +020067 // minimize memory allocations by means of an preallocated bytes buffer
68 // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long)
69 private byte[] PreAllocatedBuffer = new byte[128];
70
71 private struct VarInt
72 {
73 public byte[] bytes;
74 public int count;
75 }
76
77 // minimize memory allocations by means of an preallocated VarInt buffer
78 private VarInt PreAllocatedVarInt = new VarInt()
79 {
80 bytes = new byte[10], // see Int64ToVarInt()
81 count = 0
82 };
83
84
85
86
Jens Geyeraa0c8b32019-01-28 23:27:45 +010087 public TCompactProtocol(TTransport trans)
88 : base(trans)
89 {
90 TTypeToCompactType[(int) TType.Stop] = Types.Stop;
91 TTypeToCompactType[(int) TType.Bool] = Types.BooleanTrue;
92 TTypeToCompactType[(int) TType.Byte] = Types.Byte;
93 TTypeToCompactType[(int) TType.I16] = Types.I16;
94 TTypeToCompactType[(int) TType.I32] = Types.I32;
95 TTypeToCompactType[(int) TType.I64] = Types.I64;
96 TTypeToCompactType[(int) TType.Double] = Types.Double;
97 TTypeToCompactType[(int) TType.String] = Types.Binary;
98 TTypeToCompactType[(int) TType.List] = Types.List;
99 TTypeToCompactType[(int) TType.Set] = Types.Set;
100 TTypeToCompactType[(int) TType.Map] = Types.Map;
101 TTypeToCompactType[(int) TType.Struct] = Types.Struct;
Jens Geyer5a17b132019-05-26 15:53:37 +0200102
103 CompactTypeToTType[Types.Stop] = TType.Stop;
104 CompactTypeToTType[Types.BooleanTrue] = TType.Bool;
105 CompactTypeToTType[Types.BooleanFalse] = TType.Bool;
106 CompactTypeToTType[Types.Byte] = TType.Byte;
107 CompactTypeToTType[Types.I16] = TType.I16;
108 CompactTypeToTType[Types.I32] = TType.I32;
109 CompactTypeToTType[Types.I64] = TType.I64;
110 CompactTypeToTType[Types.Double] = TType.Double;
111 CompactTypeToTType[Types.Binary] = TType.String;
112 CompactTypeToTType[Types.List] = TType.List;
113 CompactTypeToTType[Types.Set] = TType.Set;
114 CompactTypeToTType[Types.Map] = TType.Map;
115 CompactTypeToTType[Types.Struct] = TType.Struct;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100116 }
117
118 public void Reset()
119 {
120 _lastField.Clear();
121 _lastFieldId = 0;
122 }
123
124 public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
125 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200126 PreAllocatedBuffer[0] = ProtocolId;
127 PreAllocatedBuffer[1] = (byte)((Version & VersionMask) | (((uint)message.Type << TypeShiftAmount) & TypeMask));
128 await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100129
Jens Geyer5a17b132019-05-26 15:53:37 +0200130 Int32ToVarInt((uint) message.SeqID, ref PreAllocatedVarInt);
131 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100132
133 await WriteStringAsync(message.Name, cancellationToken);
134 }
135
136 public override async Task WriteMessageEndAsync(CancellationToken cancellationToken)
137 {
138 if (cancellationToken.IsCancellationRequested)
139 {
140 await Task.FromCanceled(cancellationToken);
141 }
142 }
143
144 /// <summary>
145 /// Write a struct begin. This doesn't actually put anything on the wire. We
146 /// use it as an opportunity to put special placeholder markers on the field
147 /// stack so we can get the field id deltas correct.
148 /// </summary>
149 public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
150 {
151 if (cancellationToken.IsCancellationRequested)
152 {
153 await Task.FromCanceled(cancellationToken);
154 }
155
156 _lastField.Push(_lastFieldId);
157 _lastFieldId = 0;
158 }
159
160 public override async Task WriteStructEndAsync(CancellationToken cancellationToken)
161 {
162 if (cancellationToken.IsCancellationRequested)
163 {
164 await Task.FromCanceled(cancellationToken);
165 }
166
167 _lastFieldId = _lastField.Pop();
168 }
169
Jens Geyer5a17b132019-05-26 15:53:37 +0200170 private async Task WriteFieldBeginInternalAsync(TField field, byte fieldType, CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100171 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200172 // if there's a exType override passed in, use that. Otherwise ask GetCompactType().
173 if (fieldType == NoTypeOverride)
174 fieldType = GetCompactType(field.Type);
175
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100176
177 // check if we can use delta encoding for the field id
Jens Geyer5a17b132019-05-26 15:53:37 +0200178 if (field.ID > _lastFieldId)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100179 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200180 var delta = field.ID - _lastFieldId;
181 if (delta <= 15)
182 {
183 // Write them together
184 PreAllocatedBuffer[0] = (byte)((delta << 4) | fieldType);
185 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
186 _lastFieldId = field.ID;
187 return;
188 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100189 }
190
Jens Geyer5a17b132019-05-26 15:53:37 +0200191 // Write them separate
192 PreAllocatedBuffer[0] = fieldType;
193 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
194 await WriteI16Async(field.ID, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100195 _lastFieldId = field.ID;
196 }
197
198 public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
199 {
200 if (field.Type == TType.Bool)
201 {
202 _booleanField = field;
203 }
204 else
205 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200206 await WriteFieldBeginInternalAsync(field, NoTypeOverride, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100207 }
208 }
209
210 public override async Task WriteFieldEndAsync(CancellationToken cancellationToken)
211 {
212 if (cancellationToken.IsCancellationRequested)
213 {
214 await Task.FromCanceled(cancellationToken);
215 }
216 }
217
218 public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
219 {
220 if (cancellationToken.IsCancellationRequested)
221 {
222 return;
223 }
224
Jens Geyer5a17b132019-05-26 15:53:37 +0200225 PreAllocatedBuffer[0] = Types.Stop;
226 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100227 }
228
229 protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken)
230 {
231 if (cancellationToken.IsCancellationRequested)
232 {
233 return;
234 }
235
236 /*
237 Abstract method for writing the start of lists and sets. List and sets on
238 the wire differ only by the exType indicator.
239 */
240
241 if (size <= 14)
242 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200243 PreAllocatedBuffer[0] = (byte)((size << 4) | GetCompactType(elemType));
244 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100245 }
246 else
247 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200248 PreAllocatedBuffer[0] = (byte)(0xf0 | GetCompactType(elemType));
249 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100250
Jens Geyer5a17b132019-05-26 15:53:37 +0200251 Int32ToVarInt((uint) size, ref PreAllocatedVarInt);
252 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100253 }
254 }
255
256 public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
257 {
258 await WriteCollectionBeginAsync(list.ElementType, list.Count, cancellationToken);
259 }
260
261 public override async Task WriteListEndAsync(CancellationToken cancellationToken)
262 {
263 if (cancellationToken.IsCancellationRequested)
264 {
265 await Task.FromCanceled(cancellationToken);
266 }
267 }
268
269 public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
270 {
271 if (cancellationToken.IsCancellationRequested)
272 {
273 return;
274 }
275
276 await WriteCollectionBeginAsync(set.ElementType, set.Count, cancellationToken);
277 }
278
279 public override async Task WriteSetEndAsync(CancellationToken cancellationToken)
280 {
281 if (cancellationToken.IsCancellationRequested)
282 {
283 await Task.FromCanceled(cancellationToken);
284 }
285 }
286
287 public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
288 {
289 if (cancellationToken.IsCancellationRequested)
290 {
291 return;
292 }
293
294 /*
295 Write a boolean value. Potentially, this could be a boolean field, in
296 which case the field header info isn't written yet. If so, decide what the
297 right exType header is for the value and then Write the field header.
298 Otherwise, Write a single byte.
299 */
300
301 if (_booleanField != null)
302 {
303 // we haven't written the field header yet
Jens Geyer5a17b132019-05-26 15:53:37 +0200304 var type = b ? Types.BooleanTrue : Types.BooleanFalse;
305 await WriteFieldBeginInternalAsync(_booleanField.Value, type, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100306 _booleanField = null;
307 }
308 else
309 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200310 // we're not part of a field, so just write the value.
311 PreAllocatedBuffer[0] = b ? Types.BooleanTrue : Types.BooleanFalse;
312 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100313 }
314 }
315
316 public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
317 {
318 if (cancellationToken.IsCancellationRequested)
319 {
320 return;
321 }
322
Jens Geyer5a17b132019-05-26 15:53:37 +0200323 PreAllocatedBuffer[0] = (byte)b;
324 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100325 }
326
327 public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
328 {
329 if (cancellationToken.IsCancellationRequested)
330 {
331 return;
332 }
333
Jens Geyer5a17b132019-05-26 15:53:37 +0200334 Int32ToVarInt(IntToZigzag(i16), ref PreAllocatedVarInt);
335 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100336 }
337
Jens Geyer5a17b132019-05-26 15:53:37 +0200338 private static void Int32ToVarInt(uint n, ref VarInt varint)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100339 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200340 // Write an i32 as a varint. Results in 1 - 5 bytes on the wire.
341 varint.count = 0;
342 Debug.Assert(varint.bytes.Length >= 5);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100343
344 while (true)
345 {
346 if ((n & ~0x7F) == 0)
347 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200348 varint.bytes[varint.count++] = (byte)n;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100349 break;
350 }
351
Jens Geyer5a17b132019-05-26 15:53:37 +0200352 varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100353 n >>= 7;
354 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100355 }
356
357 public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
358 {
359 if (cancellationToken.IsCancellationRequested)
360 {
361 return;
362 }
363
Jens Geyer5a17b132019-05-26 15:53:37 +0200364 Int32ToVarInt(IntToZigzag(i32), ref PreAllocatedVarInt);
365 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100366 }
367
Jens Geyer5a17b132019-05-26 15:53:37 +0200368 static private void Int64ToVarInt(ulong n, ref VarInt varint)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100369 {
370 // Write an i64 as a varint. Results in 1-10 bytes on the wire.
Jens Geyer5a17b132019-05-26 15:53:37 +0200371 varint.count = 0;
372 Debug.Assert(varint.bytes.Length >= 10);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100373
374 while (true)
375 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200376 if ((n & ~(ulong)0x7FL) == 0)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100377 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200378 varint.bytes[varint.count++] = (byte)n;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100379 break;
380 }
Jens Geyer5a17b132019-05-26 15:53:37 +0200381 varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100382 n >>= 7;
383 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100384 }
385
386 public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
387 {
388 if (cancellationToken.IsCancellationRequested)
389 {
390 return;
391 }
392
Jens Geyer5a17b132019-05-26 15:53:37 +0200393 Int64ToVarInt(LongToZigzag(i64), ref PreAllocatedVarInt);
394 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100395 }
396
397 public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
398 {
399 if (cancellationToken.IsCancellationRequested)
400 {
401 return;
402 }
zembord9d958a32019-11-21 13:11:44 +0300403 BinaryPrimitives.WriteInt64LittleEndian(PreAllocatedBuffer, BitConverter.DoubleToInt64Bits(d));
Jens Geyer5a17b132019-05-26 15:53:37 +0200404 await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100405 }
406
407 public override async Task WriteStringAsync(string str, CancellationToken cancellationToken)
408 {
409 if (cancellationToken.IsCancellationRequested)
410 {
411 return;
412 }
413
414 var bytes = Encoding.UTF8.GetBytes(str);
415
Jens Geyer5a17b132019-05-26 15:53:37 +0200416 Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt);
417 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100418 await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
419 }
420
421 public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
422 {
423 if (cancellationToken.IsCancellationRequested)
424 {
425 return;
426 }
427
Jens Geyer5a17b132019-05-26 15:53:37 +0200428 Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt);
429 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100430 await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
431 }
432
433 public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
434 {
435 if (cancellationToken.IsCancellationRequested)
436 {
437 return;
438 }
Jens Geyer5a17b132019-05-26 15:53:37 +0200439
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100440 if (map.Count == 0)
441 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200442 PreAllocatedBuffer[0] = 0;
443 await Trans.WriteAsync( PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100444 }
445 else
446 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200447 Int32ToVarInt((uint) map.Count, ref PreAllocatedVarInt);
448 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
449
450 PreAllocatedBuffer[0] = (byte)((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType));
451 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100452 }
453 }
454
455 public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
456 {
457 if (cancellationToken.IsCancellationRequested)
458 {
459 await Task.FromCanceled(cancellationToken);
460 }
461 }
462
Jens Geyer5a17b132019-05-26 15:53:37 +0200463 public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100464 {
465 if (cancellationToken.IsCancellationRequested)
466 {
467 return await Task.FromCanceled<TMessage>(cancellationToken);
468 }
469
470 var protocolId = (byte) await ReadByteAsync(cancellationToken);
471 if (protocolId != ProtocolId)
472 {
473 throw new TProtocolException($"Expected protocol id {ProtocolId:X} but got {protocolId:X}");
474 }
475
476 var versionAndType = (byte) await ReadByteAsync(cancellationToken);
477 var version = (byte) (versionAndType & VersionMask);
478
479 if (version != Version)
480 {
481 throw new TProtocolException($"Expected version {Version} but got {version}");
482 }
483
484 var type = (byte) ((versionAndType >> TypeShiftAmount) & TypeBits);
485 var seqid = (int) await ReadVarInt32Async(cancellationToken);
486 var messageName = await ReadStringAsync(cancellationToken);
487
488 return new TMessage(messageName, (TMessageType) type, seqid);
489 }
490
491 public override async Task ReadMessageEndAsync(CancellationToken cancellationToken)
492 {
493 if (cancellationToken.IsCancellationRequested)
494 {
495 await Task.FromCanceled(cancellationToken);
496 }
497 }
498
Jens Geyer5a17b132019-05-26 15:53:37 +0200499 public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100500 {
501 if (cancellationToken.IsCancellationRequested)
502 {
503 return await Task.FromCanceled<TStruct>(cancellationToken);
504 }
505
506 // some magic is here )
507
508 _lastField.Push(_lastFieldId);
509 _lastFieldId = 0;
510
511 return AnonymousStruct;
512 }
513
514 public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
515 {
516 if (cancellationToken.IsCancellationRequested)
517 {
518 await Task.FromCanceled(cancellationToken);
519 }
520
521 /*
522 Doesn't actually consume any wire data, just removes the last field for
523 this struct from the field stack.
524 */
525
526 // consume the last field we Read off the wire.
527 _lastFieldId = _lastField.Pop();
528 }
529
Jens Geyer5a17b132019-05-26 15:53:37 +0200530 public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100531 {
532 // Read a field header off the wire.
533 var type = (byte) await ReadByteAsync(cancellationToken);
Jens Geyer5a17b132019-05-26 15:53:37 +0200534
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100535 // if it's a stop, then we can return immediately, as the struct is over.
536 if (type == Types.Stop)
537 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200538 return StopField;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100539 }
540
Jens Geyer5a17b132019-05-26 15:53:37 +0200541
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100542 // mask off the 4 MSB of the exType header. it could contain a field id delta.
543 var modifier = (short) ((type & 0xf0) >> 4);
Jens Geyer5a17b132019-05-26 15:53:37 +0200544 var compactType = (byte)(type & 0x0f);
545
546 short fieldId;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100547 if (modifier == 0)
548 {
549 fieldId = await ReadI16Async(cancellationToken);
550 }
551 else
552 {
553 fieldId = (short) (_lastFieldId + modifier);
554 }
555
Jens Geyer5a17b132019-05-26 15:53:37 +0200556 var ttype = GetTType(compactType);
557 var field = new TField(string.Empty, ttype, fieldId);
558
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100559 // if this happens to be a boolean field, the value is encoded in the exType
Jens Geyer5a17b132019-05-26 15:53:37 +0200560 if( ttype == TType.Bool)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100561 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200562 _boolValue = (compactType == Types.BooleanTrue);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100563 }
564
565 // push the new field onto the field stack so we can keep the deltas going.
566 _lastFieldId = field.ID;
567 return field;
568 }
569
570 public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
571 {
572 if (cancellationToken.IsCancellationRequested)
573 {
574 await Task.FromCanceled(cancellationToken);
575 }
576 }
577
Jens Geyer5a17b132019-05-26 15:53:37 +0200578 public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100579 {
580 if (cancellationToken.IsCancellationRequested)
581 {
582 await Task.FromCanceled<TMap>(cancellationToken);
583 }
584
585 /*
586 Read a map header off the wire. If the size is zero, skip Reading the key
587 and value exType. This means that 0-length maps will yield TMaps without the
588 "correct" types.
589 */
590
591 var size = (int) await ReadVarInt32Async(cancellationToken);
592 var keyAndValueType = size == 0 ? (byte) 0 : (byte) await ReadByteAsync(cancellationToken);
593 return new TMap(GetTType((byte) (keyAndValueType >> 4)), GetTType((byte) (keyAndValueType & 0xf)), size);
594 }
595
596 public override async Task ReadMapEndAsync(CancellationToken cancellationToken)
597 {
598 if (cancellationToken.IsCancellationRequested)
599 {
600 await Task.FromCanceled(cancellationToken);
601 }
602 }
603
Jens Geyer5a17b132019-05-26 15:53:37 +0200604 public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100605 {
606 /*
607 Read a set header off the wire. If the set size is 0-14, the size will
608 be packed into the element exType header. If it's a longer set, the 4 MSB
609 of the element exType header will be 0xF, and a varint will follow with the
610 true size.
611 */
612
613 return new TSet(await ReadListBeginAsync(cancellationToken));
614 }
615
Jens Geyer5a17b132019-05-26 15:53:37 +0200616 public override ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100617 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100618 /*
619 Read a boolean off the wire. If this is a boolean field, the value should
620 already have been Read during ReadFieldBegin, so we'll just consume the
621 pre-stored value. Otherwise, Read a byte.
622 */
623
624 if (_boolValue != null)
625 {
626 var result = _boolValue.Value;
627 _boolValue = null;
Jens Geyer5a17b132019-05-26 15:53:37 +0200628 return new ValueTask<bool>(result);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100629 }
630
Jens Geyer5a17b132019-05-26 15:53:37 +0200631 return InternalCall();
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100632
Jens Geyer5a17b132019-05-26 15:53:37 +0200633 async ValueTask<bool> InternalCall()
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100634 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200635 var data = await ReadByteAsync(cancellationToken);
636 return (data == Types.BooleanTrue);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100637 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100638 }
639
Jens Geyer5a17b132019-05-26 15:53:37 +0200640
641 public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
642 {
643 // Read a single byte off the wire. Nothing interesting here.
644 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
645 return (sbyte)PreAllocatedBuffer[0];
646 }
647
648 public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100649 {
650 if (cancellationToken.IsCancellationRequested)
651 {
652 return await Task.FromCanceled<short>(cancellationToken);
653 }
654
655 return (short) ZigzagToInt(await ReadVarInt32Async(cancellationToken));
656 }
657
Jens Geyer5a17b132019-05-26 15:53:37 +0200658 public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100659 {
660 if (cancellationToken.IsCancellationRequested)
661 {
662 return await Task.FromCanceled<int>(cancellationToken);
663 }
664
665 return ZigzagToInt(await ReadVarInt32Async(cancellationToken));
666 }
667
Jens Geyer5a17b132019-05-26 15:53:37 +0200668 public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100669 {
670 if (cancellationToken.IsCancellationRequested)
671 {
672 return await Task.FromCanceled<long>(cancellationToken);
673 }
674
675 return ZigzagToLong(await ReadVarInt64Async(cancellationToken));
676 }
677
Jens Geyer5a17b132019-05-26 15:53:37 +0200678 public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100679 {
680 if (cancellationToken.IsCancellationRequested)
681 {
682 return await Task.FromCanceled<double>(cancellationToken);
683 }
684
Jens Geyer5a17b132019-05-26 15:53:37 +0200685 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
zembord9d958a32019-11-21 13:11:44 +0300686
687 return BitConverter.Int64BitsToDouble(BinaryPrimitives.ReadInt64LittleEndian(PreAllocatedBuffer));
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100688 }
689
Jens Geyer5a17b132019-05-26 15:53:37 +0200690 public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100691 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200692 // read length
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100693 var length = (int) await ReadVarInt32Async(cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100694 if (length == 0)
695 {
696 return string.Empty;
697 }
698
Jens Geyer5a17b132019-05-26 15:53:37 +0200699 // read and decode data
700 if (length < PreAllocatedBuffer.Length)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100701 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200702 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, length, cancellationToken);
703 return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, length);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100704 }
705
Jens Geyer5a17b132019-05-26 15:53:37 +0200706 var buf = new byte[length];
707 await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
708 return Encoding.UTF8.GetString(buf, 0, length);
709 }
710
711 public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
712 {
713 // read length
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100714 var length = (int) await ReadVarInt32Async(cancellationToken);
715 if (length == 0)
716 {
717 return new byte[0];
718 }
719
Jens Geyer5a17b132019-05-26 15:53:37 +0200720 // read data
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100721 var buf = new byte[length];
722 await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
723 return buf;
724 }
725
Jens Geyer5a17b132019-05-26 15:53:37 +0200726 public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100727 {
728 if (cancellationToken.IsCancellationRequested)
729 {
730 await Task.FromCanceled<TList>(cancellationToken);
731 }
732
733 /*
734 Read a list header off the wire. If the list size is 0-14, the size will
735 be packed into the element exType header. If it's a longer list, the 4 MSB
736 of the element exType header will be 0xF, and a varint will follow with the
737 true size.
738 */
739
740 var sizeAndType = (byte) await ReadByteAsync(cancellationToken);
741 var size = (sizeAndType >> 4) & 0x0f;
742 if (size == 15)
743 {
744 size = (int) await ReadVarInt32Async(cancellationToken);
745 }
746
747 var type = GetTType(sizeAndType);
748 return new TList(type, size);
749 }
750
751 public override async Task ReadListEndAsync(CancellationToken cancellationToken)
752 {
753 if (cancellationToken.IsCancellationRequested)
754 {
755 await Task.FromCanceled(cancellationToken);
756 }
757 }
758
759 public override async Task ReadSetEndAsync(CancellationToken cancellationToken)
760 {
761 if (cancellationToken.IsCancellationRequested)
762 {
763 await Task.FromCanceled(cancellationToken);
764 }
765 }
766
767 private static byte GetCompactType(TType ttype)
768 {
769 // Given a TType value, find the appropriate TCompactProtocol.Types constant.
770 return TTypeToCompactType[(int) ttype];
771 }
772
773
Jens Geyer5a17b132019-05-26 15:53:37 +0200774 private async ValueTask<uint> ReadVarInt32Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100775 {
776 if (cancellationToken.IsCancellationRequested)
777 {
778 return await Task.FromCanceled<uint>(cancellationToken);
779 }
780
781 /*
782 Read an i32 from the wire as a varint. The MSB of each byte is set
783 if there is another byte to follow. This can Read up to 5 bytes.
784 */
785
786 uint result = 0;
787 var shift = 0;
788
789 while (true)
790 {
791 var b = (byte) await ReadByteAsync(cancellationToken);
792 result |= (uint) (b & 0x7f) << shift;
793 if ((b & 0x80) != 0x80)
794 {
795 break;
796 }
797 shift += 7;
798 }
799
800 return result;
801 }
802
Jens Geyer5a17b132019-05-26 15:53:37 +0200803 private async ValueTask<ulong> ReadVarInt64Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100804 {
805 if (cancellationToken.IsCancellationRequested)
806 {
807 return await Task.FromCanceled<uint>(cancellationToken);
808 }
809
810 /*
811 Read an i64 from the wire as a proper varint. The MSB of each byte is set
812 if there is another byte to follow. This can Read up to 10 bytes.
813 */
814
815 var shift = 0;
816 ulong result = 0;
817 while (true)
818 {
819 var b = (byte) await ReadByteAsync(cancellationToken);
820 result |= (ulong) (b & 0x7f) << shift;
821 if ((b & 0x80) != 0x80)
822 {
823 break;
824 }
825 shift += 7;
826 }
827
828 return result;
829 }
830
831 private static int ZigzagToInt(uint n)
832 {
833 return (int) (n >> 1) ^ -(int) (n & 1);
834 }
835
836 private static long ZigzagToLong(ulong n)
837 {
838 return (long) (n >> 1) ^ -(long) (n & 1);
839 }
840
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100841 private static TType GetTType(byte type)
842 {
843 // Given a TCompactProtocol.Types constant, convert it to its corresponding TType value.
Jens Geyer5a17b132019-05-26 15:53:37 +0200844 return CompactTypeToTType[type & 0x0f];
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100845 }
846
847 private static ulong LongToZigzag(long n)
848 {
849 // Convert l into a zigzag long. This allows negative numbers to be represented compactly as a varint
850 return (ulong) (n << 1) ^ (ulong) (n >> 63);
851 }
852
853 private static uint IntToZigzag(int n)
854 {
855 // Convert n into a zigzag int. This allows negative numbers to be represented compactly as a varint
856 return (uint) (n << 1) ^ (uint) (n >> 31);
857 }
858
Jens Geyer421444f2019-03-20 22:13:25 +0100859 public class Factory : TProtocolFactory
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100860 {
Jens Geyer421444f2019-03-20 22:13:25 +0100861 public override TProtocol GetProtocol(TTransport trans)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100862 {
863 return new TCompactProtocol(trans);
864 }
865 }
866
867 /// <summary>
868 /// All of the on-wire exType codes.
869 /// </summary>
870 private static class Types
871 {
872 public const byte Stop = 0x00;
873 public const byte BooleanTrue = 0x01;
874 public const byte BooleanFalse = 0x02;
875 public const byte Byte = 0x03;
876 public const byte I16 = 0x04;
877 public const byte I32 = 0x05;
878 public const byte I64 = 0x06;
879 public const byte Double = 0x07;
880 public const byte Binary = 0x08;
881 public const byte List = 0x09;
882 public const byte Set = 0x0A;
883 public const byte Map = 0x0B;
884 public const byte Struct = 0x0C;
885 }
886 }
Jens Geyer421444f2019-03-20 22:13:25 +0100887}