blob: c26633a14c018f1bc11a56375e9c41d9ef4f7577 [file] [log] [blame]
Jens Geyer421444f2019-03-20 22:13:25 +01001// Licensed to the Apache Software Foundation(ASF) under one
Jens Geyeraa0c8b32019-01-28 23:27:45 +01002// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.Collections.Generic;
Jens Geyer5a17b132019-05-26 15:53:37 +020020using System.Diagnostics;
Jens Geyeraa0c8b32019-01-28 23:27:45 +010021using System.Text;
22using System.Threading;
23using System.Threading.Tasks;
24using Thrift.Protocol.Entities;
25using Thrift.Transport;
26
27namespace Thrift.Protocol
28{
29 //TODO: implementation of TProtocol
30
31 // ReSharper disable once InconsistentNaming
32 public class TCompactProtocol : TProtocol
33 {
34 private const byte ProtocolId = 0x82;
35 private const byte Version = 1;
36 private const byte VersionMask = 0x1f; // 0001 1111
37 private const byte TypeMask = 0xE0; // 1110 0000
38 private const byte TypeBits = 0x07; // 0000 0111
39 private const int TypeShiftAmount = 5;
40 private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
Jens Geyer5a17b132019-05-26 15:53:37 +020041 private static readonly TField StopField = new TField(string.Empty, TType.Stop, 0);
42
43 private const byte NoTypeOverride = 0xFF;
Jens Geyeraa0c8b32019-01-28 23:27:45 +010044
45 // ReSharper disable once InconsistentNaming
46 private static readonly byte[] TTypeToCompactType = new byte[16];
Jens Geyer5a17b132019-05-26 15:53:37 +020047 private static readonly TType[] CompactTypeToTType = new TType[13];
Jens Geyeraa0c8b32019-01-28 23:27:45 +010048
49 /// <summary>
50 /// Used to keep track of the last field for the current and previous structs, so we can do the delta stuff.
51 /// </summary>
52 private readonly Stack<short> _lastField = new Stack<short>(15);
53
54 /// <summary>
55 /// If we encounter a boolean field begin, save the TField here so it can have the value incorporated.
56 /// </summary>
57 private TField? _booleanField;
58
59 /// <summary>
60 /// If we Read a field header, and it's a boolean field, save the boolean value here so that ReadBool can use it.
61 /// </summary>
62 private bool? _boolValue;
63
64 private short _lastFieldId;
65
Jens Geyer5a17b132019-05-26 15:53:37 +020066 // minimize memory allocations by means of an preallocated bytes buffer
67 // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long)
68 private byte[] PreAllocatedBuffer = new byte[128];
69
70 private struct VarInt
71 {
72 public byte[] bytes;
73 public int count;
74 }
75
76 // minimize memory allocations by means of an preallocated VarInt buffer
77 private VarInt PreAllocatedVarInt = new VarInt()
78 {
79 bytes = new byte[10], // see Int64ToVarInt()
80 count = 0
81 };
82
83
84
85
Jens Geyeraa0c8b32019-01-28 23:27:45 +010086 public TCompactProtocol(TTransport trans)
87 : base(trans)
88 {
89 TTypeToCompactType[(int) TType.Stop] = Types.Stop;
90 TTypeToCompactType[(int) TType.Bool] = Types.BooleanTrue;
91 TTypeToCompactType[(int) TType.Byte] = Types.Byte;
92 TTypeToCompactType[(int) TType.I16] = Types.I16;
93 TTypeToCompactType[(int) TType.I32] = Types.I32;
94 TTypeToCompactType[(int) TType.I64] = Types.I64;
95 TTypeToCompactType[(int) TType.Double] = Types.Double;
96 TTypeToCompactType[(int) TType.String] = Types.Binary;
97 TTypeToCompactType[(int) TType.List] = Types.List;
98 TTypeToCompactType[(int) TType.Set] = Types.Set;
99 TTypeToCompactType[(int) TType.Map] = Types.Map;
100 TTypeToCompactType[(int) TType.Struct] = Types.Struct;
Jens Geyer5a17b132019-05-26 15:53:37 +0200101
102 CompactTypeToTType[Types.Stop] = TType.Stop;
103 CompactTypeToTType[Types.BooleanTrue] = TType.Bool;
104 CompactTypeToTType[Types.BooleanFalse] = TType.Bool;
105 CompactTypeToTType[Types.Byte] = TType.Byte;
106 CompactTypeToTType[Types.I16] = TType.I16;
107 CompactTypeToTType[Types.I32] = TType.I32;
108 CompactTypeToTType[Types.I64] = TType.I64;
109 CompactTypeToTType[Types.Double] = TType.Double;
110 CompactTypeToTType[Types.Binary] = TType.String;
111 CompactTypeToTType[Types.List] = TType.List;
112 CompactTypeToTType[Types.Set] = TType.Set;
113 CompactTypeToTType[Types.Map] = TType.Map;
114 CompactTypeToTType[Types.Struct] = TType.Struct;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100115 }
116
117 public void Reset()
118 {
119 _lastField.Clear();
120 _lastFieldId = 0;
121 }
122
123 public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
124 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200125 PreAllocatedBuffer[0] = ProtocolId;
126 PreAllocatedBuffer[1] = (byte)((Version & VersionMask) | (((uint)message.Type << TypeShiftAmount) & TypeMask));
127 await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100128
Jens Geyer5a17b132019-05-26 15:53:37 +0200129 Int32ToVarInt((uint) message.SeqID, ref PreAllocatedVarInt);
130 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100131
132 await WriteStringAsync(message.Name, cancellationToken);
133 }
134
135 public override async Task WriteMessageEndAsync(CancellationToken cancellationToken)
136 {
137 if (cancellationToken.IsCancellationRequested)
138 {
139 await Task.FromCanceled(cancellationToken);
140 }
141 }
142
143 /// <summary>
144 /// Write a struct begin. This doesn't actually put anything on the wire. We
145 /// use it as an opportunity to put special placeholder markers on the field
146 /// stack so we can get the field id deltas correct.
147 /// </summary>
148 public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
149 {
150 if (cancellationToken.IsCancellationRequested)
151 {
152 await Task.FromCanceled(cancellationToken);
153 }
154
155 _lastField.Push(_lastFieldId);
156 _lastFieldId = 0;
157 }
158
159 public override async Task WriteStructEndAsync(CancellationToken cancellationToken)
160 {
161 if (cancellationToken.IsCancellationRequested)
162 {
163 await Task.FromCanceled(cancellationToken);
164 }
165
166 _lastFieldId = _lastField.Pop();
167 }
168
Jens Geyer5a17b132019-05-26 15:53:37 +0200169 private async Task WriteFieldBeginInternalAsync(TField field, byte fieldType, CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100170 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200171 // if there's a exType override passed in, use that. Otherwise ask GetCompactType().
172 if (fieldType == NoTypeOverride)
173 fieldType = GetCompactType(field.Type);
174
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100175
176 // check if we can use delta encoding for the field id
Jens Geyer5a17b132019-05-26 15:53:37 +0200177 if (field.ID > _lastFieldId)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100178 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200179 var delta = field.ID - _lastFieldId;
180 if (delta <= 15)
181 {
182 // Write them together
183 PreAllocatedBuffer[0] = (byte)((delta << 4) | fieldType);
184 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
185 _lastFieldId = field.ID;
186 return;
187 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100188 }
189
Jens Geyer5a17b132019-05-26 15:53:37 +0200190 // Write them separate
191 PreAllocatedBuffer[0] = fieldType;
192 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
193 await WriteI16Async(field.ID, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100194 _lastFieldId = field.ID;
195 }
196
197 public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
198 {
199 if (field.Type == TType.Bool)
200 {
201 _booleanField = field;
202 }
203 else
204 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200205 await WriteFieldBeginInternalAsync(field, NoTypeOverride, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100206 }
207 }
208
209 public override async Task WriteFieldEndAsync(CancellationToken cancellationToken)
210 {
211 if (cancellationToken.IsCancellationRequested)
212 {
213 await Task.FromCanceled(cancellationToken);
214 }
215 }
216
217 public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
218 {
219 if (cancellationToken.IsCancellationRequested)
220 {
221 return;
222 }
223
Jens Geyer5a17b132019-05-26 15:53:37 +0200224 PreAllocatedBuffer[0] = Types.Stop;
225 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100226 }
227
228 protected async Task WriteCollectionBeginAsync(TType elemType, int size, CancellationToken cancellationToken)
229 {
230 if (cancellationToken.IsCancellationRequested)
231 {
232 return;
233 }
234
235 /*
236 Abstract method for writing the start of lists and sets. List and sets on
237 the wire differ only by the exType indicator.
238 */
239
240 if (size <= 14)
241 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200242 PreAllocatedBuffer[0] = (byte)((size << 4) | GetCompactType(elemType));
243 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100244 }
245 else
246 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200247 PreAllocatedBuffer[0] = (byte)(0xf0 | GetCompactType(elemType));
248 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100249
Jens Geyer5a17b132019-05-26 15:53:37 +0200250 Int32ToVarInt((uint) size, ref PreAllocatedVarInt);
251 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100252 }
253 }
254
255 public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
256 {
257 await WriteCollectionBeginAsync(list.ElementType, list.Count, cancellationToken);
258 }
259
260 public override async Task WriteListEndAsync(CancellationToken cancellationToken)
261 {
262 if (cancellationToken.IsCancellationRequested)
263 {
264 await Task.FromCanceled(cancellationToken);
265 }
266 }
267
268 public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
269 {
270 if (cancellationToken.IsCancellationRequested)
271 {
272 return;
273 }
274
275 await WriteCollectionBeginAsync(set.ElementType, set.Count, cancellationToken);
276 }
277
278 public override async Task WriteSetEndAsync(CancellationToken cancellationToken)
279 {
280 if (cancellationToken.IsCancellationRequested)
281 {
282 await Task.FromCanceled(cancellationToken);
283 }
284 }
285
286 public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
287 {
288 if (cancellationToken.IsCancellationRequested)
289 {
290 return;
291 }
292
293 /*
294 Write a boolean value. Potentially, this could be a boolean field, in
295 which case the field header info isn't written yet. If so, decide what the
296 right exType header is for the value and then Write the field header.
297 Otherwise, Write a single byte.
298 */
299
300 if (_booleanField != null)
301 {
302 // we haven't written the field header yet
Jens Geyer5a17b132019-05-26 15:53:37 +0200303 var type = b ? Types.BooleanTrue : Types.BooleanFalse;
304 await WriteFieldBeginInternalAsync(_booleanField.Value, type, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100305 _booleanField = null;
306 }
307 else
308 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200309 // we're not part of a field, so just write the value.
310 PreAllocatedBuffer[0] = b ? Types.BooleanTrue : Types.BooleanFalse;
311 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100312 }
313 }
314
315 public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
316 {
317 if (cancellationToken.IsCancellationRequested)
318 {
319 return;
320 }
321
Jens Geyer5a17b132019-05-26 15:53:37 +0200322 PreAllocatedBuffer[0] = (byte)b;
323 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100324 }
325
326 public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
327 {
328 if (cancellationToken.IsCancellationRequested)
329 {
330 return;
331 }
332
Jens Geyer5a17b132019-05-26 15:53:37 +0200333 Int32ToVarInt(IntToZigzag(i16), ref PreAllocatedVarInt);
334 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100335 }
336
Jens Geyer5a17b132019-05-26 15:53:37 +0200337 private static void Int32ToVarInt(uint n, ref VarInt varint)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100338 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200339 // Write an i32 as a varint. Results in 1 - 5 bytes on the wire.
340 varint.count = 0;
341 Debug.Assert(varint.bytes.Length >= 5);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100342
343 while (true)
344 {
345 if ((n & ~0x7F) == 0)
346 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200347 varint.bytes[varint.count++] = (byte)n;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100348 break;
349 }
350
Jens Geyer5a17b132019-05-26 15:53:37 +0200351 varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100352 n >>= 7;
353 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100354 }
355
356 public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
357 {
358 if (cancellationToken.IsCancellationRequested)
359 {
360 return;
361 }
362
Jens Geyer5a17b132019-05-26 15:53:37 +0200363 Int32ToVarInt(IntToZigzag(i32), ref PreAllocatedVarInt);
364 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100365 }
366
Jens Geyer5a17b132019-05-26 15:53:37 +0200367 static private void Int64ToVarInt(ulong n, ref VarInt varint)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100368 {
369 // Write an i64 as a varint. Results in 1-10 bytes on the wire.
Jens Geyer5a17b132019-05-26 15:53:37 +0200370 varint.count = 0;
371 Debug.Assert(varint.bytes.Length >= 10);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100372
373 while (true)
374 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200375 if ((n & ~(ulong)0x7FL) == 0)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100376 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200377 varint.bytes[varint.count++] = (byte)n;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100378 break;
379 }
Jens Geyer5a17b132019-05-26 15:53:37 +0200380 varint.bytes[varint.count++] = (byte)((n & 0x7F) | 0x80);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100381 n >>= 7;
382 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100383 }
384
385 public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
386 {
387 if (cancellationToken.IsCancellationRequested)
388 {
389 return;
390 }
391
Jens Geyer5a17b132019-05-26 15:53:37 +0200392 Int64ToVarInt(LongToZigzag(i64), ref PreAllocatedVarInt);
393 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100394 }
395
396 public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
397 {
398 if (cancellationToken.IsCancellationRequested)
399 {
400 return;
401 }
402
Jens Geyer5a17b132019-05-26 15:53:37 +0200403 FixedLongToBytes(BitConverter.DoubleToInt64Bits(d), PreAllocatedBuffer, 0);
404 await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100405 }
406
407 public override async Task WriteStringAsync(string str, CancellationToken cancellationToken)
408 {
409 if (cancellationToken.IsCancellationRequested)
410 {
411 return;
412 }
413
414 var bytes = Encoding.UTF8.GetBytes(str);
415
Jens Geyer5a17b132019-05-26 15:53:37 +0200416 Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt);
417 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100418 await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
419 }
420
421 public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
422 {
423 if (cancellationToken.IsCancellationRequested)
424 {
425 return;
426 }
427
Jens Geyer5a17b132019-05-26 15:53:37 +0200428 Int32ToVarInt((uint) bytes.Length, ref PreAllocatedVarInt);
429 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100430 await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
431 }
432
433 public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
434 {
435 if (cancellationToken.IsCancellationRequested)
436 {
437 return;
438 }
Jens Geyer5a17b132019-05-26 15:53:37 +0200439
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100440 if (map.Count == 0)
441 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200442 PreAllocatedBuffer[0] = 0;
443 await Trans.WriteAsync( PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100444 }
445 else
446 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200447 Int32ToVarInt((uint) map.Count, ref PreAllocatedVarInt);
448 await Trans.WriteAsync(PreAllocatedVarInt.bytes, 0, PreAllocatedVarInt.count, cancellationToken);
449
450 PreAllocatedBuffer[0] = (byte)((GetCompactType(map.KeyType) << 4) | GetCompactType(map.ValueType));
451 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100452 }
453 }
454
455 public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
456 {
457 if (cancellationToken.IsCancellationRequested)
458 {
459 await Task.FromCanceled(cancellationToken);
460 }
461 }
462
Jens Geyer5a17b132019-05-26 15:53:37 +0200463 public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100464 {
465 if (cancellationToken.IsCancellationRequested)
466 {
467 return await Task.FromCanceled<TMessage>(cancellationToken);
468 }
469
470 var protocolId = (byte) await ReadByteAsync(cancellationToken);
471 if (protocolId != ProtocolId)
472 {
473 throw new TProtocolException($"Expected protocol id {ProtocolId:X} but got {protocolId:X}");
474 }
475
476 var versionAndType = (byte) await ReadByteAsync(cancellationToken);
477 var version = (byte) (versionAndType & VersionMask);
478
479 if (version != Version)
480 {
481 throw new TProtocolException($"Expected version {Version} but got {version}");
482 }
483
484 var type = (byte) ((versionAndType >> TypeShiftAmount) & TypeBits);
485 var seqid = (int) await ReadVarInt32Async(cancellationToken);
486 var messageName = await ReadStringAsync(cancellationToken);
487
488 return new TMessage(messageName, (TMessageType) type, seqid);
489 }
490
491 public override async Task ReadMessageEndAsync(CancellationToken cancellationToken)
492 {
493 if (cancellationToken.IsCancellationRequested)
494 {
495 await Task.FromCanceled(cancellationToken);
496 }
497 }
498
Jens Geyer5a17b132019-05-26 15:53:37 +0200499 public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100500 {
501 if (cancellationToken.IsCancellationRequested)
502 {
503 return await Task.FromCanceled<TStruct>(cancellationToken);
504 }
505
506 // some magic is here )
507
508 _lastField.Push(_lastFieldId);
509 _lastFieldId = 0;
510
511 return AnonymousStruct;
512 }
513
514 public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
515 {
516 if (cancellationToken.IsCancellationRequested)
517 {
518 await Task.FromCanceled(cancellationToken);
519 }
520
521 /*
522 Doesn't actually consume any wire data, just removes the last field for
523 this struct from the field stack.
524 */
525
526 // consume the last field we Read off the wire.
527 _lastFieldId = _lastField.Pop();
528 }
529
Jens Geyer5a17b132019-05-26 15:53:37 +0200530 public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100531 {
532 // Read a field header off the wire.
533 var type = (byte) await ReadByteAsync(cancellationToken);
Jens Geyer5a17b132019-05-26 15:53:37 +0200534
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100535 // if it's a stop, then we can return immediately, as the struct is over.
536 if (type == Types.Stop)
537 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200538 return StopField;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100539 }
540
Jens Geyer5a17b132019-05-26 15:53:37 +0200541
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100542 // mask off the 4 MSB of the exType header. it could contain a field id delta.
543 var modifier = (short) ((type & 0xf0) >> 4);
Jens Geyer5a17b132019-05-26 15:53:37 +0200544 var compactType = (byte)(type & 0x0f);
545
546 short fieldId;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100547 if (modifier == 0)
548 {
549 fieldId = await ReadI16Async(cancellationToken);
550 }
551 else
552 {
553 fieldId = (short) (_lastFieldId + modifier);
554 }
555
Jens Geyer5a17b132019-05-26 15:53:37 +0200556 var ttype = GetTType(compactType);
557 var field = new TField(string.Empty, ttype, fieldId);
558
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100559 // if this happens to be a boolean field, the value is encoded in the exType
Jens Geyer5a17b132019-05-26 15:53:37 +0200560 if( ttype == TType.Bool)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100561 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200562 _boolValue = (compactType == Types.BooleanTrue);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100563 }
564
565 // push the new field onto the field stack so we can keep the deltas going.
566 _lastFieldId = field.ID;
567 return field;
568 }
569
570 public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
571 {
572 if (cancellationToken.IsCancellationRequested)
573 {
574 await Task.FromCanceled(cancellationToken);
575 }
576 }
577
Jens Geyer5a17b132019-05-26 15:53:37 +0200578 public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100579 {
580 if (cancellationToken.IsCancellationRequested)
581 {
582 await Task.FromCanceled<TMap>(cancellationToken);
583 }
584
585 /*
586 Read a map header off the wire. If the size is zero, skip Reading the key
587 and value exType. This means that 0-length maps will yield TMaps without the
588 "correct" types.
589 */
590
591 var size = (int) await ReadVarInt32Async(cancellationToken);
592 var keyAndValueType = size == 0 ? (byte) 0 : (byte) await ReadByteAsync(cancellationToken);
593 return new TMap(GetTType((byte) (keyAndValueType >> 4)), GetTType((byte) (keyAndValueType & 0xf)), size);
594 }
595
596 public override async Task ReadMapEndAsync(CancellationToken cancellationToken)
597 {
598 if (cancellationToken.IsCancellationRequested)
599 {
600 await Task.FromCanceled(cancellationToken);
601 }
602 }
603
Jens Geyer5a17b132019-05-26 15:53:37 +0200604 public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100605 {
606 /*
607 Read a set header off the wire. If the set size is 0-14, the size will
608 be packed into the element exType header. If it's a longer set, the 4 MSB
609 of the element exType header will be 0xF, and a varint will follow with the
610 true size.
611 */
612
613 return new TSet(await ReadListBeginAsync(cancellationToken));
614 }
615
Jens Geyer5a17b132019-05-26 15:53:37 +0200616 public override ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100617 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100618 /*
619 Read a boolean off the wire. If this is a boolean field, the value should
620 already have been Read during ReadFieldBegin, so we'll just consume the
621 pre-stored value. Otherwise, Read a byte.
622 */
623
624 if (_boolValue != null)
625 {
626 var result = _boolValue.Value;
627 _boolValue = null;
Jens Geyer5a17b132019-05-26 15:53:37 +0200628 return new ValueTask<bool>(result);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100629 }
630
Jens Geyer5a17b132019-05-26 15:53:37 +0200631 return InternalCall();
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100632
Jens Geyer5a17b132019-05-26 15:53:37 +0200633 async ValueTask<bool> InternalCall()
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100634 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200635 var data = await ReadByteAsync(cancellationToken);
636 return (data == Types.BooleanTrue);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100637 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100638 }
639
Jens Geyer5a17b132019-05-26 15:53:37 +0200640
641 public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
642 {
643 // Read a single byte off the wire. Nothing interesting here.
644 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
645 return (sbyte)PreAllocatedBuffer[0];
646 }
647
648 public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100649 {
650 if (cancellationToken.IsCancellationRequested)
651 {
652 return await Task.FromCanceled<short>(cancellationToken);
653 }
654
655 return (short) ZigzagToInt(await ReadVarInt32Async(cancellationToken));
656 }
657
Jens Geyer5a17b132019-05-26 15:53:37 +0200658 public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100659 {
660 if (cancellationToken.IsCancellationRequested)
661 {
662 return await Task.FromCanceled<int>(cancellationToken);
663 }
664
665 return ZigzagToInt(await ReadVarInt32Async(cancellationToken));
666 }
667
Jens Geyer5a17b132019-05-26 15:53:37 +0200668 public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100669 {
670 if (cancellationToken.IsCancellationRequested)
671 {
672 return await Task.FromCanceled<long>(cancellationToken);
673 }
674
675 return ZigzagToLong(await ReadVarInt64Async(cancellationToken));
676 }
677
Jens Geyer5a17b132019-05-26 15:53:37 +0200678 public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100679 {
680 if (cancellationToken.IsCancellationRequested)
681 {
682 return await Task.FromCanceled<double>(cancellationToken);
683 }
684
Jens Geyer5a17b132019-05-26 15:53:37 +0200685 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100686
Jens Geyer5a17b132019-05-26 15:53:37 +0200687 return BitConverter.Int64BitsToDouble(BytesToLong(PreAllocatedBuffer));
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100688 }
689
Jens Geyer5a17b132019-05-26 15:53:37 +0200690 public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100691 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200692 // read length
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100693 var length = (int) await ReadVarInt32Async(cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100694 if (length == 0)
695 {
696 return string.Empty;
697 }
698
Jens Geyer5a17b132019-05-26 15:53:37 +0200699 // read and decode data
700 if (length < PreAllocatedBuffer.Length)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100701 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200702 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, length, cancellationToken);
703 return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, length);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100704 }
705
Jens Geyer5a17b132019-05-26 15:53:37 +0200706 var buf = new byte[length];
707 await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
708 return Encoding.UTF8.GetString(buf, 0, length);
709 }
710
711 public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
712 {
713 // read length
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100714 var length = (int) await ReadVarInt32Async(cancellationToken);
715 if (length == 0)
716 {
717 return new byte[0];
718 }
719
Jens Geyer5a17b132019-05-26 15:53:37 +0200720 // read data
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100721 var buf = new byte[length];
722 await Trans.ReadAllAsync(buf, 0, length, cancellationToken);
723 return buf;
724 }
725
Jens Geyer5a17b132019-05-26 15:53:37 +0200726 public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100727 {
728 if (cancellationToken.IsCancellationRequested)
729 {
730 await Task.FromCanceled<TList>(cancellationToken);
731 }
732
733 /*
734 Read a list header off the wire. If the list size is 0-14, the size will
735 be packed into the element exType header. If it's a longer list, the 4 MSB
736 of the element exType header will be 0xF, and a varint will follow with the
737 true size.
738 */
739
740 var sizeAndType = (byte) await ReadByteAsync(cancellationToken);
741 var size = (sizeAndType >> 4) & 0x0f;
742 if (size == 15)
743 {
744 size = (int) await ReadVarInt32Async(cancellationToken);
745 }
746
747 var type = GetTType(sizeAndType);
748 return new TList(type, size);
749 }
750
751 public override async Task ReadListEndAsync(CancellationToken cancellationToken)
752 {
753 if (cancellationToken.IsCancellationRequested)
754 {
755 await Task.FromCanceled(cancellationToken);
756 }
757 }
758
759 public override async Task ReadSetEndAsync(CancellationToken cancellationToken)
760 {
761 if (cancellationToken.IsCancellationRequested)
762 {
763 await Task.FromCanceled(cancellationToken);
764 }
765 }
766
767 private static byte GetCompactType(TType ttype)
768 {
769 // Given a TType value, find the appropriate TCompactProtocol.Types constant.
770 return TTypeToCompactType[(int) ttype];
771 }
772
773
Jens Geyer5a17b132019-05-26 15:53:37 +0200774 private async ValueTask<uint> ReadVarInt32Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100775 {
776 if (cancellationToken.IsCancellationRequested)
777 {
778 return await Task.FromCanceled<uint>(cancellationToken);
779 }
780
781 /*
782 Read an i32 from the wire as a varint. The MSB of each byte is set
783 if there is another byte to follow. This can Read up to 5 bytes.
784 */
785
786 uint result = 0;
787 var shift = 0;
788
789 while (true)
790 {
791 var b = (byte) await ReadByteAsync(cancellationToken);
792 result |= (uint) (b & 0x7f) << shift;
793 if ((b & 0x80) != 0x80)
794 {
795 break;
796 }
797 shift += 7;
798 }
799
800 return result;
801 }
802
Jens Geyer5a17b132019-05-26 15:53:37 +0200803 private async ValueTask<ulong> ReadVarInt64Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100804 {
805 if (cancellationToken.IsCancellationRequested)
806 {
807 return await Task.FromCanceled<uint>(cancellationToken);
808 }
809
810 /*
811 Read an i64 from the wire as a proper varint. The MSB of each byte is set
812 if there is another byte to follow. This can Read up to 10 bytes.
813 */
814
815 var shift = 0;
816 ulong result = 0;
817 while (true)
818 {
819 var b = (byte) await ReadByteAsync(cancellationToken);
820 result |= (ulong) (b & 0x7f) << shift;
821 if ((b & 0x80) != 0x80)
822 {
823 break;
824 }
825 shift += 7;
826 }
827
828 return result;
829 }
830
831 private static int ZigzagToInt(uint n)
832 {
833 return (int) (n >> 1) ^ -(int) (n & 1);
834 }
835
836 private static long ZigzagToLong(ulong n)
837 {
838 return (long) (n >> 1) ^ -(long) (n & 1);
839 }
840
841 private static long BytesToLong(byte[] bytes)
842 {
843 /*
844 Note that it's important that the mask bytes are long literals,
845 otherwise they'll default to ints, and when you shift an int left 56 bits,
846 you just get a messed up int.
847 */
848
849 return
850 ((bytes[7] & 0xffL) << 56) |
851 ((bytes[6] & 0xffL) << 48) |
852 ((bytes[5] & 0xffL) << 40) |
853 ((bytes[4] & 0xffL) << 32) |
854 ((bytes[3] & 0xffL) << 24) |
855 ((bytes[2] & 0xffL) << 16) |
856 ((bytes[1] & 0xffL) << 8) |
857 (bytes[0] & 0xffL);
858 }
859
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100860 private static TType GetTType(byte type)
861 {
862 // Given a TCompactProtocol.Types constant, convert it to its corresponding TType value.
Jens Geyer5a17b132019-05-26 15:53:37 +0200863 return CompactTypeToTType[type & 0x0f];
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100864 }
865
866 private static ulong LongToZigzag(long n)
867 {
868 // Convert l into a zigzag long. This allows negative numbers to be represented compactly as a varint
869 return (ulong) (n << 1) ^ (ulong) (n >> 63);
870 }
871
872 private static uint IntToZigzag(int n)
873 {
874 // Convert n into a zigzag int. This allows negative numbers to be represented compactly as a varint
875 return (uint) (n << 1) ^ (uint) (n >> 31);
876 }
877
878 private static void FixedLongToBytes(long n, byte[] buf, int off)
879 {
880 // Convert a long into little-endian bytes in buf starting at off and going until off+7.
881 buf[off + 0] = (byte) (n & 0xff);
882 buf[off + 1] = (byte) ((n >> 8) & 0xff);
883 buf[off + 2] = (byte) ((n >> 16) & 0xff);
884 buf[off + 3] = (byte) ((n >> 24) & 0xff);
885 buf[off + 4] = (byte) ((n >> 32) & 0xff);
886 buf[off + 5] = (byte) ((n >> 40) & 0xff);
887 buf[off + 6] = (byte) ((n >> 48) & 0xff);
888 buf[off + 7] = (byte) ((n >> 56) & 0xff);
889 }
890
Jens Geyer421444f2019-03-20 22:13:25 +0100891 public class Factory : TProtocolFactory
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100892 {
Jens Geyer421444f2019-03-20 22:13:25 +0100893 public override TProtocol GetProtocol(TTransport trans)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100894 {
895 return new TCompactProtocol(trans);
896 }
897 }
898
899 /// <summary>
900 /// All of the on-wire exType codes.
901 /// </summary>
902 private static class Types
903 {
904 public const byte Stop = 0x00;
905 public const byte BooleanTrue = 0x01;
906 public const byte BooleanFalse = 0x02;
907 public const byte Byte = 0x03;
908 public const byte I16 = 0x04;
909 public const byte I32 = 0x05;
910 public const byte I64 = 0x06;
911 public const byte Double = 0x07;
912 public const byte Binary = 0x08;
913 public const byte List = 0x09;
914 public const byte Set = 0x0A;
915 public const byte Map = 0x0B;
916 public const byte Struct = 0x0C;
917 }
918 }
Jens Geyer421444f2019-03-20 22:13:25 +0100919}