blob: 81fa65aaae5056a177195d05bac63bdc2ddcc706 [file] [log] [blame]
Jens Geyer0e87c462013-06-18 22:25:07 +02001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
22import (
Jens Geyer09972502014-05-02 01:30:13 +020023 "bufio"
Jens Geyer0e87c462013-06-18 22:25:07 +020024 "bytes"
John Boiles57852792018-01-05 14:37:05 -080025 "context"
Jens Geyer0e87c462013-06-18 22:25:07 +020026 "encoding/binary"
Jens Geyer09972502014-05-02 01:30:13 +020027 "fmt"
Jens Geyer0e87c462013-06-18 22:25:07 +020028 "io"
29)
30
Jens Geyer09972502014-05-02 01:30:13 +020031const DEFAULT_MAX_LENGTH = 16384000
32
Jens Geyer0e87c462013-06-18 22:25:07 +020033type TFramedTransport struct {
Jens Geyer09972502014-05-02 01:30:13 +020034 transport TTransport
35 buf bytes.Buffer
36 reader *bufio.Reader
Jens Geyerca8469e2015-07-26 01:25:23 +020037 frameSize uint32 //Current remaining size of the frame. if ==0 read next frame header
Jens Geyer09972502014-05-02 01:30:13 +020038 buffer [4]byte
Jens Geyerca8469e2015-07-26 01:25:23 +020039 maxLength uint32
Jens Geyer0e87c462013-06-18 22:25:07 +020040}
41
42type tFramedTransportFactory struct {
Jens Geyer09972502014-05-02 01:30:13 +020043 factory TTransportFactory
Jens Geyerca8469e2015-07-26 01:25:23 +020044 maxLength uint32
Jens Geyer0e87c462013-06-18 22:25:07 +020045}
46
47func NewTFramedTransportFactory(factory TTransportFactory) TTransportFactory {
Jens Geyer09972502014-05-02 01:30:13 +020048 return &tFramedTransportFactory{factory: factory, maxLength: DEFAULT_MAX_LENGTH}
Jens Geyer0e87c462013-06-18 22:25:07 +020049}
50
Jens Geyerca8469e2015-07-26 01:25:23 +020051func NewTFramedTransportFactoryMaxLength(factory TTransportFactory, maxLength uint32) TTransportFactory {
D. Can Celasun8da0e722017-06-02 14:33:32 +020052 return &tFramedTransportFactory{factory: factory, maxLength: maxLength}
Jens Geyer417b6312015-06-22 22:21:27 +020053}
54
D. Can Celasun8da0e722017-06-02 14:33:32 +020055func (p *tFramedTransportFactory) GetTransport(base TTransport) (TTransport, error) {
56 tt, err := p.factory.GetTransport(base)
57 if err != nil {
58 return nil, err
59 }
60 return NewTFramedTransportMaxLength(tt, p.maxLength), nil
Jens Geyer0e87c462013-06-18 22:25:07 +020061}
62
63func NewTFramedTransport(transport TTransport) *TFramedTransport {
Jens Geyer09972502014-05-02 01:30:13 +020064 return &TFramedTransport{transport: transport, reader: bufio.NewReader(transport), maxLength: DEFAULT_MAX_LENGTH}
65}
66
Jens Geyerca8469e2015-07-26 01:25:23 +020067func NewTFramedTransportMaxLength(transport TTransport, maxLength uint32) *TFramedTransport {
Jens Geyer09972502014-05-02 01:30:13 +020068 return &TFramedTransport{transport: transport, reader: bufio.NewReader(transport), maxLength: maxLength}
Jens Geyer0e87c462013-06-18 22:25:07 +020069}
70
71func (p *TFramedTransport) Open() error {
72 return p.transport.Open()
73}
74
75func (p *TFramedTransport) IsOpen() bool {
76 return p.transport.IsOpen()
77}
78
Jens Geyer0e87c462013-06-18 22:25:07 +020079func (p *TFramedTransport) Close() error {
80 return p.transport.Close()
81}
82
Jens Geyer09972502014-05-02 01:30:13 +020083func (p *TFramedTransport) Read(buf []byte) (l int, err error) {
84 if p.frameSize == 0 {
85 p.frameSize, err = p.readFrameHeader()
86 if err != nil {
87 return
Jens Geyer0e87c462013-06-18 22:25:07 +020088 }
89 }
Jens Geyerca8469e2015-07-26 01:25:23 +020090 if p.frameSize < uint32(len(buf)) {
Jens Geyer0dd48012014-11-18 21:55:30 +010091 frameSize := p.frameSize
92 tmp := make([]byte, p.frameSize)
93 l, err = p.Read(tmp)
94 copy(buf, tmp)
95 if err == nil {
96 err = NewTTransportExceptionFromError(fmt.Errorf("Not enough frame size %d to read %d bytes", frameSize, len(buf)))
97 return
98 }
Jens Geyer09972502014-05-02 01:30:13 +020099 }
100 got, err := p.reader.Read(buf)
Jens Geyerca8469e2015-07-26 01:25:23 +0200101 p.frameSize = p.frameSize - uint32(got)
Jens Geyer09972502014-05-02 01:30:13 +0200102 //sanity check
103 if p.frameSize < 0 {
104 return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, "Negative frame size")
105 }
Jens Geyer0e87c462013-06-18 22:25:07 +0200106 return got, NewTTransportExceptionFromError(err)
107}
108
Jens Geyer09972502014-05-02 01:30:13 +0200109func (p *TFramedTransport) ReadByte() (c byte, err error) {
110 if p.frameSize == 0 {
111 p.frameSize, err = p.readFrameHeader()
112 if err != nil {
113 return
114 }
115 }
116 if p.frameSize < 1 {
Jens Geyer0dd48012014-11-18 21:55:30 +0100117 return 0, NewTTransportExceptionFromError(fmt.Errorf("Not enough frame size %d to read %d bytes", p.frameSize, 1))
Jens Geyer09972502014-05-02 01:30:13 +0200118 }
119 c, err = p.reader.ReadByte()
120 if err == nil {
121 p.frameSize--
122 }
123 return
124}
125
Jens Geyer0e87c462013-06-18 22:25:07 +0200126func (p *TFramedTransport) Write(buf []byte) (int, error) {
Jens Geyer09972502014-05-02 01:30:13 +0200127 n, err := p.buf.Write(buf)
Jens Geyer0e87c462013-06-18 22:25:07 +0200128 return n, NewTTransportExceptionFromError(err)
129}
130
Jens Geyer09972502014-05-02 01:30:13 +0200131func (p *TFramedTransport) WriteByte(c byte) error {
132 return p.buf.WriteByte(c)
133}
134
135func (p *TFramedTransport) WriteString(s string) (n int, err error) {
136 return p.buf.WriteString(s)
137}
138
John Boiles57852792018-01-05 14:37:05 -0800139func (p *TFramedTransport) Flush(ctx context.Context) error {
Jens Geyer09972502014-05-02 01:30:13 +0200140 size := p.buf.Len()
141 buf := p.buffer[:4]
Jens Geyer0e87c462013-06-18 22:25:07 +0200142 binary.BigEndian.PutUint32(buf, uint32(size))
143 _, err := p.transport.Write(buf)
144 if err != nil {
liduo0449e51032017-07-05 22:01:44 +0800145 p.buf.Truncate(0)
Jens Geyer0e87c462013-06-18 22:25:07 +0200146 return NewTTransportExceptionFromError(err)
147 }
148 if size > 0 {
Jens Geyer09972502014-05-02 01:30:13 +0200149 if n, err := p.buf.WriteTo(p.transport); err != nil {
Jens Geyera5960382013-12-03 22:57:59 +0100150 print("Error while flushing write buffer of size ", size, " to transport, only wrote ", n, " bytes: ", err.Error(), "\n")
liduo0449e51032017-07-05 22:01:44 +0800151 p.buf.Truncate(0)
Jens Geyer0e87c462013-06-18 22:25:07 +0200152 return NewTTransportExceptionFromError(err)
153 }
154 }
John Boiles57852792018-01-05 14:37:05 -0800155 err = p.transport.Flush(ctx)
Jens Geyer0e87c462013-06-18 22:25:07 +0200156 return NewTTransportExceptionFromError(err)
157}
158
Jens Geyerca8469e2015-07-26 01:25:23 +0200159func (p *TFramedTransport) readFrameHeader() (uint32, error) {
Jens Geyer09972502014-05-02 01:30:13 +0200160 buf := p.buffer[:4]
161 if _, err := io.ReadFull(p.reader, buf); err != nil {
Jens Geyer0e87c462013-06-18 22:25:07 +0200162 return 0, err
163 }
Jens Geyerca8469e2015-07-26 01:25:23 +0200164 size := binary.BigEndian.Uint32(buf)
Jens Geyer09972502014-05-02 01:30:13 +0200165 if size < 0 || size > p.maxLength {
166 return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, fmt.Sprintf("Incorrect frame size (%d)", size))
Jens Geyer0e87c462013-06-18 22:25:07 +0200167 }
Jens Geyer0e87c462013-06-18 22:25:07 +0200168 return size, nil
169}
Jens Geyerca8469e2015-07-26 01:25:23 +0200170
171func (p *TFramedTransport) RemainingBytes() (num_bytes uint64) {
172 return uint64(p.frameSize)
173}