THRIFT-2083 Improve the go lib: buffered Transport, save memory allocation, handle concurrent request
Patch: Feng Shen
diff --git a/lib/go/thrift/binary_protocol.go b/lib/go/thrift/binary_protocol.go
index 5880f65..6fb8624 100644
--- a/lib/go/thrift/binary_protocol.go
+++ b/lib/go/thrift/binary_protocol.go
@@ -24,15 +24,15 @@
"fmt"
"io"
"math"
- "strings"
)
type TBinaryProtocol struct {
- trans TTransport
+ trans TTransport
strictRead bool
strictWrite bool
readLength int
checkReadLength bool
+ buffer [8]byte
}
type TBinaryProtocolFactory struct {
@@ -180,33 +180,22 @@
}
func (p *TBinaryProtocol) WriteI16(value int16) error {
- h := byte(0xff & (value >> 8))
- l := byte(0xff & value)
- v := []byte{h, l}
+ v := p.buffer[0:2]
+ binary.BigEndian.PutUint16(v, uint16(value))
_, e := p.trans.Write(v)
return NewTProtocolException(e)
}
func (p *TBinaryProtocol) WriteI32(value int32) error {
- a := byte(0xff & (value >> 24))
- b := byte(0xff & (value >> 16))
- c := byte(0xff & (value >> 8))
- d := byte(0xff & value)
- v := []byte{a, b, c, d}
+ v := p.buffer[0:4]
+ binary.BigEndian.PutUint32(v, uint32(value))
_, e := p.trans.Write(v)
return NewTProtocolException(e)
}
func (p *TBinaryProtocol) WriteI64(value int64) error {
- a := byte(0xff & (value >> 56))
- b := byte(0xff & (value >> 48))
- c := byte(0xff & (value >> 40))
- d := byte(0xff & (value >> 32))
- e := byte(0xff & (value >> 24))
- f := byte(0xff & (value >> 16))
- g := byte(0xff & (value >> 8))
- h := byte(0xff & value)
- v := []byte{a, b, c, d, e, f, g, h}
+ v := p.buffer[:]
+ binary.BigEndian.PutUint64(v, uint64(value))
_, err := p.trans.Write(v)
return NewTProtocolException(err)
}
@@ -216,7 +205,7 @@
}
func (p *TBinaryProtocol) WriteString(value string) error {
- return p.WriteBinaryFromReader(strings.NewReader(value), len(value))
+ return p.WriteBinary([]byte(value))
}
func (p *TBinaryProtocol) WriteBinary(value []byte) error {
@@ -228,15 +217,6 @@
return NewTProtocolException(err)
}
-func (p *TBinaryProtocol) WriteBinaryFromReader(reader io.Reader, size int) error {
- e := p.WriteI32(int32(size))
- if e != nil {
- return e
- }
- _, err := io.CopyN(p.trans, reader, int64(size))
- return NewTProtocolException(err)
-}
-
/**
* Reading methods
*/
@@ -385,34 +365,34 @@
}
func (p *TBinaryProtocol) ReadByte() (value byte, err error) {
- buf := []byte{0}
+ buf := p.buffer[0:1]
err = p.readAll(buf)
return buf[0], err
}
func (p *TBinaryProtocol) ReadI16() (value int16, err error) {
- buf := []byte{0, 0}
+ buf := p.buffer[0:2]
err = p.readAll(buf)
value = int16(binary.BigEndian.Uint16(buf))
return value, err
}
func (p *TBinaryProtocol) ReadI32() (value int32, err error) {
- buf := []byte{0, 0, 0, 0}
+ buf := p.buffer[0:4]
err = p.readAll(buf)
value = int32(binary.BigEndian.Uint32(buf))
return value, err
}
func (p *TBinaryProtocol) ReadI64() (value int64, err error) {
- buf := []byte{0, 0, 0, 0, 0, 0, 0, 0}
+ buf := p.buffer[0:8]
err = p.readAll(buf)
value = int64(binary.BigEndian.Uint64(buf))
return value, err
}
func (p *TBinaryProtocol) ReadDouble() (value float64, err error) {
- buf := []byte{0, 0, 0, 0, 0, 0, 0, 0}
+ buf := p.buffer[0:8]
err = p.readAll(buf)
value = math.Float64frombits(binary.BigEndian.Uint64(buf))
return value, err
diff --git a/lib/go/thrift/buffed_transport.go b/lib/go/thrift/buffed_transport.go
new file mode 100644
index 0000000..1ba3053
--- /dev/null
+++ b/lib/go/thrift/buffed_transport.go
@@ -0,0 +1,87 @@
+package thrift
+
+type TBufferedTransportFactory struct {
+ size int
+}
+
+type TBuffer struct {
+ buffer []byte
+ pos, limit int
+}
+
+type TBufferedTransport struct {
+ tp TTransport
+ rbuf *TBuffer
+ wbuf *TBuffer
+}
+
+func (p *TBufferedTransportFactory) GetTransport(trans TTransport) TTransport {
+ return NewTBufferedTransport(trans, p.size)
+}
+
+func NewTBufferedTransportFactory(bufferSize int) *TBufferedTransportFactory {
+ return &TBufferedTransportFactory{size: bufferSize}
+}
+
+func NewTBufferedTransport(trans TTransport, bufferSize int) *TBufferedTransport {
+ rb := &TBuffer{buffer: make([]byte, bufferSize)}
+ wb := &TBuffer{buffer: make([]byte, bufferSize), limit: bufferSize}
+ return &TBufferedTransport{tp: trans, rbuf: rb, wbuf: wb}
+}
+
+func (p *TBufferedTransport) IsOpen() bool {
+ return p.tp.IsOpen()
+}
+
+func (p *TBufferedTransport) Open() (err error) {
+ return p.tp.Open()
+}
+
+func (p *TBufferedTransport) Close() (err error) {
+ return p.tp.Close()
+}
+
+func (p *TBufferedTransport) Read(buf []byte) (n int, err error) {
+ rbuf := p.rbuf
+ if rbuf.pos == rbuf.limit { // no more data to read from buffer
+ rbuf.pos = 0
+ // read data, fill buffer
+ rbuf.limit, err = p.tp.Read(rbuf.buffer)
+ if err != nil {
+ return 0, err
+ }
+ }
+ n = copy(buf, rbuf.buffer[rbuf.pos:rbuf.limit])
+ rbuf.pos += n
+ return n, nil
+}
+
+func (p *TBufferedTransport) Write(buf []byte) (n int, err error) {
+ wbuf := p.wbuf
+ size := len(buf)
+ if wbuf.pos+size > wbuf.limit { // buffer is full, flush buffer
+ p.Flush()
+ }
+ n = copy(wbuf.buffer[wbuf.pos:], buf)
+ wbuf.pos += n
+ return n, nil
+}
+
+func (p *TBufferedTransport) Flush() error {
+ start := 0
+ wbuf := p.wbuf
+ for start < wbuf.pos {
+ n, err := p.tp.Write(wbuf.buffer[start:wbuf.pos])
+ if err != nil {
+ return err
+ }
+ start += n
+ }
+
+ wbuf.pos = 0
+ return p.tp.Flush()
+}
+
+func (p *TBufferedTransport) Peek() bool {
+ return p.rbuf.pos < p.rbuf.limit || p.tp.Peek()
+}
diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go
index 17be8d8..b5cb0e1 100644
--- a/lib/go/thrift/simple_server.go
+++ b/lib/go/thrift/simple_server.go
@@ -120,12 +120,14 @@
for !p.stopped {
client, err := p.serverTransport.Accept()
if err != nil {
- return err
+ log.Println("Accept err: ", err)
}
if client != nil {
- if err := p.processRequest(client); err != nil {
- log.Println("error processing request:", err)
- }
+ go func() {
+ if err := p.processRequest(client); err != nil {
+ log.Println("error processing request:", err)
+ }
+ }()
}
}
return nil