blob: 06f71b3a30dab50d7204ddaec1014eef12623477 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20package thrift
21
22import (
23 "bufio"
24 "bytes"
25 "compress/zlib"
26 "context"
27 "encoding/binary"
28 "errors"
29 "fmt"
30 "io"
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +080031)
32
// size32 is the size in bytes of a 32-bit integer. It is the width of the
// frame length prefix that ReadFrame peeks and Flush writes.
const size32 = 4
35
// headerMeta is the fixed-size leading part of a THeader frame.
//
// parseHeaders decodes it and Flush encodes it, both as big-endian binary.
type headerMeta struct {
	// MagicFlags combines the THeader magic (bits selected by
	// THeaderHeaderMask) with the flags (bits selected by THeaderFlagsMask).
	MagicFlags uint32
	// SequenceID is copied from/to THeaderTransport.SequenceID.
	SequenceID int32
	// HeaderLength is the size of the variable-length header section,
	// counted in units of 4 bytes.
	HeaderLength uint16
}
41
// headerMetaSize is the encoded size of headerMeta in bytes
// (uint32 + int32 + uint16 = 4 + 4 + 2).
const headerMetaSize = 10
43
// clientType records which wire format was detected for the peer, as decided
// by ReadFrame.
type clientType int

// Detected client types. The zero value, clientUnknown, means nothing has been
// detected yet; Flush treats it the same as clientHeaders.
const (
	clientUnknown clientType = iota
	clientHeaders
	clientFramedBinary
	clientUnframedBinary
	clientFramedCompact
	clientUnframedCompact
)
54
// Constants defined in THeader format:
// https://github.com/apache/thrift/blob/master/doc/specs/HeaderFormat.md
const (
	// THeaderHeaderMagic is the value the first 32 bits of a frame must have,
	// after masking with THeaderHeaderMask, to be treated as a THeader frame.
	THeaderHeaderMagic uint32 = 0x0fff0000
	// THeaderHeaderMask selects the magic bits of headerMeta.MagicFlags.
	THeaderHeaderMask uint32 = 0xffff0000
	// THeaderFlagsMask selects the flag bits of headerMeta.MagicFlags.
	THeaderFlagsMask uint32 = 0x0000ffff
	// THeaderMaxFrameSize is the largest frame size ReadFrame accepts,
	// regardless of the configured max frame size.
	THeaderMaxFrameSize uint32 = 0x3fffffff
)
63
// THeaderMap is the type of the header map in THeader transport.
//
// It maps header keys to header values, both as strings.
type THeaderMap map[string]string
66
// THeaderProtocolID is the wrapped protocol id used in THeader.
type THeaderProtocolID int32

// Supported THeaderProtocolID values.
//
// Any other value is rejected by GetProtocol (and therefore by Validate).
const (
	THeaderProtocolBinary  THeaderProtocolID = 0x00
	THeaderProtocolCompact THeaderProtocolID = 0x02
	THeaderProtocolDefault                   = THeaderProtocolBinary
)
76
// globalMemoryBuffer is a placeholder transport handed to GetProtocol by
// Validate, which discards the resulting protocol.
//
// Declared globally to avoid repetitive allocations, not really used.
var globalMemoryBuffer = NewTMemoryBuffer()
79
80// Validate checks whether the THeaderProtocolID is a valid/supported one.
81func (id THeaderProtocolID) Validate() error {
82 _, err := id.GetProtocol(globalMemoryBuffer)
83 return err
84}
85
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +080086// GetProtocol gets the corresponding TProtocol from the wrapped protocol id.
87func (id THeaderProtocolID) GetProtocol(trans TTransport) (TProtocol, error) {
88 switch id {
89 default:
90 return nil, NewTApplicationException(
91 INVALID_PROTOCOL,
92 fmt.Sprintf("THeader protocol id %d not supported", id),
93 )
94 case THeaderProtocolBinary:
Yuxuan 'fishy' Wanga2c44662020-09-21 12:33:26 -070095 return NewTBinaryProtocolTransport(trans), nil
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +080096 case THeaderProtocolCompact:
97 return NewTCompactProtocol(trans), nil
98 }
99}
100
// THeaderTransformID defines the numeric id of the transform used.
type THeaderTransformID int32

