blob: f0772aa516dae76364a1905d7a5106ad9c6988ba [file] [log] [blame]
Jens Geyer421444f2019-03-20 22:13:25 +01001// Licensed to the Apache Software Foundation(ASF) under one
Jens Geyeraa0c8b32019-01-28 23:27:45 +01002// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
zembord9d958a32019-11-21 13:11:44 +030019using System.Buffers.Binary;
Jens Geyeraa0c8b32019-01-28 23:27:45 +010020using System.Text;
21using System.Threading;
22using System.Threading.Tasks;
23using Thrift.Protocol.Entities;
24using Thrift.Transport;
25
26namespace Thrift.Protocol
27{
28 // ReSharper disable once InconsistentNaming
29 public class TBinaryProtocol : TProtocol
30 {
Jens Geyeraa0c8b32019-01-28 23:27:45 +010031 protected const uint VersionMask = 0xffff0000;
32 protected const uint Version1 = 0x80010000;
33
34 protected bool StrictRead;
35 protected bool StrictWrite;
36
Jens Geyer5a17b132019-05-26 15:53:37 +020037 // minimize memory allocations by means of an preallocated bytes buffer
38 // The value of 128 is arbitrarily chosen, the required minimum size must be sizeof(long)
39 private byte[] PreAllocatedBuffer = new byte[128];
40
41 private static readonly TStruct AnonymousStruct = new TStruct(string.Empty);
42 private static readonly TField StopField = new TField() { Type = TType.Stop };
43
Jens Geyeraa0c8b32019-01-28 23:27:45 +010044 public TBinaryProtocol(TTransport trans)
45 : this(trans, false, true)
46 {
47 }
48
49 public TBinaryProtocol(TTransport trans, bool strictRead, bool strictWrite)
50 : base(trans)
51 {
52 StrictRead = strictRead;
53 StrictWrite = strictWrite;
54 }
55
56 public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
57 {
58 if (cancellationToken.IsCancellationRequested)
59 {
60 return;
61 }
62
63 if (StrictWrite)
64 {
65 var version = Version1 | (uint) message.Type;
66 await WriteI32Async((int) version, cancellationToken);
67 await WriteStringAsync(message.Name, cancellationToken);
68 await WriteI32Async(message.SeqID, cancellationToken);
69 }
70 else
71 {
72 await WriteStringAsync(message.Name, cancellationToken);
73 await WriteByteAsync((sbyte) message.Type, cancellationToken);
74 await WriteI32Async(message.SeqID, cancellationToken);
75 }
76 }
77
78 public override async Task WriteMessageEndAsync(CancellationToken cancellationToken)
79 {
80 if (cancellationToken.IsCancellationRequested)
81 {
82 await Task.FromCanceled(cancellationToken);
83 }
84 }
85
86 public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
87 {
88 if (cancellationToken.IsCancellationRequested)
89 {
90 await Task.FromCanceled(cancellationToken);
91 }
92 }
93
94 public override async Task WriteStructEndAsync(CancellationToken cancellationToken)
95 {
96 if (cancellationToken.IsCancellationRequested)
97 {
98 await Task.FromCanceled(cancellationToken);
99 }
100 }
101
102 public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
103 {
104 if (cancellationToken.IsCancellationRequested)
105 {
106 return;
107 }
108
109 await WriteByteAsync((sbyte) field.Type, cancellationToken);
110 await WriteI16Async(field.ID, cancellationToken);
111 }
112
113 public override async Task WriteFieldEndAsync(CancellationToken cancellationToken)
114 {
115 if (cancellationToken.IsCancellationRequested)
116 {
117 await Task.FromCanceled(cancellationToken);
118 }
119 }
120
121 public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
122 {
123 if (cancellationToken.IsCancellationRequested)
124 {
125 return;
126 }
127
128 await WriteByteAsync((sbyte) TType.Stop, cancellationToken);
129 }
130
131 public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
132 {
133 if (cancellationToken.IsCancellationRequested)
134 {
135 return;
136 }
137
Jens Geyer5a17b132019-05-26 15:53:37 +0200138 PreAllocatedBuffer[0] = (byte)map.KeyType;
139 PreAllocatedBuffer[1] = (byte)map.ValueType;
140 await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
141
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100142 await WriteI32Async(map.Count, cancellationToken);
143 }
144
145 public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
Jens Geyer5a17b132019-05-26 15:53:37 +0200146
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100147 {
148 if (cancellationToken.IsCancellationRequested)
149 {
150 await Task.FromCanceled(cancellationToken);
151 }
152 }
153
154 public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
155 {
156 if (cancellationToken.IsCancellationRequested)
157 {
158 return;
159 }
160
161 await WriteByteAsync((sbyte) list.ElementType, cancellationToken);
162 await WriteI32Async(list.Count, cancellationToken);
163 }
164
165 public override async Task WriteListEndAsync(CancellationToken cancellationToken)
166 {
167 if (cancellationToken.IsCancellationRequested)
168 {
169 await Task.FromCanceled(cancellationToken);
170 }
171 }
172
173 public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
174 {
175 if (cancellationToken.IsCancellationRequested)
176 {
177 return;
178 }
179
180 await WriteByteAsync((sbyte) set.ElementType, cancellationToken);
181 await WriteI32Async(set.Count, cancellationToken);
182 }
183
184 public override async Task WriteSetEndAsync(CancellationToken cancellationToken)
185 {
186 if (cancellationToken.IsCancellationRequested)
187 {
188 await Task.FromCanceled(cancellationToken);
189 }
190 }
191
192 public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
193 {
194 if (cancellationToken.IsCancellationRequested)
195 {
196 return;
197 }
198
199 await WriteByteAsync(b ? (sbyte) 1 : (sbyte) 0, cancellationToken);
200 }
201
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100202 public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
203 {
204 if (cancellationToken.IsCancellationRequested)
205 {
206 return;
207 }
208
Jens Geyer5a17b132019-05-26 15:53:37 +0200209 PreAllocatedBuffer[0] = (byte)b;
210
211 await Trans.WriteAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100212 }
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100213 public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
214 {
215 if (cancellationToken.IsCancellationRequested)
216 {
217 return;
218 }
zembord9d958a32019-11-21 13:11:44 +0300219 BinaryPrimitives.WriteInt16BigEndian(PreAllocatedBuffer, i16);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100220
Jens Geyer5a17b132019-05-26 15:53:37 +0200221 await Trans.WriteAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100222 }
223
224 public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
225 {
226 if (cancellationToken.IsCancellationRequested)
227 {
228 return;
229 }
230
zembord9d958a32019-11-21 13:11:44 +0300231 BinaryPrimitives.WriteInt32BigEndian(PreAllocatedBuffer, i32);
Jens Geyer5a17b132019-05-26 15:53:37 +0200232
233 await Trans.WriteAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100234 }
235
Jens Geyer5a17b132019-05-26 15:53:37 +0200236
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100237 public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
238 {
239 if (cancellationToken.IsCancellationRequested)
240 {
241 return;
242 }
243
zembord9d958a32019-11-21 13:11:44 +0300244 BinaryPrimitives.WriteInt64BigEndian(PreAllocatedBuffer, i64);
Jens Geyer5a17b132019-05-26 15:53:37 +0200245
246 await Trans.WriteAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100247 }
248
249 public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
250 {
251 if (cancellationToken.IsCancellationRequested)
252 {
253 return;
254 }
255
256 await WriteI64Async(BitConverter.DoubleToInt64Bits(d), cancellationToken);
257 }
258
Jens Geyer5a17b132019-05-26 15:53:37 +0200259
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100260 public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
261 {
262 if (cancellationToken.IsCancellationRequested)
263 {
264 return;
265 }
266
267 await WriteI32Async(bytes.Length, cancellationToken);
268 await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
269 }
270
Jens Geyer5a17b132019-05-26 15:53:37 +0200271 public override async ValueTask<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100272 {
273 if (cancellationToken.IsCancellationRequested)
274 {
275 return await Task.FromCanceled<TMessage>(cancellationToken);
276 }
277
278 var message = new TMessage();
279 var size = await ReadI32Async(cancellationToken);
280 if (size < 0)
281 {
282 var version = (uint) size & VersionMask;
283 if (version != Version1)
284 {
285 throw new TProtocolException(TProtocolException.BAD_VERSION,
286 $"Bad version in ReadMessageBegin: {version}");
287 }
288 message.Type = (TMessageType) (size & 0x000000ff);
289 message.Name = await ReadStringAsync(cancellationToken);
290 message.SeqID = await ReadI32Async(cancellationToken);
291 }
292 else
293 {
294 if (StrictRead)
295 {
296 throw new TProtocolException(TProtocolException.BAD_VERSION,
297 "Missing version in ReadMessageBegin, old client?");
298 }
Jens Geyer5a17b132019-05-26 15:53:37 +0200299 message.Name = (size > 0) ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100300 message.Type = (TMessageType) await ReadByteAsync(cancellationToken);
301 message.SeqID = await ReadI32Async(cancellationToken);
302 }
303 return message;
304 }
305
306 public override async Task ReadMessageEndAsync(CancellationToken cancellationToken)
307 {
308 if (cancellationToken.IsCancellationRequested)
309 {
310 await Task.FromCanceled(cancellationToken);
311 }
312 }
313
Jens Geyer5a17b132019-05-26 15:53:37 +0200314 public override async ValueTask<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100315 {
316 if (cancellationToken.IsCancellationRequested)
317 {
318 await Task.FromCanceled(cancellationToken);
319 }
320
Jens Geyer5a17b132019-05-26 15:53:37 +0200321 return AnonymousStruct;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100322 }
323
324 public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
325 {
326 if (cancellationToken.IsCancellationRequested)
327 {
328 await Task.FromCanceled(cancellationToken);
329 }
330 }
331
Jens Geyer5a17b132019-05-26 15:53:37 +0200332 public override async ValueTask<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100333 {
334 if (cancellationToken.IsCancellationRequested)
335 {
336 return await Task.FromCanceled<TField>(cancellationToken);
337 }
338
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100339
Jens Geyer5a17b132019-05-26 15:53:37 +0200340 var type = (TType)await ReadByteAsync(cancellationToken);
341 if (type == TType.Stop)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100342 {
Jens Geyer5a17b132019-05-26 15:53:37 +0200343 return StopField;
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100344 }
345
Jens Geyer5a17b132019-05-26 15:53:37 +0200346 return new TField {
347 Type = type,
348 ID = await ReadI16Async(cancellationToken)
349 };
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100350 }
351
352 public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
353 {
354 if (cancellationToken.IsCancellationRequested)
355 {
356 await Task.FromCanceled(cancellationToken);
357 }
358 }
359
Jens Geyer5a17b132019-05-26 15:53:37 +0200360 public override async ValueTask<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100361 {
362 if (cancellationToken.IsCancellationRequested)
363 {
364 return await Task.FromCanceled<TMap>(cancellationToken);
365 }
366
367 var map = new TMap
368 {
369 KeyType = (TType) await ReadByteAsync(cancellationToken),
370 ValueType = (TType) await ReadByteAsync(cancellationToken),
371 Count = await ReadI32Async(cancellationToken)
372 };
373
374 return map;
375 }
376
377 public override async Task ReadMapEndAsync(CancellationToken cancellationToken)
378 {
379 if (cancellationToken.IsCancellationRequested)
380 {
381 await Task.FromCanceled(cancellationToken);
382 }
383 }
384
Jens Geyer5a17b132019-05-26 15:53:37 +0200385 public override async ValueTask<TList> ReadListBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100386 {
387 if (cancellationToken.IsCancellationRequested)
388 {
389 return await Task.FromCanceled<TList>(cancellationToken);
390 }
391
392 var list = new TList
393 {
394 ElementType = (TType) await ReadByteAsync(cancellationToken),
395 Count = await ReadI32Async(cancellationToken)
396 };
397
398 return list;
399 }
400
401 public override async Task ReadListEndAsync(CancellationToken cancellationToken)
402 {
403 if (cancellationToken.IsCancellationRequested)
404 {
405 await Task.FromCanceled(cancellationToken);
406 }
407 }
408
Jens Geyer5a17b132019-05-26 15:53:37 +0200409 public override async ValueTask<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100410 {
411 if (cancellationToken.IsCancellationRequested)
412 {
413 return await Task.FromCanceled<TSet>(cancellationToken);
414 }
415
416 var set = new TSet
417 {
418 ElementType = (TType) await ReadByteAsync(cancellationToken),
419 Count = await ReadI32Async(cancellationToken)
420 };
421
422 return set;
423 }
424
425 public override async Task ReadSetEndAsync(CancellationToken cancellationToken)
426 {
427 if (cancellationToken.IsCancellationRequested)
428 {
429 await Task.FromCanceled(cancellationToken);
430 }
431 }
432
Jens Geyer5a17b132019-05-26 15:53:37 +0200433 public override async ValueTask<bool> ReadBoolAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100434 {
435 if (cancellationToken.IsCancellationRequested)
436 {
437 return await Task.FromCanceled<bool>(cancellationToken);
438 }
439
440 return await ReadByteAsync(cancellationToken) == 1;
441 }
442
Jens Geyer5a17b132019-05-26 15:53:37 +0200443 public override async ValueTask<sbyte> ReadByteAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100444 {
445 if (cancellationToken.IsCancellationRequested)
446 {
447 return await Task.FromCanceled<sbyte>(cancellationToken);
448 }
449
Jens Geyer5a17b132019-05-26 15:53:37 +0200450 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 1, cancellationToken);
451 return (sbyte)PreAllocatedBuffer[0];
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100452 }
453
Jens Geyer5a17b132019-05-26 15:53:37 +0200454 public override async ValueTask<short> ReadI16Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100455 {
456 if (cancellationToken.IsCancellationRequested)
457 {
458 return await Task.FromCanceled<short>(cancellationToken);
459 }
460
Jens Geyer5a17b132019-05-26 15:53:37 +0200461 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 2, cancellationToken);
zembord9d958a32019-11-21 13:11:44 +0300462 var result = BinaryPrimitives.ReadInt16BigEndian(PreAllocatedBuffer);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100463 return result;
464 }
465
Jens Geyer5a17b132019-05-26 15:53:37 +0200466 public override async ValueTask<int> ReadI32Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100467 {
468 if (cancellationToken.IsCancellationRequested)
469 {
470 return await Task.FromCanceled<int>(cancellationToken);
471 }
472
Jens Geyer5a17b132019-05-26 15:53:37 +0200473 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 4, cancellationToken);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100474
zembord9d958a32019-11-21 13:11:44 +0300475 var result = BinaryPrimitives.ReadInt32BigEndian(PreAllocatedBuffer);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100476
477 return result;
478 }
479
Jens Geyer5a17b132019-05-26 15:53:37 +0200480 public override async ValueTask<long> ReadI64Async(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100481 {
482 if (cancellationToken.IsCancellationRequested)
483 {
484 return await Task.FromCanceled<long>(cancellationToken);
485 }
486
Jens Geyer5a17b132019-05-26 15:53:37 +0200487 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, 8, cancellationToken);
zembord9d958a32019-11-21 13:11:44 +0300488 return BinaryPrimitives.ReadInt64BigEndian(PreAllocatedBuffer);
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100489 }
490
Jens Geyer5a17b132019-05-26 15:53:37 +0200491 public override async ValueTask<double> ReadDoubleAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100492 {
493 if (cancellationToken.IsCancellationRequested)
494 {
495 return await Task.FromCanceled<double>(cancellationToken);
496 }
497
498 var d = await ReadI64Async(cancellationToken);
499 return BitConverter.Int64BitsToDouble(d);
500 }
501
Jens Geyer5a17b132019-05-26 15:53:37 +0200502 public override async ValueTask<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100503 {
504 if (cancellationToken.IsCancellationRequested)
505 {
506 return await Task.FromCanceled<byte[]>(cancellationToken);
507 }
508
509 var size = await ReadI32Async(cancellationToken);
510 var buf = new byte[size];
511 await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
512 return buf;
513 }
514
Jens Geyer5a17b132019-05-26 15:53:37 +0200515 public override async ValueTask<string> ReadStringAsync(CancellationToken cancellationToken)
516 {
517 if (cancellationToken.IsCancellationRequested)
518 {
519 return await Task.FromCanceled<string>(cancellationToken);
520 }
521
522 var size = await ReadI32Async(cancellationToken);
523 return size > 0 ? await ReadStringBodyAsync(size, cancellationToken) : string.Empty;
524 }
525
526 private async ValueTask<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100527 {
528 if (cancellationToken.IsCancellationRequested)
529 {
530 await Task.FromCanceled<string>(cancellationToken);
531 }
532
Jens Geyer5a17b132019-05-26 15:53:37 +0200533 if (size <= PreAllocatedBuffer.Length)
534 {
535 await Trans.ReadAllAsync(PreAllocatedBuffer, 0, size, cancellationToken);
536 return Encoding.UTF8.GetString(PreAllocatedBuffer, 0, size);
537 }
538
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100539 var buf = new byte[size];
540 await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
541 return Encoding.UTF8.GetString(buf, 0, buf.Length);
542 }
543
Jens Geyer421444f2019-03-20 22:13:25 +0100544 public class Factory : TProtocolFactory
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100545 {
546 protected bool StrictRead;
547 protected bool StrictWrite;
548
549 public Factory()
550 : this(false, true)
551 {
552 }
553
554 public Factory(bool strictRead, bool strictWrite)
555 {
556 StrictRead = strictRead;
557 StrictWrite = strictWrite;
558 }
559
Jens Geyer421444f2019-03-20 22:13:25 +0100560 public override TProtocol GetProtocol(TTransport trans)
Jens Geyeraa0c8b32019-01-28 23:27:45 +0100561 {
562 return new TBinaryProtocol(trans, StrictRead, StrictWrite);
563 }
564 }
565 }
Jens Geyer421444f2019-03-20 22:13:25 +0100566}