THRIFT-4419: Fix bug where framed messages > 4K could not be read
Client: rs
This closes #1508
diff --git a/lib/rs/src/transport/framed.rs b/lib/rs/src/transport/framed.rs
index d78d2f7..7e0f8b6 100644
--- a/lib/rs/src/transport/framed.rs
+++ b/lib/rs/src/transport/framed.rs
@@ -18,7 +18,7 @@
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::cmp;
use std::io;
-use std::io::{ErrorKind, Read, Write};
+use std::io::{Read, Write};
use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
@@ -57,7 +57,7 @@
where
C: Read,
{
- buf: Box<[u8]>,
+ buf: Vec<u8>,
pos: usize,
cap: usize,
chan: C,
@@ -67,18 +67,17 @@
where
C: Read,
{
- /// Create a `TFramedTransport` with default-sized internal read and
- /// write buffers that wraps the given `TIoChannel`.
+ /// Create a `TFramedReadTransport` with a default-sized
+ /// internal read buffer that wraps the given `TIoChannel`.
pub fn new(channel: C) -> TFramedReadTransport<C> {
TFramedReadTransport::with_capacity(READ_CAPACITY, channel)
}
- /// Create a `TFramedTransport` with an internal read buffer of size
- /// `read_capacity` and an internal write buffer of size
- /// `write_capacity` that wraps the given `TIoChannel`.
+ /// Create a `TFramedReadTransport` with an internal read buffer
+ /// of size `read_capacity` that wraps the given `TIoChannel`.
pub fn with_capacity(read_capacity: usize, channel: C) -> TFramedReadTransport<C> {
TFramedReadTransport {
- buf: vec![0; read_capacity].into_boxed_slice(),
+ buf: vec![0; read_capacity], // FIXME: do I actually have to do this?
pos: 0,
cap: 0,
chan: channel,
@@ -93,22 +92,13 @@
fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
if self.cap - self.pos == 0 {
let message_size = self.chan.read_i32::<BigEndian>()? as usize;
- if message_size > self.buf.len() {
- return Err(
- io::Error::new(
- ErrorKind::Other,
- format!(
- "bytes to be read ({}) exceeds buffer \
- capacity ({})",
- message_size,
- self.buf.len()
- ),
- ),
- );
- }
+
+ let buf_capacity = cmp::max(message_size, READ_CAPACITY);
+ self.buf.resize(buf_capacity, 0);
+
self.chan.read_exact(&mut self.buf[..message_size])?;
- self.pos = 0;
self.cap = message_size as usize;
+ self.pos = 0;
}
let nread = cmp::min(b.len(), self.cap - self.pos);
@@ -165,8 +155,7 @@
where
C: Write,
{
- buf: Box<[u8]>,
- pos: usize,
+ buf: Vec<u8>,
channel: C,
}
@@ -174,20 +163,18 @@
where
C: Write,
{
- /// Create a `TFramedTransport` with default-sized internal read and
- /// write buffers that wraps the given `TIoChannel`.
+ /// Create a `TFramedWriteTransport` with a default-sized internal
+ /// write buffer that wraps the given `TIoChannel`.
pub fn new(channel: C) -> TFramedWriteTransport<C> {
TFramedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
}
- /// Create a `TFramedTransport` with an internal read buffer of size
- /// `read_capacity` and an internal write buffer of size
- /// `write_capacity` that wraps the given `TIoChannel`.
+ /// Create a `TFramedWriteTransport` with an internal write buffer
+ /// of size `write_capacity` that wraps the given `TIoChannel`.
pub fn with_capacity(write_capacity: usize, channel: C) -> TFramedWriteTransport<C> {
TFramedWriteTransport {
- buf: vec![0; write_capacity].into_boxed_slice(),
- pos: 0,
- channel: channel,
+ buf: Vec::with_capacity(write_capacity),
+ channel,
}
}
}
@@ -197,28 +184,19 @@
C: Write,
{
fn write(&mut self, b: &[u8]) -> io::Result<usize> {
- if b.len() > (self.buf.len() - self.pos) {
- return Err(
- io::Error::new(
- ErrorKind::Other,
- format!(
- "bytes to be written ({}) exceeds buffer \
- capacity ({})",
- b.len(),
- self.buf.len() - self.pos
- ),
- ),
- );
+ let current_capacity = self.buf.capacity();
+ let available_space = current_capacity - self.buf.len();
+ if b.len() > available_space {
+ let additional_space = cmp::max(b.len() - available_space, current_capacity);
+ self.buf.reserve(additional_space);
}
- let nwrite = b.len(); // always less than available write buffer capacity
- self.buf[self.pos..(self.pos + nwrite)].clone_from_slice(b);
- self.pos += nwrite;
- Ok(nwrite)
+ self.buf.extend_from_slice(b);
+ Ok(b.len())
}
fn flush(&mut self) -> io::Result<()> {
- let message_size = self.pos;
+ let message_size = self.buf.len();
if let 0 = message_size {
return Ok(());
@@ -227,13 +205,17 @@
.write_i32::<BigEndian>(message_size as i32)?;
}
+ // NOTE: spins forever if the channel keeps returning Ok(0) from write (errors propagate via `?`)
let mut byte_index = 0;
- while byte_index < self.pos {
- let nwrite = self.channel.write(&self.buf[byte_index..self.pos])?;
- byte_index = cmp::min(byte_index + nwrite, self.pos);
+ while byte_index < message_size {
+ let nwrite = self.channel.write(&self.buf[byte_index..message_size])?;
+ byte_index = cmp::min(byte_index + nwrite, message_size);
}
- self.pos = 0;
+ let buf_capacity = cmp::min(self.buf.capacity(), WRITE_CAPACITY);
+ self.buf.resize(buf_capacity, 0);
+ self.buf.clear();
+
self.channel.flush()
}
}
@@ -257,8 +239,230 @@
#[cfg(test)]
mod tests {
- // use std::io::{Read, Write};
- //
- // use super::*;
- // use ::transport::mem::TBufferChannel;
+ use super::*;
+ use ::transport::mem::TBufferChannel;
+
+ // FIXME: test a forced reserve
+
+ #[test]
+ fn must_read_message_smaller_than_initial_buffer_size() {
+ let c = TBufferChannel::with_capacity(10, 10);
+ let mut t = TFramedReadTransport::with_capacity(8, c);
+
+ t.chan.set_readable_bytes(
+ &[
+ 0x00, 0x00, 0x00, 0x04, /* message size */
+ 0x00, 0x01, 0x02, 0x03 /* message body */
+ ]
+ );
+
+ let mut buf = vec![0; 8];
+
+ // we've read exactly 4 bytes
+ assert_eq!(t.read(&mut buf).unwrap(), 4);
+ assert_eq!(&buf[..4], &[0x00, 0x01, 0x02, 0x03]);
+ }
+
+ #[test]
+ fn must_read_message_greater_than_initial_buffer_size() {
+ let c = TBufferChannel::with_capacity(10, 10);
+ let mut t = TFramedReadTransport::with_capacity(2, c);
+
+ t.chan.set_readable_bytes(
+ &[
+ 0x00, 0x00, 0x00, 0x04, /* message size */
+ 0x00, 0x01, 0x02, 0x03 /* message body */
+ ]
+ );
+
+ let mut buf = vec![0; 8];
+
+ // we've read exactly 4 bytes
+ assert_eq!(t.read(&mut buf).unwrap(), 4);
+ assert_eq!(&buf[..4], &[0x00, 0x01, 0x02, 0x03]);
+ }
+
+ #[test]
+ fn must_read_multiple_messages_in_sequence_correctly() {
+ let c = TBufferChannel::with_capacity(10, 10);
+ let mut t = TFramedReadTransport::with_capacity(2, c);
+
+ //
+ // 1st message
+ //
+
+ t.chan.set_readable_bytes(
+ &[
+ 0x00, 0x00, 0x00, 0x04, /* message size */
+ 0x00, 0x01, 0x02, 0x03 /* message body */
+ ]
+ );
+
+ let mut buf = vec![0; 8];
+
+ // we've read exactly 4 bytes
+ assert_eq!(t.read(&mut buf).unwrap(), 4);
+ assert_eq!(&buf, &[0x00, 0x01, 0x02, 0x03, 0x00, 0x00, 0x00, 0x00]);
+
+ //
+ // 2nd message
+ //
+
+ t.chan.set_readable_bytes(
+ &[
+ 0x00, 0x00, 0x00, 0x01, /* message size */
+ 0x04 /* message body */
+ ]
+ );
+
+ let mut buf = vec![0; 8];
+
+ // we've read exactly 1 byte
+ assert_eq!(t.read(&mut buf).unwrap(), 1);
+ assert_eq!(&buf, &[0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
+ }
+
+ #[test]
+ fn must_write_message_smaller_than_buffer_size() {
+ let mem = TBufferChannel::with_capacity(0, 0);
+ let mut t = TFramedWriteTransport::with_capacity(20, mem);
+
+ let b = vec![0; 10];
+
+ // should have written 10 bytes
+ assert_eq!(t.write(&b).unwrap(), 10);
+ }
+
+ #[test]
+ fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
+ let mem = TBufferChannel::with_capacity(0, 10);
+ let mut t = TFramedWriteTransport::with_capacity(10, mem);
+
+ let expected: [u8; 0] = [];
+
+ assert_eq!(t.write(&[]).unwrap(), 0);
+ assert_eq_transport_written_bytes!(t, expected);
+ }
+
+ #[test]
+ fn must_write_to_inner_transport_on_flush() {
+ let mem = TBufferChannel::with_capacity(10, 10);
+ let mut t = TFramedWriteTransport::new(mem);
+
+ let b: [u8; 5] = [0x00, 0x01, 0x02, 0x03, 0x04];
+ assert_eq!(t.write(&b).unwrap(), 5);
+ assert_eq_transport_num_written_bytes!(t, 0);
+
+ assert!(t.flush().is_ok());
+
+ let expected_bytes = [
+ 0x00, 0x00, 0x00, 0x05, /* message size */
+ 0x00, 0x01, 0x02, 0x03, 0x04 /* message body */
+ ];
+
+ assert_eq_transport_written_bytes!(t, expected_bytes);
+ }
+
+ #[test]
+ fn must_write_message_greater_than_buffer_size_00() {
+ let mem = TBufferChannel::with_capacity(0, 10);
+
+ // IMPORTANT: DO **NOT** CHANGE THE WRITE_CAPACITY OR THE NUMBER OF BYTES TO BE WRITTEN!
+ // these lengths were chosen to be just long enough
+ // that doubling the capacity is a **worse** choice than
+ // simply resizing the buffer to b.len()
+
+ let mut t = TFramedWriteTransport::with_capacity(1, mem);
+ let b = [0x00, 0x01, 0x02];
+
+ // should have written 3 bytes
+ assert_eq!(t.write(&b).unwrap(), 3);
+ assert_eq_transport_num_written_bytes!(t, 0);
+
+ assert!(t.flush().is_ok());
+
+ let expected_bytes = [
+ 0x00, 0x00, 0x00, 0x03, /* message size */
+ 0x00, 0x01, 0x02 /* message body */
+ ];
+
+ assert_eq_transport_written_bytes!(t, expected_bytes);
+ }
+
+ #[test]
+ fn must_write_message_greater_than_buffer_size_01() {
+ let mem = TBufferChannel::with_capacity(0, 10);
+
+ // IMPORTANT: DO **NOT** CHANGE THE WRITE_CAPACITY OR THE NUMBER OF BYTES TO BE WRITTEN!
+ // these lengths were chosen to be just long enough
+ // that doubling the capacity is a **better** choice than
+ // simply resizing the buffer to b.len()
+
+ let mut t = TFramedWriteTransport::with_capacity(2, mem);
+ let b = [0x00, 0x01, 0x02];
+
+ // should have written 3 bytes
+ assert_eq!(t.write(&b).unwrap(), 3);
+ assert_eq_transport_num_written_bytes!(t, 0);
+
+ assert!(t.flush().is_ok());
+
+ let expected_bytes = [
+ 0x00, 0x00, 0x00, 0x03, /* message size */
+ 0x00, 0x01, 0x02 /* message body */
+ ];
+
+ assert_eq_transport_written_bytes!(t, expected_bytes);
+ }
+
+ #[test]
+ fn must_return_error_if_nothing_can_be_written_to_inner_transport_on_flush() {
+ let mem = TBufferChannel::with_capacity(0, 0);
+ let mut t = TFramedWriteTransport::with_capacity(1, mem);
+
+ let b = vec![0; 10];
+
+ // should have written 10 bytes
+ assert_eq!(t.write(&b).unwrap(), 10);
+
+ // let's flush
+ let r = t.flush();
+
+ // this time we'll error out because the flush can't write to the underlying channel
+ assert!(r.is_err());
+ }
+
+ #[test]
+ fn must_write_successfully_after_flush() {
+ // IMPORTANT: write capacity *MUST* be greater
+ // than message sizes used in this test + 4-byte frame header
+ let mem = TBufferChannel::with_capacity(0, 10);
+ let mut t = TFramedWriteTransport::with_capacity(5, mem);
+
+ // write and flush
+ let first_message: [u8; 5] = [0x00, 0x01, 0x02, 0x03, 0x04];
+ assert_eq!(t.write(&first_message).unwrap(), 5);
+ assert!(t.flush().is_ok());
+
+ let mut expected = Vec::new();
+ expected.write_all(&[0x00, 0x00, 0x00, 0x05]).unwrap(); // message size
+ expected.extend_from_slice(&first_message);
+
+ // check the flushed bytes
+ assert_eq!(t.channel.write_bytes(), expected);
+
+ // reset our underlying transport
+ t.channel.empty_write_buffer();
+
+ let second_message: [u8; 3] = [0x05, 0x06, 0x07];
+ assert_eq!(t.write(&second_message).unwrap(), 3);
+ assert!(t.flush().is_ok());
+
+ expected.clear();
+ expected.write_all(&[0x00, 0x00, 0x00, 0x03]).unwrap(); // message size
+ expected.extend_from_slice(&second_message);
+
+ // check the flushed bytes
+ assert_eq!(t.channel.write_bytes(), expected);
+ }
}
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index b302ae8..a8d00ec 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -307,18 +307,6 @@
"rs-cpp_multic-compact_framed-ip",
"rs-cpp_multic_buffered-ip",
"rs-cpp_multic_framed-ip",
- "rs-csharp_binary_buffered-ip",
- "rs-csharp_binary_framed-ip",
- "rs-csharp_compact_buffered-ip",
- "rs-csharp_compact_framed-ip",
- "rs-csharp_multi-binary_buffered-ip",
- "rs-csharp_multi-binary_framed-ip",
- "rs-csharp_multi_buffered-ip",
- "rs-csharp_multi_framed-ip",
- "rs-csharp_multic-compact_buffered-ip",
- "rs-csharp_multic-compact_framed-ip",
- "rs-csharp_multic_buffered-ip",
- "rs-csharp_multic_framed-ip",
"rs-dart_binary_framed-ip",
"rs-dart_compact_framed-ip",
"rs-dart_multi-binary_framed-ip",