THRIFT-4390: Fix bug where binary/buffered messages > 4K could not be read/written
Client: rs
This closes #1458
diff --git a/lib/rs/src/protocol/binary.rs b/lib/rs/src/protocol/binary.rs
index 1710733..8505b63 100644
--- a/lib/rs/src/protocol/binary.rs
+++ b/lib/rs/src/protocol/binary.rs
@@ -312,13 +312,6 @@
transport: transport,
}
}
-
- fn write_transport(&mut self, buf: &[u8]) -> ::Result<()> {
- self.transport
- .write(buf)
- .map(|_| ())
- .map_err(From::from)
- }
}
impl<T> TOutputProtocol for TBinaryOutputProtocol<T>
@@ -384,7 +377,7 @@
fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
self.write_i32(b.len() as i32)?;
- self.write_transport(b)
+ self.transport.write_all(b).map_err(From::from)
}
fn write_bool(&mut self, b: bool) -> ::Result<()> {
diff --git a/lib/rs/src/transport/buffered.rs b/lib/rs/src/transport/buffered.rs
index b588ec1..41b941c 100644
--- a/lib/rs/src/transport/buffered.rs
+++ b/lib/rs/src/transport/buffered.rs
@@ -174,6 +174,7 @@
C: Write,
{
buf: Vec<u8>,
+ cap: usize,
channel: C,
}
@@ -191,8 +192,11 @@
/// `read_capacity` and an internal write buffer of size
/// `write_capacity` that wraps the given `TIoChannel`.
pub fn with_capacity(write_capacity: usize, channel: C) -> TBufferedWriteTransport<C> {
+ assert!(write_capacity > 0, "write buffer size must be a positive integer");
+
TBufferedWriteTransport {
buf: Vec::with_capacity(write_capacity),
+ cap: write_capacity,
channel: channel,
}
}
@@ -203,13 +207,28 @@
C: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- let avail_bytes = cmp::min(buf.len(), self.buf.capacity() - self.buf.len());
- self.buf.extend_from_slice(&buf[..avail_bytes]);
- assert!(
- self.buf.len() <= self.buf.capacity(),
- "copy overflowed buffer"
- );
- Ok(avail_bytes)
+ if !buf.is_empty() {
+ let mut avail_bytes;
+
+ loop {
+ avail_bytes = cmp::min(buf.len(), self.cap - self.buf.len());
+
+ if avail_bytes == 0 {
+ self.flush()?;
+ } else {
+ break;
+ }
+ }
+
+ let avail_bytes = avail_bytes;
+
+ self.buf.extend_from_slice(&buf[..avail_bytes]);
+ assert!(self.buf.len() <= self.cap, "copy overflowed buffer");
+
+ Ok(avail_bytes)
+ } else {
+ Ok(0)
+ }
}
fn flush(&mut self) -> io::Result<()> {
@@ -364,14 +383,21 @@
}
#[test]
- fn must_return_zero_if_nothing_can_be_written() {
+ fn must_return_error_when_nothing_can_be_written_to_underlying_channel() {
let mem = TBufferChannel::with_capacity(0, 0);
- let mut t = TBufferedWriteTransport::with_capacity(0, mem);
+ let mut t = TBufferedWriteTransport::with_capacity(1, mem);
let b = vec![0; 10];
let r = t.write(&b);
- assert_eq!(r.unwrap(), 0);
+ // should have written 1 byte
+ assert_eq!(r.unwrap(), 1);
+
+ // let's try again...
+ let r = t.write(&b[1..]);
+
+ // this time we'll error out because the auto-flush failed
+ assert!(r.is_err());
}
#[test]
@@ -387,23 +413,35 @@
}
#[test]
- fn must_return_zero_if_write_buffer_full() {
- let mem = TBufferChannel::with_capacity(0, 0);
+ fn must_auto_flush_if_write_buffer_full() {
+ let mem = TBufferChannel::with_capacity(0, 8);
let mut t = TBufferedWriteTransport::with_capacity(4, mem);
- let b = [0x00, 0x01, 0x02, 0x03];
+ let b0 = [0x00, 0x01, 0x02, 0x03];
+ let b1 = [0x04, 0x05, 0x06, 0x07];
- // we've now filled the write buffer
- let r = t.write(&b);
+ // write the first 4 bytes; we've now filled the transport's write buffer
+ let r = t.write(&b0);
assert_eq!(r.unwrap(), 4);
- // try write the same bytes again - nothing should be writable
- let r = t.write(&b);
- assert_eq!(r.unwrap(), 0);
+ // try write the next 4 bytes; this causes the transport to auto-flush the first 4 bytes
+ let r = t.write(&b1);
+ assert_eq!(r.unwrap(), 4);
+
+ // check that in writing the second 4 bytes we auto-flushed the first 4 bytes
+ assert_eq_transport_num_written_bytes!(t, 4);
+ assert_eq_transport_written_bytes!(t, b0);
+ t.channel.empty_write_buffer();
+
+ // now flush the transport to push the second 4 bytes to the underlying channel
+ assert!(t.flush().is_ok());
+
+ // check that we wrote out the second 4 bytes
+ assert_eq_transport_written_bytes!(t, b1);
}
#[test]
- fn must_only_write_to_inner_transport_on_flush() {
+ fn must_write_to_inner_transport_on_flush() {
let mem = TBufferChannel::with_capacity(10, 10);
let mut t = TBufferedWriteTransport::new(mem);
diff --git a/lib/rs/src/transport/socket.rs b/lib/rs/src/transport/socket.rs
index 727bba3..a6f780a 100644
--- a/lib/rs/src/transport/socket.rs
+++ b/lib/rs/src/transport/socket.rs
@@ -156,7 +156,7 @@
impl Write for TTcpChannel {
fn write(&mut self, b: &[u8]) -> io::Result<usize> {
- self.if_set(|s| s.write_all(b)).map(|_| b.len())
+ self.if_set(|s| s.write(b))
}
fn flush(&mut self) -> io::Result<()> {