blob: f683e7f544bd43dc53d229f28a01c4a3102440e5 [file] [log] [blame]
Jens Geyer0e87c462013-06-18 22:25:07 +02001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
22import (
Jens Geyer09972502014-05-02 01:30:13 +020023 "bufio"
Jens Geyer0e87c462013-06-18 22:25:07 +020024 "bytes"
John Boiles57852792018-01-05 14:37:05 -080025 "context"
Jens Geyer0e87c462013-06-18 22:25:07 +020026 "encoding/binary"
Jens Geyer09972502014-05-02 01:30:13 +020027 "fmt"
Jens Geyer0e87c462013-06-18 22:25:07 +020028 "io"
29)
30
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080031// Deprecated: Use DEFAULT_MAX_FRAME_SIZE instead.
Jens Geyer09972502014-05-02 01:30:13 +020032const DEFAULT_MAX_LENGTH = 16384000
33
Jens Geyer0e87c462013-06-18 22:25:07 +020034type TFramedTransport struct {
Jens Geyer09972502014-05-02 01:30:13 +020035 transport TTransport
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080036
37 cfg *TConfiguration
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -070038
39 writeBuf bytes.Buffer
40
41 reader *bufio.Reader
42 readBuf bytes.Buffer
43
44 buffer [4]byte
Jens Geyer0e87c462013-06-18 22:25:07 +020045}
46
47type tFramedTransportFactory struct {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080048 factory TTransportFactory
49 cfg *TConfiguration
Jens Geyer0e87c462013-06-18 22:25:07 +020050}
51
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080052// Deprecated: Use NewTFramedTransportFactoryConf instead.
Jens Geyer0e87c462013-06-18 22:25:07 +020053func NewTFramedTransportFactory(factory TTransportFactory) TTransportFactory {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080054 return NewTFramedTransportFactoryConf(factory, &TConfiguration{
55 MaxFrameSize: DEFAULT_MAX_LENGTH,
56
57 noPropagation: true,
58 })
Jens Geyer0e87c462013-06-18 22:25:07 +020059}
60
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080061// Deprecated: Use NewTFramedTransportFactoryConf instead.
Jens Geyerca8469e2015-07-26 01:25:23 +020062func NewTFramedTransportFactoryMaxLength(factory TTransportFactory, maxLength uint32) TTransportFactory {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080063 return NewTFramedTransportFactoryConf(factory, &TConfiguration{
64 MaxFrameSize: int32(maxLength),
65
66 noPropagation: true,
67 })
68}
69
70func NewTFramedTransportFactoryConf(factory TTransportFactory, conf *TConfiguration) TTransportFactory {
71 PropagateTConfiguration(factory, conf)
72 return &tFramedTransportFactory{
73 factory: factory,
74 cfg: conf,
75 }
Jens Geyer417b6312015-06-22 22:21:27 +020076}
77
D. Can Celasun8da0e722017-06-02 14:33:32 +020078func (p *tFramedTransportFactory) GetTransport(base TTransport) (TTransport, error) {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080079 PropagateTConfiguration(base, p.cfg)
D. Can Celasun8da0e722017-06-02 14:33:32 +020080 tt, err := p.factory.GetTransport(base)
81 if err != nil {
82 return nil, err
83 }
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080084 return NewTFramedTransportConf(tt, p.cfg), nil
Jens Geyer0e87c462013-06-18 22:25:07 +020085}
86
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080087func (p *tFramedTransportFactory) SetTConfiguration(cfg *TConfiguration) {
88 PropagateTConfiguration(p.factory, cfg)
89 p.cfg = cfg
90}
91
92// Deprecated: Use NewTFramedTransportConf instead.
Jens Geyer0e87c462013-06-18 22:25:07 +020093func NewTFramedTransport(transport TTransport) *TFramedTransport {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -080094 return NewTFramedTransportConf(transport, &TConfiguration{
95 MaxFrameSize: DEFAULT_MAX_LENGTH,
96
97 noPropagation: true,
98 })
Jens Geyer09972502014-05-02 01:30:13 +020099}
100
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800101// Deprecated: Use NewTFramedTransportConf instead.
Jens Geyerca8469e2015-07-26 01:25:23 +0200102func NewTFramedTransportMaxLength(transport TTransport, maxLength uint32) *TFramedTransport {
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800103 return NewTFramedTransportConf(transport, &TConfiguration{
104 MaxFrameSize: int32(maxLength),
105
106 noPropagation: true,
107 })
108}
109
110func NewTFramedTransportConf(transport TTransport, conf *TConfiguration) *TFramedTransport {
111 PropagateTConfiguration(transport, conf)
112 return &TFramedTransport{
113 transport: transport,
114 reader: bufio.NewReader(transport),
115 cfg: conf,
116 }
Jens Geyer0e87c462013-06-18 22:25:07 +0200117}
118
119func (p *TFramedTransport) Open() error {
120 return p.transport.Open()
121}
122
123func (p *TFramedTransport) IsOpen() bool {
124 return p.transport.IsOpen()
125}
126
Jens Geyer0e87c462013-06-18 22:25:07 +0200127func (p *TFramedTransport) Close() error {
128 return p.transport.Close()
129}
130
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700131func (p *TFramedTransport) Read(buf []byte) (read int, err error) {
132 read, err = p.readBuf.Read(buf)
133 if err != io.EOF {
134 return
Jens Geyer0e87c462013-06-18 22:25:07 +0200135 }
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700136
137 // For bytes.Buffer.Read, EOF would only happen when read is zero,
138 // but still, do a sanity check,
139 // in case that behavior is changed in a future version of go stdlib.
140 // When that happens, just return nil error,
141 // and let the caller call Read again to read the next frame.
142 if read > 0 {
143 return read, nil
Jens Geyer09972502014-05-02 01:30:13 +0200144 }
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700145
146 // Reaching here means that the last Read finished the last frame,
147 // so we need to read the next frame into readBuf now.
148 if err = p.readFrame(); err != nil {
149 return read, err
Jens Geyer09972502014-05-02 01:30:13 +0200150 }
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700151 newRead, err := p.Read(buf[read:])
152 return read + newRead, err
Jens Geyer0e87c462013-06-18 22:25:07 +0200153}
154
Jens Geyer09972502014-05-02 01:30:13 +0200155func (p *TFramedTransport) ReadByte() (c byte, err error) {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700156 buf := p.buffer[:1]
157 _, err = p.Read(buf)
158 if err != nil {
159 return
Jens Geyer09972502014-05-02 01:30:13 +0200160 }
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700161 c = buf[0]
Jens Geyer09972502014-05-02 01:30:13 +0200162 return
163}
164
Jens Geyer0e87c462013-06-18 22:25:07 +0200165func (p *TFramedTransport) Write(buf []byte) (int, error) {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700166 n, err := p.writeBuf.Write(buf)
Jens Geyer0e87c462013-06-18 22:25:07 +0200167 return n, NewTTransportExceptionFromError(err)
168}
169
Jens Geyer09972502014-05-02 01:30:13 +0200170func (p *TFramedTransport) WriteByte(c byte) error {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700171 return p.writeBuf.WriteByte(c)
Jens Geyer09972502014-05-02 01:30:13 +0200172}
173
174func (p *TFramedTransport) WriteString(s string) (n int, err error) {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700175 return p.writeBuf.WriteString(s)
Jens Geyer09972502014-05-02 01:30:13 +0200176}
177
John Boiles57852792018-01-05 14:37:05 -0800178func (p *TFramedTransport) Flush(ctx context.Context) error {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700179 size := p.writeBuf.Len()
Jens Geyer09972502014-05-02 01:30:13 +0200180 buf := p.buffer[:4]
Jens Geyer0e87c462013-06-18 22:25:07 +0200181 binary.BigEndian.PutUint32(buf, uint32(size))
182 _, err := p.transport.Write(buf)
183 if err != nil {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700184 p.writeBuf.Reset()
Jens Geyer0e87c462013-06-18 22:25:07 +0200185 return NewTTransportExceptionFromError(err)
186 }
187 if size > 0 {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700188 if _, err := io.Copy(p.transport, &p.writeBuf); err != nil {
189 p.writeBuf.Reset()
Jens Geyer0e87c462013-06-18 22:25:07 +0200190 return NewTTransportExceptionFromError(err)
191 }
192 }
John Boiles57852792018-01-05 14:37:05 -0800193 err = p.transport.Flush(ctx)
Jens Geyer0e87c462013-06-18 22:25:07 +0200194 return NewTTransportExceptionFromError(err)
195}
196
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700197func (p *TFramedTransport) readFrame() error {
Jens Geyer09972502014-05-02 01:30:13 +0200198 buf := p.buffer[:4]
199 if _, err := io.ReadFull(p.reader, buf); err != nil {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700200 return err
Jens Geyer0e87c462013-06-18 22:25:07 +0200201 }
Jens Geyerca8469e2015-07-26 01:25:23 +0200202 size := binary.BigEndian.Uint32(buf)
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800203 if size < 0 || size > uint32(p.cfg.GetMaxFrameSize()) {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700204 return NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, fmt.Sprintf("Incorrect frame size (%d)", size))
Jens Geyer0e87c462013-06-18 22:25:07 +0200205 }
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700206 _, err := io.CopyN(&p.readBuf, p.reader, int64(size))
207 return NewTTransportExceptionFromError(err)
Jens Geyer0e87c462013-06-18 22:25:07 +0200208}
Jens Geyerca8469e2015-07-26 01:25:23 +0200209
210func (p *TFramedTransport) RemainingBytes() (num_bytes uint64) {
Yuxuan 'fishy' Wangc9890cb2020-06-08 04:32:21 -0700211 return uint64(p.readBuf.Len())
Jens Geyerca8469e2015-07-26 01:25:23 +0200212}
Yuxuan 'fishy' Wangc4d1c0d2020-12-16 17:10:48 -0800213
214// SetTConfiguration implements TConfigurationSetter.
215func (p *TFramedTransport) SetTConfiguration(cfg *TConfiguration) {
216 PropagateTConfiguration(p.transport, cfg)
217 p.cfg = cfg
218}
219
220var (
221 _ TConfigurationSetter = (*tFramedTransportFactory)(nil)
222 _ TConfigurationSetter = (*TFramedTransport)(nil)
223)