blob: 37bca8018b135249d11e6bb0fbd51ebfce100f36 [file] [log] [blame]
Jens Geyeraa0c8b32019-01-28 23:27:45 +01001// Licensed to the Apache Software Foundation(ASF) under one
2// or more contributor license agreements.See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18using System;
19using System.Text;
20using System.Threading;
21using System.Threading.Tasks;
22using Thrift.Protocol.Entities;
23using Thrift.Transport;
24
25namespace Thrift.Protocol
26{
27 // ReSharper disable once InconsistentNaming
28 public class TBinaryProtocol : TProtocol
29 {
30 //TODO: Unit tests
31 //TODO: Localization
32 //TODO: pragma
33
34 protected const uint VersionMask = 0xffff0000;
35 protected const uint Version1 = 0x80010000;
36
37 protected bool StrictRead;
38 protected bool StrictWrite;
39
40 public TBinaryProtocol(TTransport trans)
41 : this(trans, false, true)
42 {
43 }
44
45 public TBinaryProtocol(TTransport trans, bool strictRead, bool strictWrite)
46 : base(trans)
47 {
48 StrictRead = strictRead;
49 StrictWrite = strictWrite;
50 }
51
52 public override async Task WriteMessageBeginAsync(TMessage message, CancellationToken cancellationToken)
53 {
54 if (cancellationToken.IsCancellationRequested)
55 {
56 return;
57 }
58
59 if (StrictWrite)
60 {
61 var version = Version1 | (uint) message.Type;
62 await WriteI32Async((int) version, cancellationToken);
63 await WriteStringAsync(message.Name, cancellationToken);
64 await WriteI32Async(message.SeqID, cancellationToken);
65 }
66 else
67 {
68 await WriteStringAsync(message.Name, cancellationToken);
69 await WriteByteAsync((sbyte) message.Type, cancellationToken);
70 await WriteI32Async(message.SeqID, cancellationToken);
71 }
72 }
73
74 public override async Task WriteMessageEndAsync(CancellationToken cancellationToken)
75 {
76 if (cancellationToken.IsCancellationRequested)
77 {
78 await Task.FromCanceled(cancellationToken);
79 }
80 }
81
82 public override async Task WriteStructBeginAsync(TStruct @struct, CancellationToken cancellationToken)
83 {
84 if (cancellationToken.IsCancellationRequested)
85 {
86 await Task.FromCanceled(cancellationToken);
87 }
88 }
89
90 public override async Task WriteStructEndAsync(CancellationToken cancellationToken)
91 {
92 if (cancellationToken.IsCancellationRequested)
93 {
94 await Task.FromCanceled(cancellationToken);
95 }
96 }
97
98 public override async Task WriteFieldBeginAsync(TField field, CancellationToken cancellationToken)
99 {
100 if (cancellationToken.IsCancellationRequested)
101 {
102 return;
103 }
104
105 await WriteByteAsync((sbyte) field.Type, cancellationToken);
106 await WriteI16Async(field.ID, cancellationToken);
107 }
108
109 public override async Task WriteFieldEndAsync(CancellationToken cancellationToken)
110 {
111 if (cancellationToken.IsCancellationRequested)
112 {
113 await Task.FromCanceled(cancellationToken);
114 }
115 }
116
117 public override async Task WriteFieldStopAsync(CancellationToken cancellationToken)
118 {
119 if (cancellationToken.IsCancellationRequested)
120 {
121 return;
122 }
123
124 await WriteByteAsync((sbyte) TType.Stop, cancellationToken);
125 }
126
127 public override async Task WriteMapBeginAsync(TMap map, CancellationToken cancellationToken)
128 {
129 if (cancellationToken.IsCancellationRequested)
130 {
131 return;
132 }
133
134 await WriteByteAsync((sbyte) map.KeyType, cancellationToken);
135 await WriteByteAsync((sbyte) map.ValueType, cancellationToken);
136 await WriteI32Async(map.Count, cancellationToken);
137 }
138
139 public override async Task WriteMapEndAsync(CancellationToken cancellationToken)
140 {
141 if (cancellationToken.IsCancellationRequested)
142 {
143 await Task.FromCanceled(cancellationToken);
144 }
145 }
146
147 public override async Task WriteListBeginAsync(TList list, CancellationToken cancellationToken)
148 {
149 if (cancellationToken.IsCancellationRequested)
150 {
151 return;
152 }
153
154 await WriteByteAsync((sbyte) list.ElementType, cancellationToken);
155 await WriteI32Async(list.Count, cancellationToken);
156 }
157
158 public override async Task WriteListEndAsync(CancellationToken cancellationToken)
159 {
160 if (cancellationToken.IsCancellationRequested)
161 {
162 await Task.FromCanceled(cancellationToken);
163 }
164 }
165
166 public override async Task WriteSetBeginAsync(TSet set, CancellationToken cancellationToken)
167 {
168 if (cancellationToken.IsCancellationRequested)
169 {
170 return;
171 }
172
173 await WriteByteAsync((sbyte) set.ElementType, cancellationToken);
174 await WriteI32Async(set.Count, cancellationToken);
175 }
176
177 public override async Task WriteSetEndAsync(CancellationToken cancellationToken)
178 {
179 if (cancellationToken.IsCancellationRequested)
180 {
181 await Task.FromCanceled(cancellationToken);
182 }
183 }
184
185 public override async Task WriteBoolAsync(bool b, CancellationToken cancellationToken)
186 {
187 if (cancellationToken.IsCancellationRequested)
188 {
189 return;
190 }
191
192 await WriteByteAsync(b ? (sbyte) 1 : (sbyte) 0, cancellationToken);
193 }
194
195 protected internal static byte[] CreateWriteByte(sbyte b)
196 {
197 var bout = new byte[1];
198
199 bout[0] = (byte) b;
200
201 return bout;
202 }
203
204 public override async Task WriteByteAsync(sbyte b, CancellationToken cancellationToken)
205 {
206 if (cancellationToken.IsCancellationRequested)
207 {
208 return;
209 }
210
211 var bout = CreateWriteByte(b);
212 await Trans.WriteAsync(bout, 0, 1, cancellationToken);
213 }
214
215 protected internal static byte[] CreateWriteI16(short s)
216 {
217 var i16Out = new byte[2];
218
219 i16Out[0] = (byte) (0xff & (s >> 8));
220 i16Out[1] = (byte) (0xff & s);
221
222 return i16Out;
223 }
224
225 public override async Task WriteI16Async(short i16, CancellationToken cancellationToken)
226 {
227 if (cancellationToken.IsCancellationRequested)
228 {
229 return;
230 }
231
232 var i16Out = CreateWriteI16(i16);
233 await Trans.WriteAsync(i16Out, 0, 2, cancellationToken);
234 }
235
236 protected internal static byte[] CreateWriteI32(int i32)
237 {
238 var i32Out = new byte[4];
239
240 i32Out[0] = (byte) (0xff & (i32 >> 24));
241 i32Out[1] = (byte) (0xff & (i32 >> 16));
242 i32Out[2] = (byte) (0xff & (i32 >> 8));
243 i32Out[3] = (byte) (0xff & i32);
244
245 return i32Out;
246 }
247
248 public override async Task WriteI32Async(int i32, CancellationToken cancellationToken)
249 {
250 if (cancellationToken.IsCancellationRequested)
251 {
252 return;
253 }
254
255 var i32Out = CreateWriteI32(i32);
256 await Trans.WriteAsync(i32Out, 0, 4, cancellationToken);
257 }
258
259 protected internal static byte[] CreateWriteI64(long i64)
260 {
261 var i64Out = new byte[8];
262
263 i64Out[0] = (byte) (0xff & (i64 >> 56));
264 i64Out[1] = (byte) (0xff & (i64 >> 48));
265 i64Out[2] = (byte) (0xff & (i64 >> 40));
266 i64Out[3] = (byte) (0xff & (i64 >> 32));
267 i64Out[4] = (byte) (0xff & (i64 >> 24));
268 i64Out[5] = (byte) (0xff & (i64 >> 16));
269 i64Out[6] = (byte) (0xff & (i64 >> 8));
270 i64Out[7] = (byte) (0xff & i64);
271
272 return i64Out;
273 }
274
275 public override async Task WriteI64Async(long i64, CancellationToken cancellationToken)
276 {
277 if (cancellationToken.IsCancellationRequested)
278 {
279 return;
280 }
281
282 var i64Out = CreateWriteI64(i64);
283 await Trans.WriteAsync(i64Out, 0, 8, cancellationToken);
284 }
285
286 public override async Task WriteDoubleAsync(double d, CancellationToken cancellationToken)
287 {
288 if (cancellationToken.IsCancellationRequested)
289 {
290 return;
291 }
292
293 await WriteI64Async(BitConverter.DoubleToInt64Bits(d), cancellationToken);
294 }
295
296 public override async Task WriteBinaryAsync(byte[] bytes, CancellationToken cancellationToken)
297 {
298 if (cancellationToken.IsCancellationRequested)
299 {
300 return;
301 }
302
303 await WriteI32Async(bytes.Length, cancellationToken);
304 await Trans.WriteAsync(bytes, 0, bytes.Length, cancellationToken);
305 }
306
307 public override async Task<TMessage> ReadMessageBeginAsync(CancellationToken cancellationToken)
308 {
309 if (cancellationToken.IsCancellationRequested)
310 {
311 return await Task.FromCanceled<TMessage>(cancellationToken);
312 }
313
314 var message = new TMessage();
315 var size = await ReadI32Async(cancellationToken);
316 if (size < 0)
317 {
318 var version = (uint) size & VersionMask;
319 if (version != Version1)
320 {
321 throw new TProtocolException(TProtocolException.BAD_VERSION,
322 $"Bad version in ReadMessageBegin: {version}");
323 }
324 message.Type = (TMessageType) (size & 0x000000ff);
325 message.Name = await ReadStringAsync(cancellationToken);
326 message.SeqID = await ReadI32Async(cancellationToken);
327 }
328 else
329 {
330 if (StrictRead)
331 {
332 throw new TProtocolException(TProtocolException.BAD_VERSION,
333 "Missing version in ReadMessageBegin, old client?");
334 }
335 message.Name = await ReadStringBodyAsync(size, cancellationToken);
336 message.Type = (TMessageType) await ReadByteAsync(cancellationToken);
337 message.SeqID = await ReadI32Async(cancellationToken);
338 }
339 return message;
340 }
341
342 public override async Task ReadMessageEndAsync(CancellationToken cancellationToken)
343 {
344 if (cancellationToken.IsCancellationRequested)
345 {
346 await Task.FromCanceled(cancellationToken);
347 }
348 }
349
350 public override async Task<TStruct> ReadStructBeginAsync(CancellationToken cancellationToken)
351 {
352 if (cancellationToken.IsCancellationRequested)
353 {
354 await Task.FromCanceled(cancellationToken);
355 }
356
357 //TODO: no read from internal transport?
358 return new TStruct();
359 }
360
361 public override async Task ReadStructEndAsync(CancellationToken cancellationToken)
362 {
363 if (cancellationToken.IsCancellationRequested)
364 {
365 await Task.FromCanceled(cancellationToken);
366 }
367 }
368
369 public override async Task<TField> ReadFieldBeginAsync(CancellationToken cancellationToken)
370 {
371 if (cancellationToken.IsCancellationRequested)
372 {
373 return await Task.FromCanceled<TField>(cancellationToken);
374 }
375
376 var field = new TField
377 {
378 Type = (TType) await ReadByteAsync(cancellationToken)
379 };
380
381 if (field.Type != TType.Stop)
382 {
383 field.ID = await ReadI16Async(cancellationToken);
384 }
385
386 return field;
387 }
388
389 public override async Task ReadFieldEndAsync(CancellationToken cancellationToken)
390 {
391 if (cancellationToken.IsCancellationRequested)
392 {
393 await Task.FromCanceled(cancellationToken);
394 }
395 }
396
397 public override async Task<TMap> ReadMapBeginAsync(CancellationToken cancellationToken)
398 {
399 if (cancellationToken.IsCancellationRequested)
400 {
401 return await Task.FromCanceled<TMap>(cancellationToken);
402 }
403
404 var map = new TMap
405 {
406 KeyType = (TType) await ReadByteAsync(cancellationToken),
407 ValueType = (TType) await ReadByteAsync(cancellationToken),
408 Count = await ReadI32Async(cancellationToken)
409 };
410
411 return map;
412 }
413
414 public override async Task ReadMapEndAsync(CancellationToken cancellationToken)
415 {
416 if (cancellationToken.IsCancellationRequested)
417 {
418 await Task.FromCanceled(cancellationToken);
419 }
420 }
421
422 public override async Task<TList> ReadListBeginAsync(CancellationToken cancellationToken)
423 {
424 if (cancellationToken.IsCancellationRequested)
425 {
426 return await Task.FromCanceled<TList>(cancellationToken);
427 }
428
429 var list = new TList
430 {
431 ElementType = (TType) await ReadByteAsync(cancellationToken),
432 Count = await ReadI32Async(cancellationToken)
433 };
434
435 return list;
436 }
437
438 public override async Task ReadListEndAsync(CancellationToken cancellationToken)
439 {
440 if (cancellationToken.IsCancellationRequested)
441 {
442 await Task.FromCanceled(cancellationToken);
443 }
444 }
445
446 public override async Task<TSet> ReadSetBeginAsync(CancellationToken cancellationToken)
447 {
448 if (cancellationToken.IsCancellationRequested)
449 {
450 return await Task.FromCanceled<TSet>(cancellationToken);
451 }
452
453 var set = new TSet
454 {
455 ElementType = (TType) await ReadByteAsync(cancellationToken),
456 Count = await ReadI32Async(cancellationToken)
457 };
458
459 return set;
460 }
461
462 public override async Task ReadSetEndAsync(CancellationToken cancellationToken)
463 {
464 if (cancellationToken.IsCancellationRequested)
465 {
466 await Task.FromCanceled(cancellationToken);
467 }
468 }
469
470 public override async Task<bool> ReadBoolAsync(CancellationToken cancellationToken)
471 {
472 if (cancellationToken.IsCancellationRequested)
473 {
474 return await Task.FromCanceled<bool>(cancellationToken);
475 }
476
477 return await ReadByteAsync(cancellationToken) == 1;
478 }
479
480 public override async Task<sbyte> ReadByteAsync(CancellationToken cancellationToken)
481 {
482 if (cancellationToken.IsCancellationRequested)
483 {
484 return await Task.FromCanceled<sbyte>(cancellationToken);
485 }
486
487 var bin = new byte[1];
488 await Trans.ReadAllAsync(bin, 0, 1, cancellationToken); //TODO: why readall ?
489 return (sbyte) bin[0];
490 }
491
492 public override async Task<short> ReadI16Async(CancellationToken cancellationToken)
493 {
494 if (cancellationToken.IsCancellationRequested)
495 {
496 return await Task.FromCanceled<short>(cancellationToken);
497 }
498
499 var i16In = new byte[2];
500 await Trans.ReadAllAsync(i16In, 0, 2, cancellationToken);
501 var result = (short) (((i16In[0] & 0xff) << 8) | i16In[1] & 0xff);
502 return result;
503 }
504
505 public override async Task<int> ReadI32Async(CancellationToken cancellationToken)
506 {
507 if (cancellationToken.IsCancellationRequested)
508 {
509 return await Task.FromCanceled<int>(cancellationToken);
510 }
511
512 var i32In = new byte[4];
513 await Trans.ReadAllAsync(i32In, 0, 4, cancellationToken);
514
515 var result =
516 ((i32In[0] & 0xff) << 24) |
517 ((i32In[1] & 0xff) << 16) |
518 ((i32In[2] & 0xff) << 8) |
519 i32In[3] & 0xff;
520
521 return result;
522 }
523
524#pragma warning disable 675
525
526 protected internal long CreateReadI64(byte[] buf)
527 {
528 var result =
529 ((long) (buf[0] & 0xff) << 56) |
530 ((long) (buf[1] & 0xff) << 48) |
531 ((long) (buf[2] & 0xff) << 40) |
532 ((long) (buf[3] & 0xff) << 32) |
533 ((long) (buf[4] & 0xff) << 24) |
534 ((long) (buf[5] & 0xff) << 16) |
535 ((long) (buf[6] & 0xff) << 8) |
536 buf[7] & 0xff;
537
538 return result;
539 }
540
541#pragma warning restore 675
542
543 public override async Task<long> ReadI64Async(CancellationToken cancellationToken)
544 {
545 if (cancellationToken.IsCancellationRequested)
546 {
547 return await Task.FromCanceled<long>(cancellationToken);
548 }
549
550 var i64In = new byte[8];
551 await Trans.ReadAllAsync(i64In, 0, 8, cancellationToken);
552 return CreateReadI64(i64In);
553 }
554
555 public override async Task<double> ReadDoubleAsync(CancellationToken cancellationToken)
556 {
557 if (cancellationToken.IsCancellationRequested)
558 {
559 return await Task.FromCanceled<double>(cancellationToken);
560 }
561
562 var d = await ReadI64Async(cancellationToken);
563 return BitConverter.Int64BitsToDouble(d);
564 }
565
566 public override async Task<byte[]> ReadBinaryAsync(CancellationToken cancellationToken)
567 {
568 if (cancellationToken.IsCancellationRequested)
569 {
570 return await Task.FromCanceled<byte[]>(cancellationToken);
571 }
572
573 var size = await ReadI32Async(cancellationToken);
574 var buf = new byte[size];
575 await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
576 return buf;
577 }
578
579 private async Task<string> ReadStringBodyAsync(int size, CancellationToken cancellationToken)
580 {
581 if (cancellationToken.IsCancellationRequested)
582 {
583 await Task.FromCanceled<string>(cancellationToken);
584 }
585
586 var buf = new byte[size];
587 await Trans.ReadAllAsync(buf, 0, size, cancellationToken);
588 return Encoding.UTF8.GetString(buf, 0, buf.Length);
589 }
590
591 public class Factory : ITProtocolFactory
592 {
593 protected bool StrictRead;
594 protected bool StrictWrite;
595
596 public Factory()
597 : this(false, true)
598 {
599 }
600
601 public Factory(bool strictRead, bool strictWrite)
602 {
603 StrictRead = strictRead;
604 StrictWrite = strictWrite;
605 }
606
607 public TProtocol GetProtocol(TTransport trans)
608 {
609 return new TBinaryProtocol(trans, StrictRead, StrictWrite);
610 }
611 }
612 }
613}