blob: 9ff640a2576d80066b8bac5b47e01e65821d5555 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18using System;
19using System.Collections.Generic;
20using System.Text;
21using System.Threading;
22using System.Threading.Tasks;
23using Thrift.Protocol.Entities;
24using Thrift.Transport;
25
26namespace Thrift.Protocol
27{
28 //TODO: implementation of TProtocol
29
/// <summary>
/// <see cref="TProtocol"/> implementation that writes integers as zigzag varints and packs
/// field-id deltas, boolean field values and small collection sizes into the type header byte.
/// </summary>
/// <remarks>
/// An instance keeps per-struct state (<c>_lastField</c>, <c>_lastFieldId</c>, the pending boolean
/// field/value) in plain fields with no synchronization.
/// Every method surfaces an already-cancelled token as a <see cref="TaskCanceledException"/>.
/// </remarks>
// ReSharper disable once InconsistentNaming
public class TCompactProtocol : TProtocol
{
    private const byte ProtocolId = 0x82;
    private const byte Version = 1;
    private const byte VersionMask = 0x1f; // 0001 1111
    private const byte TypeMask = 0xE0; // 1110 0000
    private const byte TypeBits = 0x07; // 0000 0111
    private const int TypeShiftAmount = 5;

    // Upper bounds matching the buffers used by CreateWriteVarInt32 / CreateWriteVarInt64.
    private const int MaxVarInt32Bytes = 5;
    private const int MaxVarInt64Bytes = 10;

    private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
    private static readonly TField Tstop = new TField(string.Empty, TType.Stop, 0);

    // Lookup table indexed by (int) TType; filled once by the static constructor.
    // ReSharper disable once InconsistentNaming
    private static readonly byte[] TTypeToCompactType = new byte[16];

    /// <summary>
    ///     Used to keep track of the last field for the current and previous structs, so we can do the delta stuff.
    /// </summary>
    private readonly Stack<short> _lastField = new Stack<short>(15);

    /// <summary>
    ///     If we encounter a boolean field begin, save the TField here so it can have the value incorporated.
    /// </summary>
    private TField? _booleanField;

    /// <summary>
    ///     If we read a field header, and it's a boolean field, save the boolean value here so that ReadBool can use it.
    /// </summary>
    private bool? _boolValue;

    private short _lastFieldId;

    /// <summary>
    ///     Fills the shared TType-to-compact-type table exactly once, instead of on every instance construction.
    /// </summary>
    static TCompactProtocol()
    {
        TTypeToCompactType[(int) TType.Stop] = Types.Stop;
        TTypeToCompactType[(int) TType.Bool] = Types.BooleanTrue;
        TTypeToCompactType[(int) TType.Byte] = Types.Byte;
        TTypeToCompactType[(int) TType.I16] = Types.I16;
        TTypeToCompactType[(int) TType.I32] = Types.I32;
        TTypeToCompactType[(int) TType.I64] = Types.I64;
        TTypeToCompactType[(int) TType.Double] = Types.Double;
        TTypeToCompactType[(int) TType.String] = Types.Binary;
        TTypeToCompactType[(int) TType.List] = Types.List;
        TTypeToCompactType[(int) TType.Set] = Types.Set;
        TTypeToCompactType[(int) TType.Map] = Types.Map;
        TTypeToCompactType[(int) TType.Struct] = Types.Struct;
    }

    /// <summary>
    ///     Creates a compact protocol on top of the given transport.
    /// </summary>
    /// <param name="trans">Transport passed to the base <see cref="TProtocol"/>.</param>
    public TCompactProtocol(TTransport trans)
        : base(trans)
    {
    }

    /// <summary>
    ///     Clears the field-id stack and resets the last field id to zero.
    /// </summary>
    public void Reset()
    {
        _lastField.Clear();
        _lastFieldId = 0;
    }

    /// <summary>
    ///     Returns a cancelled task when <paramref name="cancellationToken"/> is already cancelled
    ///     (awaiting it throws <see cref="TaskCanceledException"/>), otherwise a completed task.
    /// </summary>
    private static Task ThrowIfCanceledAsync(CancellationToken cancellationToken)
    {
        return cancellationToken.IsCancellationRequested
            ? Task.FromCanceled(cancellationToken)
            : Task.CompletedTask;
    }

    /// <summary>
    ///     Converts an unsigned wire length to <see cref="int"/>, rejecting values that do not fit.
    /// </summary>
    /// <exception cref="TProtocolException">The length is larger than <see cref="int.MaxValue"/>.</exception>
    private static int CheckReadLength(uint length)
    {
        if (length > int.MaxValue)
        {
            throw new TProtocolException($"Length {length} exceeds the supported maximum of {int.MaxValue}");
        }

        return (int) length;
    }

    /// <summary>
    ///     Writes the protocol id, the combined version/message-type byte, the sequence id as a varint
    ///     and the message name.
    /// </summary>
    public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        await Trans.WriteAsync(new[] {ProtocolId}, cancellationToken);

        var versionAndType = (byte) ((Version & VersionMask) | (((uint) message.Type << TypeShiftAmount) & TypeMask));
        await Trans.WriteAsync(new[] {versionAndType}, cancellationToken);

        var bufferTuple = CreateWriteVarInt32((uint) message.SeqID);
        await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);

        await WriteStringAsync(message.Name, cancellationToken);
    }

    /// <summary>
    ///     Writes nothing; only observes cancellation.
    /// </summary>
    public override async Task WriteMessageEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Write a struct begin. This doesn't actually put anything on the wire. We
    ///     use it as an opportunity to put special placeholder markers on the field
    ///     stack so we can get the field id deltas correct.
    /// </summary>
    public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        _lastField.Push(_lastFieldId);
        _lastFieldId = 0;
    }

    /// <summary>
    ///     Restores the last field id of the enclosing struct from the field stack.
    /// </summary>
    public override async Task WriteStructEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        _lastFieldId = _lastField.Pop();
    }

    /// <summary>
    ///     Writes a field header, using the delta form when the id is 1..15 above the previous id.
    /// </summary>
    /// <param name="field">Field whose id (and, without an override, type) is written.</param>
    /// <param name="typeOverride">Compact type to write, or 0xFF to derive it from <paramref name="field"/>.</param>
    /// <param name="cancellationToken">Token passed to the transport writes.</param>
    private async Task WriteFieldBeginInternalAsync(TField field, byte typeOverride,
        CancellationToken cancellationToken)
    {
        // if there's a type override, use that.
        var typeToWrite = typeOverride == 0xFF ? GetCompactType(field.Type) : typeOverride;

        // check if we can use delta encoding for the field id
        if ((field.ID > _lastFieldId) && (field.ID - _lastFieldId <= 15))
        {
            // delta in the high nibble, type in the low nibble
            var b = (byte) (((field.ID - _lastFieldId) << 4) | typeToWrite);
            await Trans.WriteAsync(new[] {b}, cancellationToken);
        }
        else
        {
            // type byte first, then the full field id as a zigzag varint
            await Trans.WriteAsync(new[] {typeToWrite}, cancellationToken);
            await WriteI16Async(field.ID, cancellationToken);
        }

        _lastFieldId = field.ID;
    }

    /// <summary>
    ///     Writes a field header. For boolean fields the header is deferred until
    ///     <see cref="WriteBoolAsync"/> supplies the value.
    /// </summary>
    public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
    {
        if (field.Type == TType.Bool)
        {
            _booleanField = field;
        }
        else
        {
            await WriteFieldBeginInternalAsync(field, 0xFF, cancellationToken);
        }
    }

    /// <summary>
    ///     Writes nothing; only observes cancellation.
    /// </summary>
    public override async Task WriteFieldEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Writes the single stop byte.
    /// </summary>
    public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        await Trans.WriteAsync(new[] {Types.Stop}, cancellationToken);
    }

    /// <summary>
    ///     Writes the start of a list or set. Lists and sets on the wire differ only by the type indicator.
    ///     Sizes up to 14 share a byte with the element type; larger sizes follow as a varint after 0xF.
    /// </summary>
    protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        if (size <= 14)
        {
            await Trans.WriteAsync(new[] {(byte) ((size << 4) | GetCompactType(elemType))}, cancellationToken);
        }
        else
        {
            await Trans.WriteAsync(new[] {(byte) (0xf0 | GetCompactType(elemType))}, cancellationToken);

            var bufferTuple = CreateWriteVarInt32((uint) size);
            await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
        }
    }

    /// <summary>
    ///     Writes a list header via <see cref="WriteCollectionBeginAsync"/>.
    /// </summary>
    public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
    {
        await WriteCollectionBeginAsync(list.ElementType, list.Count, cancellationToken);
    }

    /// <summary>
    ///     Writes nothing; only observes cancellation.
    /// </summary>
    public override async Task WriteListEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Writes a set header via <see cref="WriteCollectionBeginAsync"/>.
    /// </summary>
    public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        await WriteCollectionBeginAsync(set.ElementType, set.Count, cancellationToken);
    }

    /// <summary>
    ///     Writes nothing; only observes cancellation.
    /// </summary>
    public override async Task WriteSetEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Write a boolean value. Potentially, this could be a boolean field, in
    ///     which case the field header info isn't written yet. If so, decide what the
    ///     right type header is for the value and then write the field header.
    ///     Otherwise, write a single byte.
    /// </summary>
    public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var encoded = b ? Types.BooleanTrue : Types.BooleanFalse;

        if (_booleanField != null)
        {
            // we haven't written the field header yet
            await WriteFieldBeginInternalAsync(_booleanField.Value, encoded, cancellationToken);
            _booleanField = null;
        }
        else
        {
            // we're not part of a field, so just write the value.
            await Trans.WriteAsync(new[] {encoded}, cancellationToken);
        }
    }

    /// <summary>
    ///     Writes a single byte.
    /// </summary>
    public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        await Trans.WriteAsync(new[] {(byte) b}, cancellationToken);
    }

    /// <summary>
    ///     Writes an i16 as a zigzag varint.
    /// </summary>
    public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var bufferTuple = CreateWriteVarInt32(IntToZigzag(i16));
        await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
    }

    /// <summary>
    ///     Encodes <paramref name="n"/> as a varint. Results in 1 - 5 bytes.
    /// </summary>
    /// <returns>A 5-byte buffer and the number of leading bytes of it that are used.</returns>
    protected internal Tuple<byte[], int> CreateWriteVarInt32(uint n)
    {
        var i32Buf = new byte[5];
        var idx = 0;

        while (true)
        {
            if ((n & ~0x7F) == 0)
            {
                // last group: continuation bit stays clear
                i32Buf[idx++] = (byte) n;
                break;
            }

            i32Buf[idx++] = (byte) ((n & 0x7F) | 0x80);
            n >>= 7;
        }

        return new Tuple<byte[], int>(i32Buf, idx);
    }

    /// <summary>
    ///     Writes an i32 as a zigzag varint.
    /// </summary>
    public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var bufferTuple = CreateWriteVarInt32(IntToZigzag(i32));
        await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
    }

    /// <summary>
    ///     Encodes <paramref name="n"/> as a varint. Results in 1 - 10 bytes.
    /// </summary>
    /// <returns>A 10-byte buffer and the number of leading bytes of it that are used.</returns>
    protected internal Tuple<byte[], int> CreateWriteVarInt64(ulong n)
    {
        var buf = new byte[10];
        var idx = 0;

        while (true)
        {
            if ((n & ~(ulong) 0x7FL) == 0)
            {
                // last group: continuation bit stays clear
                buf[idx++] = (byte) n;
                break;
            }

            buf[idx++] = (byte) ((n & 0x7F) | 0x80);
            n >>= 7;
        }

        return new Tuple<byte[], int>(buf, idx);
    }

    /// <summary>
    ///     Writes an i64 as a zigzag varint.
    /// </summary>
    public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var bufferTuple = CreateWriteVarInt64(LongToZigzag(i64));
        await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
    }

    /// <summary>
    ///     Writes the 64 bits of a double as 8 little-endian bytes.
    /// </summary>
    public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var data = new byte[8];
        FixedLongToBytes(BitConverter.DoubleToInt64Bits(d), data, 0);
        await Trans.WriteAsync(data, cancellationToken);
    }

    /// <summary>
    ///     Writes the UTF-8 bytes of a string, prefixed by their count as a varint.
    /// </summary>
    public override async Task WriteStringAsync(string str, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var bytes = Encoding.UTF8.GetBytes(str);

        var bufferTuple = CreateWriteVarInt32((uint) bytes.Length);
        await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
        await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
    }

    /// <summary>
    ///     Writes a byte array, prefixed by its length as a varint.
    /// </summary>
    public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var bufferTuple = CreateWriteVarInt32((uint) bytes.Length);
        await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);
        await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
    }

    /// <summary>
    ///     Writes a map header: a single zero byte for an empty map, otherwise the size as a varint
    ///     followed by one byte holding the key type (high nibble) and value type (low nibble).
    /// </summary>
    public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        if (map.Count == 0)
        {
            await Trans.WriteAsync(new[] {(byte) 0}, cancellationToken);
        }
        else
        {
            var bufferTuple = CreateWriteVarInt32((uint) map.Count);
            await Trans.WriteAsync(bufferTuple.Item1, 0, bufferTuple.Item2, cancellationToken);

            var keyAndValueType = (byte) ((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType));
            await Trans.WriteAsync(new[] {keyAndValueType}, cancellationToken);
        }
    }

    /// <summary>
    ///     Writes nothing; only observes cancellation.
    /// </summary>
    public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Reads and validates the protocol id and version, then reads the message type,
    ///     sequence id and name.
    /// </summary>
    /// <exception cref="TProtocolException">The protocol id or version does not match.</exception>
    public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var protocolId = (byte) await ReadByteAsync(cancellationToken);
        if (protocolId != ProtocolId)
        {
            throw new TProtocolException($"Expected protocol id {ProtocolId:X} but got {protocolId:X}");
        }

        var versionAndType = (byte) await ReadByteAsync(cancellationToken);
        var version = (byte) (versionAndType & VersionMask);

        if (version != Version)
        {
            throw new TProtocolException($"Expected version {Version} but got {version}");
        }

        var type = (byte) ((versionAndType >> TypeShiftAmount) & TypeBits);
        var seqid = (int) await ReadVarInt32Async(cancellationToken);
        var messageName = await ReadStringAsync(cancellationToken);

        return new TMessage(messageName, (TMessageType) type, seqid);
    }

    /// <summary>
    ///     Reads nothing; only observes cancellation.
    /// </summary>
    public override async Task ReadMessageEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Reads nothing from the wire; saves the current last field id on the field stack and
    ///     starts the new struct at field id zero.
    /// </summary>
    public override async Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        _lastField.Push(_lastFieldId);
        _lastFieldId = 0;

        return AnonymousStruct;
    }

    /// <summary>
    ///     Doesn't actually consume any wire data, just removes the last field for
    ///     this struct from the field stack.
    /// </summary>
    public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        // consume the last field we read off the wire.
        _lastFieldId = _lastField.Pop();
    }

    /// <summary>
    ///     Reads a field header off the wire. For boolean fields the value carried in the header
    ///     is stored so that <see cref="ReadBoolAsync"/> can return it.
    /// </summary>
    public override async Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
    {
        var type = (byte) await ReadByteAsync(cancellationToken);

        // if it's a stop, then we can return immediately, as the struct is over.
        if (type == Types.Stop)
        {
            return Tstop;
        }

        short fieldId;

        // mask off the 4 MSB of the type header. it could contain a field id delta.
        var modifier = (short) ((type & 0xf0) >> 4);
        if (modifier == 0)
        {
            // not a delta: the full field id follows
            fieldId = await ReadI16Async(cancellationToken);
        }
        else
        {
            fieldId = (short) (_lastFieldId + modifier);
        }

        var field = new TField(string.Empty, GetTType((byte) (type & 0x0f)), fieldId);

        // if this happens to be a boolean field, the value is encoded in the type
        if (IsBoolType(type))
        {
            _boolValue = (byte) (type & 0x0f) == Types.BooleanTrue;
        }

        // remember the field id so we can keep the deltas going.
        _lastFieldId = field.ID;
        return field;
    }

    /// <summary>
    ///     Reads nothing; only observes cancellation.
    /// </summary>
    public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Read a map header off the wire. If the size is zero, skip reading the key
    ///     and value type. This means that 0-length maps will yield TMaps without the
    ///     "correct" types.
    /// </summary>
    /// <exception cref="TProtocolException">The size on the wire does not fit into an int.</exception>
    public override async Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var size = CheckReadLength(await ReadVarInt32Async(cancellationToken));
        var keyAndValueType = size == 0 ? (byte) 0 : (byte) await ReadByteAsync(cancellationToken);
        return new TMap(GetTType((byte) (keyAndValueType >> 4)), GetTType((byte) (keyAndValueType & 0xf)), size);
    }

    /// <summary>
    ///     Reads nothing; only observes cancellation.
    /// </summary>
    public override async Task ReadMapEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Read a set header off the wire. The encoding is the one read by
    ///     <see cref="ReadListBeginAsync"/>; the resulting list header is wrapped in a TSet.
    /// </summary>
    public override async Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
    {
        return new TSet(await ReadListBeginAsync(cancellationToken));
    }

    /// <summary>
    ///     Read a boolean off the wire. If this is a boolean field, the value should
    ///     already have been read during ReadFieldBegin, so we'll just consume the
    ///     pre-stored value. Otherwise, read a byte.
    /// </summary>
    public override async Task<bool> ReadBoolAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        if (_boolValue != null)
        {
            var result = _boolValue.Value;
            _boolValue = null;
            return result;
        }

        return await ReadByteAsync(cancellationToken) == Types.BooleanTrue;
    }

    /// <summary>
    ///     Read a single byte off the wire.
    /// </summary>
    public override async Task<sbyte> ReadByteAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var buf = new byte[1];
        await Trans.ReadAllAsync(buf, 0, 1, cancellationToken);
        return (sbyte) buf[0];
    }

    /// <summary>
    ///     Reads a zigzag varint and narrows it to an i16.
    /// </summary>
    public override async Task<short> ReadI16Async(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        return (short) ZigzagToInt(await ReadVarInt32Async(cancellationToken));
    }

    /// <summary>
    ///     Reads a zigzag varint as an i32.
    /// </summary>
    public override async Task<int> ReadI32Async(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        return ZigzagToInt(await ReadVarInt32Async(cancellationToken));
    }

    /// <summary>
    ///     Reads a zigzag varint as an i64.
    /// </summary>
    public override async Task<long> ReadI64Async(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        return ZigzagToLong(await ReadVarInt64Async(cancellationToken));
    }

    /// <summary>
    ///     Reads 8 little-endian bytes and reinterprets them as a double.
    /// </summary>
    public override async Task<double> ReadDoubleAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var longBits = new byte[8];
        await Trans.ReadAllAsync(longBits, 0, 8, cancellationToken);

        return BitConverter.Int64BitsToDouble(BytesToLong(longBits));
    }

    /// <summary>
    ///     Reads a varint length followed by that many bytes, and UTF-8 decodes them.
    /// </summary>
    /// <exception cref="TProtocolException">The length on the wire does not fit into an int.</exception>
    public override async Task<string> ReadStringAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var length = CheckReadLength(await ReadVarInt32Async(cancellationToken));

        if (length == 0)
        {
            return string.Empty;
        }

        var buf = new byte[length];
        await Trans.ReadAllAsync(buf, 0, length, cancellationToken);

        return Encoding.UTF8.GetString(buf);
    }

    /// <summary>
    ///     Reads a varint length followed by that many bytes.
    /// </summary>
    /// <exception cref="TProtocolException">The length on the wire does not fit into an int.</exception>
    public override async Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var length = CheckReadLength(await ReadVarInt32Async(cancellationToken));
        if (length == 0)
        {
            return new byte[0];
        }

        var buf = new byte[length];
        await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
        return buf;
    }

    /// <summary>
    ///     Read a list header off the wire. If the list size is 0-14, the size will
    ///     be packed into the element type header. If it's a longer list, the 4 MSB
    ///     of the element type header will be 0xF, and a varint will follow with the
    ///     true size.
    /// </summary>
    /// <exception cref="TProtocolException">The size on the wire does not fit into an int.</exception>
    public override async Task<TList> ReadListBeginAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        var sizeAndType = (byte) await ReadByteAsync(cancellationToken);
        var size = (sizeAndType >> 4) & 0x0f;
        if (size == 15)
        {
            size = CheckReadLength(await ReadVarInt32Async(cancellationToken));
        }

        // GetTType only looks at the low nibble
        var type = GetTType(sizeAndType);
        return new TList(type, size);
    }

    /// <summary>
    ///     Reads nothing; only observes cancellation.
    /// </summary>
    public override async Task ReadListEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Reads nothing; only observes cancellation.
    /// </summary>
    public override async Task ReadSetEndAsync(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);
    }

    /// <summary>
    ///     Given a TType value, find the appropriate TCompactProtocol.Types constant.
    /// </summary>
    private static byte GetCompactType(TType ttype)
    {
        return TTypeToCompactType[(int) ttype];
    }

    /// <summary>
    ///     Read an i32 from the wire as a varint. The MSB of each byte is set
    ///     if there is another byte to follow. Reads at most 5 bytes.
    /// </summary>
    /// <exception cref="TProtocolException">The fifth byte still has its continuation bit set.</exception>
    private async Task<uint> ReadVarInt32Async(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        uint result = 0;
        var shift = 0;

        // Bounded loop: C# masks shift counts, so an unbounded read would wrap around and
        // corrupt the value instead of failing.
        for (var count = 0; count < MaxVarInt32Bytes; count++)
        {
            var b = (byte) await ReadByteAsync(cancellationToken);
            result |= (uint) (b & 0x7f) << shift;
            if ((b & 0x80) != 0x80)
            {
                return result;
            }

            shift += 7;
        }

        throw new TProtocolException($"Variable-length integer is longer than {MaxVarInt32Bytes} bytes");
    }

    /// <summary>
    ///     Read an i64 from the wire as a proper varint. The MSB of each byte is set
    ///     if there is another byte to follow. Reads at most 10 bytes.
    /// </summary>
    /// <exception cref="TProtocolException">The tenth byte still has its continuation bit set.</exception>
    private async Task<ulong> ReadVarInt64Async(CancellationToken cancellationToken)
    {
        await ThrowIfCanceledAsync(cancellationToken);

        ulong result = 0;
        var shift = 0;

        // Bounded for the same reason as ReadVarInt32Async.
        for (var count = 0; count < MaxVarInt64Bytes; count++)
        {
            var b = (byte) await ReadByteAsync(cancellationToken);
            result |= (ulong) (b & 0x7f) << shift;
            if ((b & 0x80) != 0x80)
            {
                return result;
            }

            shift += 7;
        }

        throw new TProtocolException($"Variable-length integer is longer than {MaxVarInt64Bytes} bytes");
    }

    /// <summary>
    ///     Converts a zigzag-encoded value back to a signed int.
    /// </summary>
    private static int ZigzagToInt(uint n)
    {
        return (int) (n >> 1) ^ -(int) (n & 1);
    }

    /// <summary>
    ///     Converts a zigzag-encoded value back to a signed long.
    /// </summary>
    private static long ZigzagToLong(ulong n)
    {
        return (long) (n >> 1) ^ -(long) (n & 1);
    }

    /// <summary>
    ///     Assembles a long from 8 little-endian bytes.
    /// </summary>
    private static long BytesToLong(byte[] bytes)
    {
        /*
            Note that it's important that the mask bytes are long literals,
            otherwise they'll default to ints, and when you shift an int left 56 bits,
            you just get a messed up int.
        */

        return
            ((bytes[7] & 0xffL) << 56) |
            ((bytes[6] & 0xffL) << 48) |
            ((bytes[5] & 0xffL) << 40) |
            ((bytes[4] & 0xffL) << 32) |
            ((bytes[3] & 0xffL) << 24) |
            ((bytes[2] & 0xffL) << 16) |
            ((bytes[1] & 0xffL) << 8) |
            (bytes[0] & 0xffL);
    }

    /// <summary>
    ///     Returns true when the low nibble is one of the two boolean compact types.
    /// </summary>
    private static bool IsBoolType(byte b)
    {
        var lowerNibble = b & 0x0f;
        return (lowerNibble == Types.BooleanTrue) || (lowerNibble == Types.BooleanFalse);
    }

    /// <summary>
    ///     Given a TCompactProtocol.Types constant (in the low nibble), convert it to its corresponding TType value.
    /// </summary>
    /// <exception cref="TProtocolException">The low nibble is not a known compact type.</exception>
    private static TType GetTType(byte type)
    {
        switch ((byte) (type & 0x0f))
        {
            case Types.Stop:
                return TType.Stop;
            case Types.BooleanFalse:
            case Types.BooleanTrue:
                return TType.Bool;
            case Types.Byte:
                return TType.Byte;
            case Types.I16:
                return TType.I16;
            case Types.I32:
                return TType.I32;
            case Types.I64:
                return TType.I64;
            case Types.Double:
                return TType.Double;
            case Types.Binary:
                return TType.String;
            case Types.List:
                return TType.List;
            case Types.Set:
                return TType.Set;
            case Types.Map:
                return TType.Map;
            case Types.Struct:
                return TType.Struct;
            default:
                throw new TProtocolException($"Don't know what exType: {(byte) (type & 0x0f)}");
        }
    }

    /// <summary>
    ///     Convert n into a zigzag long. This allows negative numbers to be represented compactly as a varint.
    /// </summary>
    private static ulong LongToZigzag(long n)
    {
        return (ulong) (n << 1) ^ (ulong) (n >> 63);
    }

    /// <summary>
    ///     Convert n into a zigzag int. This allows negative numbers to be represented compactly as a varint.
    /// </summary>
    private static uint IntToZigzag(int n)
    {
        return (uint) (n << 1) ^ (uint) (n >> 31);
    }

    /// <summary>
    ///     Convert a long into little-endian bytes in buf starting at off and going until off+7.
    /// </summary>
    private static void FixedLongToBytes(long n, byte[] buf, int off)
    {
        buf[off + 0] = (byte) (n & 0xff);
        buf[off + 1] = (byte) ((n >> 8) & 0xff);
        buf[off + 2] = (byte) ((n >> 16) & 0xff);
        buf[off + 3] = (byte) ((n >> 24) & 0xff);
        buf[off + 4] = (byte) ((n >> 32) & 0xff);
        buf[off + 5] = (byte) ((n >> 40) & 0xff);
        buf[off + 6] = (byte) ((n >> 48) & 0xff);
        buf[off + 7] = (byte) ((n >> 56) & 0xff);
    }

    /// <summary>
    ///     Factory that creates <see cref="TCompactProtocol"/> instances for a transport.
    /// </summary>
    public class Factory : ITProtocolFactory
    {
        public TProtocol GetProtocol(TTransport trans)
        {
            return new TCompactProtocol(trans);
        }
    }

    /// <summary>
    ///     All of the on-wire type codes.
    /// </summary>
    private static class Types
    {
        public const byte Stop = 0x00;
        public const byte BooleanTrue = 0x01;
        public const byte BooleanFalse = 0x02;
        public const byte Byte = 0x03;
        public const byte I16 = 0x04;
        public const byte I32 = 0x05;
        public const byte I64 = 0x06;
        public const byte Double = 0x07;
        public const byte Binary = 0x08;
        public const byte List = 0x09;
        public const byte Set = 0x0A;
        public const byte Map = 0x0B;
        public const byte Struct = 0x0C;
    }
}
922}