| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 1 | /* |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, |
| 13 | * software distributed under the License is distributed on an |
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | * KIND, either express or implied. See the License for the |
| 16 | * specific language governing permissions and limitations |
| 17 | * under the License. |
| 18 | */ |
| 19 | |
| 20 | package thrift |
| 21 | |
| 22 | import ( |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 23 | "bufio" |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 24 | "bytes" |
| John Boiles | 5785279 | 2018-01-05 14:37:05 -0800 | [diff] [blame] | 25 | "context" |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 26 | "encoding/binary" |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 27 | "fmt" |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 28 | "io" |
| 29 | ) |
| 30 | |
// DEFAULT_MAX_LENGTH is the frame size, in bytes (16000 KiB), that the
// deprecated constructors in this file store in TConfiguration.MaxFrameSize.
//
// Deprecated: Use DEFAULT_MAX_FRAME_SIZE instead.
const DEFAULT_MAX_LENGTH = 16000 * 1024
| 33 | |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 34 | type TFramedTransport struct { |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 35 | transport TTransport |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 36 | |
| 37 | cfg *TConfiguration |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 38 | |
| 39 | writeBuf bytes.Buffer |
| 40 | |
| 41 | reader *bufio.Reader |
| 42 | readBuf bytes.Buffer |
| 43 | |
| 44 | buffer [4]byte |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 45 | } |
| 46 | |
| 47 | type tFramedTransportFactory struct { |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 48 | factory TTransportFactory |
| 49 | cfg *TConfiguration |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 50 | } |
| 51 | |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 52 | // Deprecated: Use NewTFramedTransportFactoryConf instead. |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 53 | func NewTFramedTransportFactory(factory TTransportFactory) TTransportFactory { |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 54 | return NewTFramedTransportFactoryConf(factory, &TConfiguration{ |
| 55 | MaxFrameSize: DEFAULT_MAX_LENGTH, |
| 56 | |
| 57 | noPropagation: true, |
| 58 | }) |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 59 | } |
| 60 | |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 61 | // Deprecated: Use NewTFramedTransportFactoryConf instead. |
| Jens Geyer | ca8469e | 2015-07-26 01:25:23 +0200 | [diff] [blame] | 62 | func NewTFramedTransportFactoryMaxLength(factory TTransportFactory, maxLength uint32) TTransportFactory { |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 63 | return NewTFramedTransportFactoryConf(factory, &TConfiguration{ |
| 64 | MaxFrameSize: int32(maxLength), |
| 65 | |
| 66 | noPropagation: true, |
| 67 | }) |
| 68 | } |
| 69 | |
| 70 | func NewTFramedTransportFactoryConf(factory TTransportFactory, conf *TConfiguration) TTransportFactory { |
| 71 | PropagateTConfiguration(factory, conf) |
| 72 | return &tFramedTransportFactory{ |
| 73 | factory: factory, |
| 74 | cfg: conf, |
| 75 | } |
| Jens Geyer | 417b631 | 2015-06-22 22:21:27 +0200 | [diff] [blame] | 76 | } |
| 77 | |
| D. Can Celasun | 8da0e72 | 2017-06-02 14:33:32 +0200 | [diff] [blame] | 78 | func (p *tFramedTransportFactory) GetTransport(base TTransport) (TTransport, error) { |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 79 | PropagateTConfiguration(base, p.cfg) |
| D. Can Celasun | 8da0e72 | 2017-06-02 14:33:32 +0200 | [diff] [blame] | 80 | tt, err := p.factory.GetTransport(base) |
| 81 | if err != nil { |
| 82 | return nil, err |
| 83 | } |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 84 | return NewTFramedTransportConf(tt, p.cfg), nil |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 85 | } |
| 86 | |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 87 | func (p *tFramedTransportFactory) SetTConfiguration(cfg *TConfiguration) { |
| 88 | PropagateTConfiguration(p.factory, cfg) |
| 89 | p.cfg = cfg |
| 90 | } |
| 91 | |
| 92 | // Deprecated: Use NewTFramedTransportConf instead. |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 93 | func NewTFramedTransport(transport TTransport) *TFramedTransport { |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 94 | return NewTFramedTransportConf(transport, &TConfiguration{ |
| 95 | MaxFrameSize: DEFAULT_MAX_LENGTH, |
| 96 | |
| 97 | noPropagation: true, |
| 98 | }) |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 99 | } |
| 100 | |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 101 | // Deprecated: Use NewTFramedTransportConf instead. |
| Jens Geyer | ca8469e | 2015-07-26 01:25:23 +0200 | [diff] [blame] | 102 | func NewTFramedTransportMaxLength(transport TTransport, maxLength uint32) *TFramedTransport { |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 103 | return NewTFramedTransportConf(transport, &TConfiguration{ |
| 104 | MaxFrameSize: int32(maxLength), |
| 105 | |
| 106 | noPropagation: true, |
| 107 | }) |
| 108 | } |
| 109 | |
| 110 | func NewTFramedTransportConf(transport TTransport, conf *TConfiguration) *TFramedTransport { |
| 111 | PropagateTConfiguration(transport, conf) |
| 112 | return &TFramedTransport{ |
| 113 | transport: transport, |
| 114 | reader: bufio.NewReader(transport), |
| 115 | cfg: conf, |
| 116 | } |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 117 | } |
| 118 | |
| 119 | func (p *TFramedTransport) Open() error { |
| 120 | return p.transport.Open() |
| 121 | } |
| 122 | |
| 123 | func (p *TFramedTransport) IsOpen() bool { |
| 124 | return p.transport.IsOpen() |
| 125 | } |
| 126 | |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 127 | func (p *TFramedTransport) Close() error { |
| 128 | return p.transport.Close() |
| 129 | } |
| 130 | |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 131 | func (p *TFramedTransport) Read(buf []byte) (read int, err error) { |
| 132 | read, err = p.readBuf.Read(buf) |
| 133 | if err != io.EOF { |
| 134 | return |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 135 | } |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 136 | |
| 137 | // For bytes.Buffer.Read, EOF would only happen when read is zero, |
| 138 | // but still, do a sanity check, |
| 139 | // in case that behavior is changed in a future version of go stdlib. |
| 140 | // When that happens, just return nil error, |
| 141 | // and let the caller call Read again to read the next frame. |
| 142 | if read > 0 { |
| 143 | return read, nil |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 144 | } |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 145 | |
| 146 | // Reaching here means that the last Read finished the last frame, |
| 147 | // so we need to read the next frame into readBuf now. |
| 148 | if err = p.readFrame(); err != nil { |
| 149 | return read, err |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 150 | } |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 151 | newRead, err := p.Read(buf[read:]) |
| 152 | return read + newRead, err |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 153 | } |
| 154 | |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 155 | func (p *TFramedTransport) ReadByte() (c byte, err error) { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 156 | buf := p.buffer[:1] |
| 157 | _, err = p.Read(buf) |
| 158 | if err != nil { |
| 159 | return |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 160 | } |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 161 | c = buf[0] |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 162 | return |
| 163 | } |
| 164 | |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 165 | func (p *TFramedTransport) Write(buf []byte) (int, error) { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 166 | n, err := p.writeBuf.Write(buf) |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 167 | return n, NewTTransportExceptionFromError(err) |
| 168 | } |
| 169 | |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 170 | func (p *TFramedTransport) WriteByte(c byte) error { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 171 | return p.writeBuf.WriteByte(c) |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 172 | } |
| 173 | |
| 174 | func (p *TFramedTransport) WriteString(s string) (n int, err error) { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 175 | return p.writeBuf.WriteString(s) |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 176 | } |
| 177 | |
| John Boiles | 5785279 | 2018-01-05 14:37:05 -0800 | [diff] [blame] | 178 | func (p *TFramedTransport) Flush(ctx context.Context) error { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 179 | size := p.writeBuf.Len() |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 180 | buf := p.buffer[:4] |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 181 | binary.BigEndian.PutUint32(buf, uint32(size)) |
| 182 | _, err := p.transport.Write(buf) |
| 183 | if err != nil { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 184 | p.writeBuf.Reset() |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 185 | return NewTTransportExceptionFromError(err) |
| 186 | } |
| 187 | if size > 0 { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 188 | if _, err := io.Copy(p.transport, &p.writeBuf); err != nil { |
| 189 | p.writeBuf.Reset() |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 190 | return NewTTransportExceptionFromError(err) |
| 191 | } |
| 192 | } |
| John Boiles | 5785279 | 2018-01-05 14:37:05 -0800 | [diff] [blame] | 193 | err = p.transport.Flush(ctx) |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 194 | return NewTTransportExceptionFromError(err) |
| 195 | } |
| 196 | |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 197 | func (p *TFramedTransport) readFrame() error { |
| Jens Geyer | 0997250 | 2014-05-02 01:30:13 +0200 | [diff] [blame] | 198 | buf := p.buffer[:4] |
| 199 | if _, err := io.ReadFull(p.reader, buf); err != nil { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 200 | return err |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 201 | } |
| Jens Geyer | ca8469e | 2015-07-26 01:25:23 +0200 | [diff] [blame] | 202 | size := binary.BigEndian.Uint32(buf) |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 203 | if size < 0 || size > uint32(p.cfg.GetMaxFrameSize()) { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 204 | return NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, fmt.Sprintf("Incorrect frame size (%d)", size)) |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 205 | } |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 206 | _, err := io.CopyN(&p.readBuf, p.reader, int64(size)) |
| 207 | return NewTTransportExceptionFromError(err) |
| Jens Geyer | 0e87c46 | 2013-06-18 22:25:07 +0200 | [diff] [blame] | 208 | } |
| Jens Geyer | ca8469e | 2015-07-26 01:25:23 +0200 | [diff] [blame] | 209 | |
| 210 | func (p *TFramedTransport) RemainingBytes() (num_bytes uint64) { |
| Yuxuan 'fishy' Wang | c9890cb | 2020-06-08 04:32:21 -0700 | [diff] [blame] | 211 | return uint64(p.readBuf.Len()) |
| Jens Geyer | ca8469e | 2015-07-26 01:25:23 +0200 | [diff] [blame] | 212 | } |
| Yuxuan 'fishy' Wang | c4d1c0d | 2020-12-16 17:10:48 -0800 | [diff] [blame] | 213 | |
| 214 | // SetTConfiguration implements TConfigurationSetter. |
| 215 | func (p *TFramedTransport) SetTConfiguration(cfg *TConfiguration) { |
| 216 | PropagateTConfiguration(p.transport, cfg) |
| 217 | p.cfg = cfg |
| 218 | } |
| 219 | |
| 220 | var ( |
| 221 | _ TConfigurationSetter = (*tFramedTransportFactory)(nil) |
| 222 | _ TConfigurationSetter = (*TFramedTransport)(nil) |
| 223 | ) |