// THeaderTransformID values.
//
// Values not defined here are not currently supported, namely HMAC and Snappy.
const (
	TransformNone THeaderTransformID = iota // 0, no special handling
	TransformZlib                           // 1, zlib
)
111
// supportedTransformIDs is the set of transform ids accepted by
// THeaderTransport.AddTransform; ids missing from the map are rejected.
var supportedTransformIDs = map[THeaderTransformID]bool{
	TransformNone: true,
	TransformZlib: true,
}
116
// TransformReader is an io.ReadCloser that handles transforms reading.
//
// Each transform added through AddTransform may replace the embedded Reader
// with a wrapping reader and register a closer to be run by Close.
type TransformReader struct {
	// Reader is the current outermost reader; reads go through it.
	io.Reader

	// closers holds the closers registered by AddTransform, in the order
	// they were added. Close runs them in reverse.
	closers []io.Closer
}

// Compile-time check that *TransformReader implements io.ReadCloser.
var _ io.ReadCloser = (*TransformReader)(nil)
125
126// NewTransformReaderWithCapacity initializes a TransformReader with expected
127// closers capacity.
128//
129// If you don't know the closers capacity beforehand, just use
130//
Yuxuan 'fishy' Wangfa9af0a2024-05-09 17:58:43 -0700131// &TransformReader{Reader: baseReader}
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800132//
133// instead would be sufficient.
134func NewTransformReaderWithCapacity(baseReader io.Reader, capacity int) *TransformReader {
135 return &TransformReader{
136 Reader: baseReader,
137 closers: make([]io.Closer, 0, capacity),
138 }
139}
140
141// Close calls the underlying closers in appropriate order,
142// stops at and returns the first error encountered.
143func (tr *TransformReader) Close() error {
144 // Call closers in reversed order
145 for i := len(tr.closers) - 1; i >= 0; i-- {
146 if err := tr.closers[i].Close(); err != nil {
147 return err
148 }
149 }
150 return nil
151}
152
153// AddTransform adds a transform.
Yuxuan 'fishy' Wangb20f6752024-05-02 16:50:08 -0700154//
155// Deprecated: This only applies to the next message written, and the next read
156// message will cause write transforms to be reset from what's configured in
157// TConfiguration. For sticky transforms, use TConfiguration.THeaderTransforms
158// instead.
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800159func (tr *TransformReader) AddTransform(id THeaderTransformID) error {
160 switch id {
161 default:
162 return NewTApplicationException(
163 INVALID_TRANSFORM,
164 fmt.Sprintf("THeaderTransformID %d not supported", id),
165 )
166 case TransformNone:
167 // no-op
168 case TransformZlib:
Yuxuan 'fishy' Wang270696c2025-05-28 10:54:04 -0700169 readCloser, err := newZlibReader(tr.Reader)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800170 if err != nil {
171 return err
172 }
173 tr.Reader = readCloser
174 tr.closers = append(tr.closers, readCloser)
175 }
176 return nil
177}
178
// TransformWriter is an io.WriteCloser that handles transforms writing.
//
// Each transform added through AddTransform may replace the embedded Writer
// with a wrapping writer and register a closer to be run by Close.
type TransformWriter struct {
	// Writer is the current outermost writer; writes go through it.
	io.Writer

	// closers holds the closers registered by AddTransform, in the order
	// they were added. Close runs them in reverse.
	closers []io.Closer
}

// Compile-time check that *TransformWriter implements io.WriteCloser.
var _ io.WriteCloser = (*TransformWriter)(nil)
187
188// NewTransformWriter creates a new TransformWriter with base writer and transforms.
189func NewTransformWriter(baseWriter io.Writer, transforms []THeaderTransformID) (io.WriteCloser, error) {
190 writer := &TransformWriter{
191 Writer: baseWriter,
192 closers: make([]io.Closer, 0, len(transforms)),
193 }
194 for _, id := range transforms {
195 if err := writer.AddTransform(id); err != nil {
196 return nil, err
197 }
198 }
199 return writer, nil
200}
201
202// Close calls the underlying closers in appropriate order,
203// stops at and returns the first error encountered.
204func (tw *TransformWriter) Close() error {
205 // Call closers in reversed order
206 for i := len(tw.closers) - 1; i >= 0; i-- {
207 if err := tw.closers[i].Close(); err != nil {
208 return err
209 }
210 }
211 return nil
212}
213
214// AddTransform adds a transform.
215func (tw *TransformWriter) AddTransform(id THeaderTransformID) error {
216 switch id {
217 default:
218 return NewTApplicationException(
219 INVALID_TRANSFORM,
220 fmt.Sprintf("THeaderTransformID %d not supported", id),
221 )
222 case TransformNone:
223 // no-op
224 case TransformZlib:
Yuxuan 'fishy' Wang270696c2025-05-28 10:54:04 -0700225 writer, closer, err := newZlibWriterCloserLevel(tw.Writer, zlib.DefaultCompression)
226 if err != nil {
227 return err
228 }
229 tw.Writer = writer
230 tw.closers = append(tw.closers, closer)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800231 }
232 return nil
233}
234
// THeaderInfoType is the type id of the info headers.
type THeaderInfoType int32

