THRIFT-4390: Fix bug where binary/buffered messages > 4K could not be read/written
Client: rs

This closes #1458
diff --git a/lib/rs/src/protocol/binary.rs b/lib/rs/src/protocol/binary.rs
index 1710733..8505b63 100644
--- a/lib/rs/src/protocol/binary.rs
+++ b/lib/rs/src/protocol/binary.rs
@@ -312,13 +312,6 @@
             transport: transport,
         }
     }
-
-    fn write_transport(&mut self, buf: &[u8]) -> ::Result<()> {
-        self.transport
-            .write(buf)
-            .map(|_| ())
-            .map_err(From::from)
-    }
 }
 
 impl<T> TOutputProtocol for TBinaryOutputProtocol<T>
@@ -384,7 +377,7 @@
 
     fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
         self.write_i32(b.len() as i32)?;
-        self.write_transport(b)
+        self.transport.write_all(b).map_err(From::from)
     }
 
     fn write_bool(&mut self, b: bool) -> ::Result<()> {
diff --git a/lib/rs/src/transport/buffered.rs b/lib/rs/src/transport/buffered.rs
index b588ec1..41b941c 100644
--- a/lib/rs/src/transport/buffered.rs
+++ b/lib/rs/src/transport/buffered.rs
@@ -174,6 +174,7 @@
     C: Write,
 {
     buf: Vec<u8>,
+    cap: usize,
     channel: C,
 }
 
@@ -191,8 +192,11 @@
     /// `read_capacity` and an internal write buffer of size
     /// `write_capacity` that wraps the given `TIoChannel`.
     pub fn with_capacity(write_capacity: usize, channel: C) -> TBufferedWriteTransport<C> {
+        assert!(write_capacity > 0, "write buffer size must be a positive integer");
+
         TBufferedWriteTransport {
             buf: Vec::with_capacity(write_capacity),
+            cap: write_capacity,
             channel: channel,
         }
     }
@@ -203,13 +207,28 @@
     C: Write,
 {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let avail_bytes = cmp::min(buf.len(), self.buf.capacity() - self.buf.len());
-        self.buf.extend_from_slice(&buf[..avail_bytes]);
-        assert!(
-            self.buf.len() <= self.buf.capacity(),
-            "copy overflowed buffer"
-        );
-        Ok(avail_bytes)
+        if !buf.is_empty() {
+            let mut avail_bytes;
+
+            loop {
+                avail_bytes = cmp::min(buf.len(), self.cap - self.buf.len());
+
+                if avail_bytes == 0 {
+                    self.flush()?;
+                } else {
+                    break;
+                }
+            }
+
+            let avail_bytes = avail_bytes;
+
+            self.buf.extend_from_slice(&buf[..avail_bytes]);
+            assert!(self.buf.len() <= self.cap, "copy overflowed buffer");
+
+            Ok(avail_bytes)
+        } else {
+            Ok(0)
+        }
     }
 
     fn flush(&mut self) -> io::Result<()> {
@@ -364,14 +383,21 @@
     }
 
     #[test]
-    fn must_return_zero_if_nothing_can_be_written() {
+    fn must_return_error_when_nothing_can_be_written_to_underlying_channel() {
         let mem = TBufferChannel::with_capacity(0, 0);
-        let mut t = TBufferedWriteTransport::with_capacity(0, mem);
+        let mut t = TBufferedWriteTransport::with_capacity(1, mem);
 
         let b = vec![0; 10];
         let r = t.write(&b);
 
-        assert_eq!(r.unwrap(), 0);
+        // should have written 1 byte
+        assert_eq!(r.unwrap(), 1);
+
+        // let's try again...
+        let r = t.write(&b[1..]);
+
+        // this time we'll error out because the auto-flush failed
+        assert!(r.is_err());
     }
 
     #[test]
@@ -387,23 +413,35 @@
     }
 
     #[test]
-    fn must_return_zero_if_write_buffer_full() {
-        let mem = TBufferChannel::with_capacity(0, 0);
+    fn must_auto_flush_if_write_buffer_full() {
+        let mem = TBufferChannel::with_capacity(0, 8);
         let mut t = TBufferedWriteTransport::with_capacity(4, mem);
 
-        let b = [0x00, 0x01, 0x02, 0x03];
+        let b0 = [0x00, 0x01, 0x02, 0x03];
+        let b1 = [0x04, 0x05, 0x06, 0x07];
 
-        // we've now filled the write buffer
-        let r = t.write(&b);
+        // write the first 4 bytes; we've now filled the transport's write buffer
+        let r = t.write(&b0);
         assert_eq!(r.unwrap(), 4);
 
-        // try write the same bytes again - nothing should be writable
-        let r = t.write(&b);
-        assert_eq!(r.unwrap(), 0);
+        // try to write the next 4 bytes; this causes the transport to auto-flush the first 4 bytes
+        let r = t.write(&b1);
+        assert_eq!(r.unwrap(), 4);
+
+        // check that in writing the second 4 bytes we auto-flushed the first 4 bytes
+        assert_eq_transport_num_written_bytes!(t, 4);
+        assert_eq_transport_written_bytes!(t, b0);
+        t.channel.empty_write_buffer();
+
+        // now flush the transport to push the second 4 bytes to the underlying channel
+        assert!(t.flush().is_ok());
+
+        // check that we wrote out the second 4 bytes
+        assert_eq_transport_written_bytes!(t, b1);
     }
 
     #[test]
-    fn must_only_write_to_inner_transport_on_flush() {
+    fn must_write_to_inner_transport_on_flush() {
         let mem = TBufferChannel::with_capacity(10, 10);
         let mut t = TBufferedWriteTransport::new(mem);
 
diff --git a/lib/rs/src/transport/socket.rs b/lib/rs/src/transport/socket.rs
index 727bba3..a6f780a 100644
--- a/lib/rs/src/transport/socket.rs
+++ b/lib/rs/src/transport/socket.rs
@@ -156,7 +156,7 @@
 
 impl Write for TTcpChannel {
     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
-        self.if_set(|s| s.write_all(b)).map(|_| b.len())
+        self.if_set(|s| s.write(b))
     }
 
     fn flush(&mut self) -> io::Result<()> {