blob: 3a59e7b9ea620921b037e6ca2b4e09f75d272fc1 [file] [log] [blame]
Jens Geyer0e87c462013-06-18 22:25:07 +02001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package thrift
21
import (
	"bytes"
	"encoding/binary"
	"io"
	"strconv"
)
27
28type TFramedTransport struct {
29 transport TTransport
30 writeBuffer *bytes.Buffer
31 readBuffer *bytes.Buffer
32}
33
34type tFramedTransportFactory struct {
35 factory TTransportFactory
36}
37
38func NewTFramedTransportFactory(factory TTransportFactory) TTransportFactory {
39 return &tFramedTransportFactory{factory: factory}
40}
41
42func (p *tFramedTransportFactory) GetTransport(base TTransport) TTransport {
43 return NewTFramedTransport(p.factory.GetTransport(base))
44}
45
46func NewTFramedTransport(transport TTransport) *TFramedTransport {
47 writeBuf := make([]byte, 0, 1024)
48 readBuf := make([]byte, 0, 1024)
49 return &TFramedTransport{transport: transport, writeBuffer: bytes.NewBuffer(writeBuf), readBuffer: bytes.NewBuffer(readBuf)}
50}
51
52func (p *TFramedTransport) Open() error {
53 return p.transport.Open()
54}
55
56func (p *TFramedTransport) IsOpen() bool {
57 return p.transport.IsOpen()
58}
59
60func (p *TFramedTransport) Peek() bool {
61 return p.transport.Peek()
62}
63
64func (p *TFramedTransport) Close() error {
65 return p.transport.Close()
66}
67
68func (p *TFramedTransport) Read(buf []byte) (int, error) {
69 if p.readBuffer.Len() > 0 {
70 got, err := p.readBuffer.Read(buf)
71 if got > 0 {
72 return got, NewTTransportExceptionFromError(err)
73 }
74 }
75
76 // Read another frame of data
77 p.readFrame()
78
79 got, err := p.readBuffer.Read(buf)
80 return got, NewTTransportExceptionFromError(err)
81}
82
83func (p *TFramedTransport) Write(buf []byte) (int, error) {
84 n, err := p.writeBuffer.Write(buf)
85 return n, NewTTransportExceptionFromError(err)
86}
87
88func (p *TFramedTransport) Flush() error {
89 size := p.writeBuffer.Len()
90 buf := []byte{0, 0, 0, 0}
91 binary.BigEndian.PutUint32(buf, uint32(size))
92 _, err := p.transport.Write(buf)
93 if err != nil {
94 return NewTTransportExceptionFromError(err)
95 }
96 if size > 0 {
97 if n, err := p.writeBuffer.WriteTo(p.transport); err != nil {
98 print("Error while flushing write buffer of size ", size, " to transport, only wrote ", n, " bytes: ", err, "\n")
99 return NewTTransportExceptionFromError(err)
100 }
101 }
102 err = p.transport.Flush()
103 return NewTTransportExceptionFromError(err)
104}
105
106func (p *TFramedTransport) readFrame() (int, error) {
107 buf := []byte{0, 0, 0, 0}
108 if _, err := io.ReadFull(p.transport, buf); err != nil {
109 return 0, err
110 }
111 size := int(binary.BigEndian.Uint32(buf))
112 if size < 0 {
113 return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, "Read a negative frame size ("+string(size)+")")
114 }
115 if size == 0 {
116 return 0, nil
117 }
118 buf2 := make([]byte, size)
119 if n, err := io.ReadFull(p.transport, buf2); err != nil {
120 return n, err
121 }
122 p.readBuffer = bytes.NewBuffer(buf2)
123 return size, nil
124}