// Supported THeaderInfoType values.
//
// parseHeaders stops reading the info section at the first info type that is
// not listed here.
const (
	_            THeaderInfoType = iota // Skip 0
	InfoKeyValue                        // 1
	// Rest of the info types are not supported.
)
244
// THeaderTransport is a Transport mode that implements THeader.
//
// Note that THeaderTransport handles frame and zlib by itself,
// so the underlying transport should be a raw socket transports (TSocket or TSSLSocket),
// instead of rich transports like TZlibTransport or TFramedTransport.
type THeaderTransport struct {
	// SequenceID and Flags are overwritten from the header of each THeader
	// frame read, and are written into the header of each THeader frame
	// flushed.
	SequenceID int32
	Flags      uint32

	// transport is the underlying transport that frames are read from and
	// written to.
	transport TTransport

	// THeaderMap for read and write
	readHeaders  THeaderMap
	writeHeaders THeaderMap

	// Reading related variables.
	reader *bufio.Reader
	// When frame is detected, we read the frame fully into frameBuffer.
	frameBuffer *bytes.Buffer
	// When it's non-nil, Read should read from frameReader instead of
	// reader, and EOF error indicates end of frame instead of end of all
	// transport.
	frameReader io.ReadCloser

	// Writing related variables
	//
	// writeBuffer accumulates data passed to Write until Flush.
	writeBuffer *bytes.Buffer
	// writeTransforms are the transforms applied to the payload by Flush.
	writeTransforms []THeaderTransformID

	// clientType is the wire format detected by ReadFrame.
	clientType clientType
	// protocolID is the wrapped protocol id, initialized from cfg and
	// overwritten by the value found in each THeader frame read.
	protocolID THeaderProtocolID
	cfg        *TConfiguration

	// buffer is used in the following scenarios to avoid repetitive
	// allocations, while 4 is big enough for all those scenarios:
	//
	// * header padding (max size 4)
	// * write the frame size (size 4)
	buffer [4]byte
}

