THRIFT-5214: Partial rewrite of TFramedTransport

Client: go

While debugging the issue in THRIFT-5214, I original thought that was a
bug in TFramedTransport implementation, so I took some time scrutinize
the TFramedTransport code. Although in the end there's no bug, the
current implementation of TFramedTransport, especially in the Read
function, has some weird handling while at frame boundary, which is both
error-prone and hard to follow (I did found and fixed one bug there in
https://github.com/apache/thrift/pull/1810 before).

The new implementation reads the whole frame into a buffer, which would
use slightly more memory, but it follows the pattern of TFramedTransport
implementation of other languages (e.g. Java), and also the pattern we
handle frame in THeaderTransport. It also reduced the complexity and
weirdness of the frame boundary handling in Read implementation.

This rewrite also removes the print call from library code.
diff --git a/lib/go/thrift/framed_transport.go b/lib/go/thrift/framed_transport.go
index 34275b5..f192075 100644
--- a/lib/go/thrift/framed_transport.go
+++ b/lib/go/thrift/framed_transport.go
@@ -32,11 +32,14 @@
 
 type TFramedTransport struct {
 	transport TTransport
-	buf       bytes.Buffer
-	reader    *bufio.Reader
-	frameSize uint32 //Current remaining size of the frame. if ==0 read next frame header
-	buffer    [4]byte
 	maxLength uint32
+
+	writeBuf bytes.Buffer
+
+	reader  *bufio.Reader
+	readBuf bytes.Buffer
+
+	buffer [4]byte
 }
 
 type tFramedTransportFactory struct {
@@ -80,89 +83,65 @@
 	return p.transport.Close()
 }
 
-func (p *TFramedTransport) Read(buf []byte) (l int, err error) {
-	if p.frameSize == 0 {
-		p.frameSize, err = p.readFrameHeader()
-		if err != nil {
-			return
-		}
+func (p *TFramedTransport) Read(buf []byte) (read int, err error) {
+	read, err = p.readBuf.Read(buf)
+	if err != io.EOF {
+		return
 	}
-	if p.frameSize < uint32(len(buf)) {
-		frameSize := p.frameSize
-		tmp := make([]byte, p.frameSize)
-		l, err = p.Read(tmp)
-		copy(buf, tmp)
-		if err == nil {
-			// Note: It's important to only return an error when l
-			// is zero.
-			// In io.Reader.Read interface, it's perfectly fine to
-			// return partial data and nil error, which means
-			// "This is all the data we have right now without
-			// blocking. If you need the full data, call Read again
-			// or use io.ReadFull instead".
-			// Returning partial data with an error actually means
-			// there's no more data after the partial data just
-			// returned, which is not true in this case
-			// (it might be that the other end just haven't written
-			// them yet).
-			if l == 0 {
-				err = NewTTransportExceptionFromError(fmt.Errorf("Not enough frame size %d to read %d bytes", frameSize, len(buf)))
-			}
-			return
-		}
+
+	// For bytes.Buffer.Read, EOF would only happen when read is zero,
+	// but still, do a sanity check,
+	// in case that behavior is changed in a future version of go stdlib.
+	// When that happens, just return nil error,
+	// and let the caller call Read again to read the next frame.
+	if read > 0 {
+		return read, nil
 	}
-	got, err := p.reader.Read(buf)
-	p.frameSize = p.frameSize - uint32(got)
-	//sanity check
-	if p.frameSize < 0 {
-		return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, "Negative frame size")
+
+	// Reaching here means that the last Read finished the last frame,
+	// so we need to read the next frame into readBuf now.
+	if err = p.readFrame(); err != nil {
+		return read, err
 	}
-	return got, NewTTransportExceptionFromError(err)
+	newRead, err := p.Read(buf[read:])
+	return read + newRead, err
 }
 
 func (p *TFramedTransport) ReadByte() (c byte, err error) {
-	if p.frameSize == 0 {
-		p.frameSize, err = p.readFrameHeader()
-		if err != nil {
-			return
-		}
+	buf := p.buffer[:1]
+	_, err = p.Read(buf)
+	if err != nil {
+		return
 	}
-	if p.frameSize < 1 {
-		return 0, NewTTransportExceptionFromError(fmt.Errorf("Not enough frame size %d to read %d bytes", p.frameSize, 1))
-	}
-	c, err = p.reader.ReadByte()
-	if err == nil {
-		p.frameSize--
-	}
+	c = buf[0]
 	return
 }
 
 func (p *TFramedTransport) Write(buf []byte) (int, error) {
-	n, err := p.buf.Write(buf)
+	n, err := p.writeBuf.Write(buf)
 	return n, NewTTransportExceptionFromError(err)
 }
 
 func (p *TFramedTransport) WriteByte(c byte) error {
-	return p.buf.WriteByte(c)
+	return p.writeBuf.WriteByte(c)
 }
 
 func (p *TFramedTransport) WriteString(s string) (n int, err error) {
-	return p.buf.WriteString(s)
+	return p.writeBuf.WriteString(s)
 }
 
 func (p *TFramedTransport) Flush(ctx context.Context) error {
-	size := p.buf.Len()
+	size := p.writeBuf.Len()
 	buf := p.buffer[:4]
 	binary.BigEndian.PutUint32(buf, uint32(size))
 	_, err := p.transport.Write(buf)
 	if err != nil {
-		p.buf.Truncate(0)
+		p.writeBuf.Reset()
 		return NewTTransportExceptionFromError(err)
 	}
 	if size > 0 {
-		if n, err := p.buf.WriteTo(p.transport); err != nil {
-			print("Error while flushing write buffer of size ", size, " to transport, only wrote ", n, " bytes: ", err.Error(), "\n")
-			p.buf.Truncate(0)
+		if _, err := io.Copy(p.transport, &p.writeBuf); err != nil {
+			p.writeBuf.Reset()
 			return NewTTransportExceptionFromError(err)
 		}
 	}
@@ -170,18 +149,19 @@
 	return NewTTransportExceptionFromError(err)
 }
 
-func (p *TFramedTransport) readFrameHeader() (uint32, error) {
+func (p *TFramedTransport) readFrame() error {
 	buf := p.buffer[:4]
 	if _, err := io.ReadFull(p.reader, buf); err != nil {
-		return 0, err
+		return err
 	}
 	size := binary.BigEndian.Uint32(buf)
 	if size < 0 || size > p.maxLength {
-		return 0, NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, fmt.Sprintf("Incorrect frame size (%d)", size))
+		return NewTTransportException(UNKNOWN_TRANSPORT_EXCEPTION, fmt.Sprintf("Incorrect frame size (%d)", size))
 	}
-	return size, nil
+	_, err := io.CopyN(&p.readBuf, p.reader, int64(size))
+	return NewTTransportExceptionFromError(err)
 }
 
 func (p *TFramedTransport) RemainingBytes() (num_bytes uint64) {
-	return uint64(p.frameSize)
+	return uint64(p.readBuf.Len())
 }