// Compile-time check that *THeaderTransport implements TTransport.
var _ TTransport = (*THeaderTransport)(nil)
286
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800287// Deprecated: Use NewTHeaderTransportConf instead.
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800288func NewTHeaderTransport(trans TTransport) *THeaderTransport {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800289 return NewTHeaderTransportConf(trans, &TConfiguration{
290 noPropagation: true,
291 })
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800292}
293
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800294// NewTHeaderTransportConf creates THeaderTransport from the
295// underlying transport, with given TConfiguration attached.
Yuxuan 'fishy' Wanga2c44662020-09-21 12:33:26 -0700296//
297// If trans is already a *THeaderTransport, it will be returned as is,
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800298// but with TConfiguration overridden by the value passed in.
Yuxuan 'fishy' Wanga2c44662020-09-21 12:33:26 -0700299//
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800300// The protocol ID in TConfiguration is only useful for client transports.
Yuxuan 'fishy' Wanga2c44662020-09-21 12:33:26 -0700301// For servers,
302// the protocol ID will be overridden again to the one set by the client,
303// to ensure that servers always speak the same dialect as the client.
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800304func NewTHeaderTransportConf(trans TTransport, conf *TConfiguration) *THeaderTransport {
Yuxuan 'fishy' Wanga2c44662020-09-21 12:33:26 -0700305 if ht, ok := trans.(*THeaderTransport); ok {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800306 ht.SetTConfiguration(conf)
307 return ht
Yuxuan 'fishy' Wanga2c44662020-09-21 12:33:26 -0700308 }
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800309 PropagateTConfiguration(trans, conf)
Yuxuan 'fishy' Wanga2c44662020-09-21 12:33:26 -0700310 return &THeaderTransport{
Yuxuan 'fishy' Wangb20f6752024-05-02 16:50:08 -0700311 transport: trans,
312 reader: bufio.NewReader(trans),
313 writeHeaders: make(THeaderMap),
314 writeTransforms: conf.GetTHeaderTransforms(),
315 protocolID: conf.GetTHeaderProtocolID(),
316 cfg: conf,
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800317 }
Yuxuan 'fishy' Wanga2c44662020-09-21 12:33:26 -0700318}
319
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800320// Open calls the underlying transport's Open function.
321func (t *THeaderTransport) Open() error {
322 return t.transport.Open()
323}
324
325// IsOpen calls the underlying transport's IsOpen function.
326func (t *THeaderTransport) IsOpen() bool {
327 return t.transport.IsOpen()
328}
329
330// ReadFrame tries to read the frame header, guess the client type, and handle
331// unframed clients.
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700332func (t *THeaderTransport) ReadFrame(ctx context.Context) error {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800333 if !t.needReadFrame() {
334 // No need to read frame, skipping.
335 return nil
336 }
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700337
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800338 // Peek and handle the first 32 bits.
339 // They could either be the length field of a framed message,
340 // or the first bytes of an unframed message.
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700341 var buf []byte
342 var err error
343 // This is also usually the first read from a connection,
344 // so handle retries around socket timeouts.
345 _, deadlineSet := ctx.Deadline()
346 for {
347 buf, err = t.reader.Peek(size32)
348 if deadlineSet && isTimeoutError(err) && ctx.Err() == nil {
349 // This is I/O timeout and we still have time,
350 // continue trying
351 continue
352 }
353 // For anything else, do not retry
354 break
355 }
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800356 if err != nil {
357 return err
358 }
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700359
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800360 frameSize := binary.BigEndian.Uint32(buf)
361 if frameSize&VERSION_MASK == VERSION_1 {
362 t.clientType = clientUnframedBinary
363 return nil
364 }
365 if buf[0] == COMPACT_PROTOCOL_ID && buf[1]&COMPACT_VERSION_MASK == COMPACT_VERSION {
366 t.clientType = clientUnframedCompact
367 return nil
368 }
369
370 // At this point it should be a framed message,
371 // sanity check on frameSize then discard the peeked part.
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800372 if frameSize > THeaderMaxFrameSize || frameSize > uint32(t.cfg.GetMaxFrameSize()) {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800373 return NewTProtocolExceptionWithType(
374 SIZE_LIMIT,
375 errors.New("frame too large"),
376 )
377 }
378 t.reader.Discard(size32)
379
380 // Read the frame fully into frameBuffer.
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800381 if t.frameBuffer == nil {
Yuxuan 'fishy' Wangbdfde852022-08-08 22:12:40 -0700382 t.frameBuffer = bufPool.get()
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800383 }
384 _, err = io.CopyN(t.frameBuffer, t.reader, int64(frameSize))
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800385 if err != nil {
386 return err
387 }
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800388 t.frameReader = io.NopCloser(t.frameBuffer)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800389
390 // Peek and handle the next 32 bits.
391 buf = t.frameBuffer.Bytes()[:size32]
392 version := binary.BigEndian.Uint32(buf)
393 if version&THeaderHeaderMask == THeaderHeaderMagic {
394 t.clientType = clientHeaders
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700395 return t.parseHeaders(ctx, frameSize)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800396 }
397 if version&VERSION_MASK == VERSION_1 {
398 t.clientType = clientFramedBinary
399 return nil
400 }
401 if buf[0] == COMPACT_PROTOCOL_ID && buf[1]&COMPACT_VERSION_MASK == COMPACT_VERSION {
402 t.clientType = clientFramedCompact
403 return nil
404 }
405 if err := t.endOfFrame(); err != nil {
406 return err
407 }
408 return NewTProtocolExceptionWithType(
409 NOT_IMPLEMENTED,
410 errors.New("unsupported client transport type"),
411 )
412}
413
414// endOfFrame does end of frame handling.
415//
416// It closes frameReader, and also resets frame related states.
417func (t *THeaderTransport) endOfFrame() error {
418 defer func() {
Yuxuan 'fishy' Wangbdfde852022-08-08 22:12:40 -0700419 bufPool.put(&t.frameBuffer)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800420 t.frameReader = nil
421 }()
422 return t.frameReader.Close()
423}
424
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700425func (t *THeaderTransport) parseHeaders(ctx context.Context, frameSize uint32) error {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800426 if t.clientType != clientHeaders {
427 return nil
428 }
429
430 var err error
431 var meta headerMeta
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800432 if err = binary.Read(t.frameBuffer, binary.BigEndian, &meta); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800433 return err
434 }
435 frameSize -= headerMetaSize
436 t.Flags = meta.MagicFlags & THeaderFlagsMask
437 t.SequenceID = meta.SequenceID
438 headerLength := int64(meta.HeaderLength) * 4
439 if int64(frameSize) < headerLength {
440 return NewTProtocolExceptionWithType(
441 SIZE_LIMIT,
442 errors.New("header size is larger than the whole frame"),
443 )
444 }
445 headerBuf := NewTMemoryBuffer()
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800446 _, err = io.CopyN(headerBuf, t.frameBuffer, headerLength)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800447 if err != nil {
448 return err
449 }
450 hp := NewTCompactProtocol(headerBuf)
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800451 hp.SetTConfiguration(t.cfg)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800452
453 // At this point the header is already read into headerBuf,
454 // and t.frameBuffer starts from the actual payload.
455 protoID, err := hp.readVarint32()
456 if err != nil {
457 return err
458 }
Yuxuan 'fishy' Wang8dd04f42021-01-22 15:41:41 -0800459 t.protocolID = THeaderProtocolID(protoID)
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800460
Yuxuan 'fishy' Wangb20f6752024-05-02 16:50:08 -0700461 // Reset writeTransforms to the ones from cfg, as we are going to add
462 // compression transforms from what we read, we don't want to accumulate
463 // different transforms read from different requests
464 t.writeTransforms = t.cfg.GetTHeaderTransforms()
465
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800466 var transformCount int32
467 transformCount, err = hp.readVarint32()
468 if err != nil {
469 return err
470 }
471 if transformCount > 0 {
472 reader := NewTransformReaderWithCapacity(
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800473 t.frameBuffer,
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800474 int(transformCount),
475 )
476 t.frameReader = reader
477 transformIDs := make([]THeaderTransformID, transformCount)
Yuxuan 'fishy' Wang91565d42024-08-14 09:01:15 -0700478 for i := range int(transformCount) {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800479 id, err := hp.readVarint32()
480 if err != nil {
481 return err
482 }
Yuxuan 'fishy' Wangb20f6752024-05-02 16:50:08 -0700483 tID := THeaderTransformID(id)
484 transformIDs[i] = tID
485
486 // For compression transforms, we should also add them
487 // to writeTransforms so that the response (assuming we
488 // are reading a request) would do the same compression.
489 switch tID {
490 case TransformZlib:
491 t.addWriteTransformsDedupe(tID)
492 }
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800493 }
494 // The transform IDs on the wire was added based on the order of
495 // writing, so on the reading side we need to reverse the order.
496 for i := transformCount - 1; i >= 0; i-- {
497 id := transformIDs[i]
498 if err := reader.AddTransform(id); err != nil {
499 return err
500 }
501 }
502 }
503
504 // The info part does not use the transforms yet, so it's
505 // important to continue using headerBuf.
506 headers := make(THeaderMap)
507 for {
508 infoType, err := hp.readVarint32()
Yuxuan 'fishy' Wange27e82c2021-01-19 11:07:58 -0800509 if errors.Is(err, io.EOF) {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800510 break
511 }
512 if err != nil {
513 return err
514 }
515 if THeaderInfoType(infoType) == InfoKeyValue {
516 count, err := hp.readVarint32()
517 if err != nil {
518 return err
519 }
Yuxuan 'fishy' Wang91565d42024-08-14 09:01:15 -0700520 for range int(count) {
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700521 key, err := hp.ReadString(ctx)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800522 if err != nil {
523 return err
524 }
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700525 value, err := hp.ReadString(ctx)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800526 if err != nil {
527 return err
528 }
529 headers[key] = value
530 }
531 } else {
532 // Skip reading info section on the first
533 // unsupported info type.
534 break
535 }
536 }
537 t.readHeaders = headers
538
539 return nil
540}
541
542func (t *THeaderTransport) needReadFrame() bool {
543 if t.clientType == clientUnknown {
544 // This is a new connection that's never read before.
545 return true
546 }
547 if t.isFramed() && t.frameReader == nil {
548 // We just finished the last frame.
549 return true
550 }
551 return false
552}
553
// Read reads payload bytes into p.
//
// It first calls ReadFrame, which is a no-op unless a new frame is expected.
// For framed clients the data comes from the current frame through
// frameReader, and end of frame handling is done as soon as the frame is
// exhausted. For unframed clients the data comes straight from the buffered
// reader on the underlying transport.
func (t *THeaderTransport) Read(p []byte) (read int, err error) {
	// Here using context.Background instead of a context passed in is safe.
	// First is that there's no way to pass context into this function.
	// Then, 99% of the case when calling this Read frame is already read
	// into frameReader. ReadFrame here is more of preventing bugs that
	// didn't call ReadFrame before calling Read.
	err = t.ReadFrame(context.Background())
	if err != nil {
		return
	}
	if t.frameReader != nil {
		read, err = t.frameReader.Read(p)
		if err == nil && t.frameBuffer.Len() <= 0 {
			// the last Read finished the frame, do endOfFrame
			// handling here.
			err = t.endOfFrame()
		} else if errors.Is(err, io.EOF) {
			// EOF from frameReader means end of frame, not end of the
			// transport; it is replaced by endOfFrame's result.
			err = t.endOfFrame()
			if err != nil {
				return
			}
			if read == 0 {
				// Try to read the next frame when we hit EOF
				// (end of frame) immediately.
				// When we got here, it means the last read
				// finished the previous frame, but didn't
				// do endOfFrame handling yet.
				// We have to read the next frame here,
				// as otherwise we would return 0 and nil,
				// which is a case not handled well by most
				// protocol implementations.
				return t.Read(p)
			}
		}
		return
	}
	return t.reader.Read(p)
}
592
593// Write writes data to the write buffer.
594//
595// You need to call Flush to actually write them to the transport.
596func (t *THeaderTransport) Write(p []byte) (int, error) {
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800597 if t.writeBuffer == nil {
Yuxuan 'fishy' Wangbdfde852022-08-08 22:12:40 -0700598 t.writeBuffer = bufPool.get()
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800599 }
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800600 return t.writeBuffer.Write(p)
601}
602
603// Flush writes the appropriate header and the write buffer to the underlying transport.
604func (t *THeaderTransport) Flush(ctx context.Context) error {
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800605 if t.writeBuffer == nil || t.writeBuffer.Len() == 0 {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800606 return nil
607 }
608
Yuxuan 'fishy' Wangbdfde852022-08-08 22:12:40 -0700609 defer bufPool.put(&t.writeBuffer)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800610
611 switch t.clientType {
612 default:
613 fallthrough
614 case clientUnknown:
615 t.clientType = clientHeaders
616 fallthrough
617 case clientHeaders:
618 headers := NewTMemoryBuffer()
619 hp := NewTCompactProtocol(headers)
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800620 hp.SetTConfiguration(t.cfg)
Yuxuan 'fishy' Wang8dd04f42021-01-22 15:41:41 -0800621 if _, err := hp.writeVarint32(int32(t.protocolID)); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800622 return NewTTransportExceptionFromError(err)
623 }
624 if _, err := hp.writeVarint32(int32(len(t.writeTransforms))); err != nil {
625 return NewTTransportExceptionFromError(err)
626 }
627 for _, transform := range t.writeTransforms {
628 if _, err := hp.writeVarint32(int32(transform)); err != nil {
629 return NewTTransportExceptionFromError(err)
630 }
631 }
632 if len(t.writeHeaders) > 0 {
633 if _, err := hp.writeVarint32(int32(InfoKeyValue)); err != nil {
634 return NewTTransportExceptionFromError(err)
635 }
636 if _, err := hp.writeVarint32(int32(len(t.writeHeaders))); err != nil {
637 return NewTTransportExceptionFromError(err)
638 }
639 for key, value := range t.writeHeaders {
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700640 if err := hp.WriteString(ctx, key); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800641 return NewTTransportExceptionFromError(err)
642 }
Yuxuan 'fishy' Wange79f7642020-06-12 22:22:35 -0700643 if err := hp.WriteString(ctx, value); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800644 return NewTTransportExceptionFromError(err)
645 }
646 }
647 }
648 padding := 4 - headers.Len()%4
649 if padding < 4 {
650 buf := t.buffer[:padding]
651 for i := range buf {
652 buf[i] = 0
653 }
654 if _, err := headers.Write(buf); err != nil {
655 return NewTTransportExceptionFromError(err)
656 }
657 }
658
Yuxuan 'fishy' Wangbdfde852022-08-08 22:12:40 -0700659 payload := bufPool.get()
660 defer bufPool.put(&payload)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800661 meta := headerMeta{
662 MagicFlags: THeaderHeaderMagic + t.Flags&THeaderFlagsMask,
663 SequenceID: t.SequenceID,
664 HeaderLength: uint16(headers.Len() / 4),
665 }
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800666 if err := binary.Write(payload, binary.BigEndian, meta); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800667 return NewTTransportExceptionFromError(err)
668 }
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800669 if _, err := io.Copy(payload, headers); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800670 return NewTTransportExceptionFromError(err)
671 }
672
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800673 writer, err := NewTransformWriter(payload, t.writeTransforms)
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800674 if err != nil {
675 return NewTTransportExceptionFromError(err)
676 }
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800677 if _, err := io.Copy(writer, t.writeBuffer); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800678 return NewTTransportExceptionFromError(err)
679 }
680 if err := writer.Close(); err != nil {
681 return NewTTransportExceptionFromError(err)
682 }
683
684 // First write frame length
685 buf := t.buffer[:size32]
686 binary.BigEndian.PutUint32(buf, uint32(payload.Len()))
687 if _, err := t.transport.Write(buf); err != nil {
688 return NewTTransportExceptionFromError(err)
689 }
690 // Then write the payload
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800691 if _, err := io.Copy(t.transport, payload); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800692 return NewTTransportExceptionFromError(err)
693 }
694
695 case clientFramedBinary, clientFramedCompact:
696 buf := t.buffer[:size32]
697 binary.BigEndian.PutUint32(buf, uint32(t.writeBuffer.Len()))
698 if _, err := t.transport.Write(buf); err != nil {
699 return NewTTransportExceptionFromError(err)
700 }
701 fallthrough
702 case clientUnframedBinary, clientUnframedCompact:
Yuxuan 'fishy' Wangd582a862021-12-16 14:44:47 -0800703 if _, err := io.Copy(t.transport, t.writeBuffer); err != nil {
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800704 return NewTTransportExceptionFromError(err)
705 }
706 }
707
708 select {
709 default:
710 case <-ctx.Done():
711 return NewTTransportExceptionFromError(ctx.Err())
712 }
713
714 return t.transport.Flush(ctx)
715}
716
717// Close closes the transport, along with its underlying transport.
718func (t *THeaderTransport) Close() error {
719 if err := t.Flush(context.Background()); err != nil {
720 return err
721 }
722 return t.transport.Close()
723}
724
725// RemainingBytes calls underlying transport's RemainingBytes.
726//
727// Even in framed cases, because of all the possible compression transforms
728// involved, the remaining frame size is likely to be different from the actual
729// remaining readable bytes, so we don't bother to keep tracking the remaining
730// frame size by ourselves and just use the underlying transport's
731// RemainingBytes directly.
732func (t *THeaderTransport) RemainingBytes() uint64 {
733 return t.transport.RemainingBytes()
734}
735
736// GetReadHeaders returns the THeaderMap read from transport.
737func (t *THeaderTransport) GetReadHeaders() THeaderMap {
738 return t.readHeaders
739}
740
741// SetWriteHeader sets a header for write.
742func (t *THeaderTransport) SetWriteHeader(key, value string) {
743 t.writeHeaders[key] = value
744}
745
746// ClearWriteHeaders clears all write headers previously set.
747func (t *THeaderTransport) ClearWriteHeaders() {
748 t.writeHeaders = make(THeaderMap)
749}
750
751// AddTransform add a transform for writing.
Yuxuan 'fishy' Wangb20f6752024-05-02 16:50:08 -0700752//
753// NOTE: This is provided as a low-level API, but in general you should use
754// TConfiguration.THeaderTransforms to set transforms for writing instead.
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800755func (t *THeaderTransport) AddTransform(transform THeaderTransformID) error {
756 if !supportedTransformIDs[transform] {
757 return NewTProtocolExceptionWithType(
758 NOT_IMPLEMENTED,
759 fmt.Errorf("THeaderTransformID %d not supported", transform),
760 )
761 }
762 t.writeTransforms = append(t.writeTransforms, transform)
763 return nil
764}
765
766// Protocol returns the wrapped protocol id used in this THeaderTransport.
767func (t *THeaderTransport) Protocol() THeaderProtocolID {
768 switch t.clientType {
769 default:
Yuxuan 'fishy' Wang8dd04f42021-01-22 15:41:41 -0800770 return t.protocolID
Yuxuan 'fishy' Wang4d46c112019-06-07 20:47:18 +0800771 case clientFramedBinary, clientUnframedBinary:
772 return THeaderProtocolBinary
773 case clientFramedCompact, clientUnframedCompact:
774 return THeaderProtocolCompact
775 }
776}
777
778func (t *THeaderTransport) isFramed() bool {
779 switch t.clientType {
780 default:
781 return false
782 case clientHeaders, clientFramedBinary, clientFramedCompact:
783 return true
784 }
785}
Yuxuan 'fishy' Wangebb6b2e2019-07-24 08:42:06 -0700786
Yuxuan 'fishy' Wangb20f6752024-05-02 16:50:08 -0700787// addWriteTransformsDedupe adds id to writeTransforms only if it's not already
788// there.
789func (t *THeaderTransport) addWriteTransformsDedupe(id THeaderTransformID) {
790 for _, existingID := range t.writeTransforms {
791 if existingID == id {
792 return
793 }
794 }
795 t.writeTransforms = append(t.writeTransforms, id)
796}
797
// SetTConfiguration implements TConfigurationSetter.
//
// It passes cfg on to the underlying transport through
// PropagateTConfiguration, then stores it on t. Fields that were initialized
// from the configuration at construction time (writeTransforms, protocolID)
// are not touched here.
func (t *THeaderTransport) SetTConfiguration(cfg *TConfiguration) {
	PropagateTConfiguration(t.transport, cfg)
	t.cfg = cfg
}
803
// THeaderTransportFactory is a TTransportFactory implementation to create
// THeaderTransport.
//
// It also implements TConfigurationSetter.
type THeaderTransportFactory struct {
	// The underlying factory, could be nil.
	//
	// When set, GetTransport runs it first and wraps its result.
	Factory TTransportFactory

	// cfg is the configuration attached to every transport created.
	cfg *TConfiguration
}
814
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800815// Deprecated: Use NewTHeaderTransportFactoryConf instead.
Yuxuan 'fishy' Wangebb6b2e2019-07-24 08:42:06 -0700816func NewTHeaderTransportFactory(factory TTransportFactory) TTransportFactory {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800817 return NewTHeaderTransportFactoryConf(factory, &TConfiguration{
818 noPropagation: true,
819 })
820}
821
822// NewTHeaderTransportFactoryConf creates a new *THeaderTransportFactory with
823// the given *TConfiguration.
824func NewTHeaderTransportFactoryConf(factory TTransportFactory, conf *TConfiguration) TTransportFactory {
Yuxuan 'fishy' Wangebb6b2e2019-07-24 08:42:06 -0700825 return &THeaderTransportFactory{
826 Factory: factory,
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800827
828 cfg: conf,
Yuxuan 'fishy' Wangebb6b2e2019-07-24 08:42:06 -0700829 }
830}
831
832// GetTransport implements TTransportFactory.
833func (f *THeaderTransportFactory) GetTransport(trans TTransport) (TTransport, error) {
834 if f.Factory != nil {
835 t, err := f.Factory.GetTransport(trans)
836 if err != nil {
837 return nil, err
838 }
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800839 return NewTHeaderTransportConf(t, f.cfg), nil
Yuxuan 'fishy' Wangebb6b2e2019-07-24 08:42:06 -0700840 }
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800841 return NewTHeaderTransportConf(trans, f.cfg), nil
Yuxuan 'fishy' Wangebb6b2e2019-07-24 08:42:06 -0700842}
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800843
844// SetTConfiguration implements TConfigurationSetter.
845func (f *THeaderTransportFactory) SetTConfiguration(cfg *TConfiguration) {
846 PropagateTConfiguration(f.Factory, f.cfg)
847 f.cfg = cfg
848}
849
// Compile-time checks that both the factory and the transport implement
// TConfigurationSetter.
var (
	_ TConfigurationSetter = (*THeaderTransportFactory)(nil)
	_ TConfigurationSetter = (*THeaderTransport)(nil)
)