THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/lib/rs/src/transport/buffered.rs b/lib/rs/src/transport/buffered.rs
index 3f240d8..b588ec1 100644
--- a/lib/rs/src/transport/buffered.rs
+++ b/lib/rs/src/transport/buffered.rs
@@ -15,104 +15,94 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cell::RefCell;
 use std::cmp;
 use std::io;
 use std::io::{Read, Write};
-use std::rc::Rc;
 
-use super::{TTransport, TTransportFactory};
+use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
 
 /// Default capacity of the read buffer in bytes.
-const DEFAULT_RBUFFER_CAPACITY: usize = 4096;
+const READ_CAPACITY: usize = 4096;
 
 /// Default capacity of the write buffer in bytes..
-const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+const WRITE_CAPACITY: usize = 4096;
 
-/// Transport that communicates with endpoints using a byte stream.
+/// Transport that reads messages via an internal buffer.
 ///
-/// A `TBufferedTransport` maintains a fixed-size internal write buffer. All
-/// writes are made to this buffer and are sent to the wrapped transport only
-/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
-/// count of the buffered bytes is written, followed by the bytes themselves.
-///
-/// A `TBufferedTransport` also maintains a fixed-size internal read buffer.
-/// On a call to `TTransport::read(...)` one full message - both fixed-length
-/// header and bytes - is read from the wrapped transport and buffered.
+/// A `TBufferedReadTransport` maintains a fixed-size internal read buffer.
+/// On a call to `TBufferedReadTransport::read(...)` one full message - both
+/// fixed-length header and bytes - is read from the wrapped channel and buffered.
 /// Subsequent read calls are serviced from the internal buffer until it is
 /// exhausted, at which point the next full message is read from the wrapped
-/// transport.
+/// channel.
 ///
 /// # Examples
 ///
-/// Create and use a `TBufferedTransport`.
+/// Create and use a `TBufferedReadTransport`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use std::io::{Read, Write};
-/// use thrift::transport::{TBufferedTransport, TTcpTransport, TTransport};
+/// use std::io::Read;
+/// use thrift::transport::{TBufferedReadTransport, TTcpChannel};
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
-/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
-/// let mut t = TBufferedTransport::new(t);
+/// let mut t = TBufferedReadTransport::new(c);
 ///
-/// // read
 /// t.read(&mut vec![0u8; 1]).unwrap();
-///
-/// // write
-/// t.write(&[0x00]).unwrap();
-/// t.flush().unwrap();
 /// ```
-pub struct TBufferedTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    rcap: usize,
-    wbuf: Vec<u8>,
-    inner: Rc<RefCell<Box<TTransport>>>,
+#[derive(Debug)]
+pub struct TBufferedReadTransport<C>
+where
+    C: Read,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+    chan: C,
 }
 
-impl TBufferedTransport {
+impl<C> TBufferedReadTransport<C>
+where
+    C: Read,
+{
     /// Create a `TBufferedTransport` with default-sized internal read and
-    /// write buffers that wraps an `inner` `TTransport`.
-    pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TBufferedTransport {
-        TBufferedTransport::with_capacity(DEFAULT_RBUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+    /// write buffers that wraps the given `TIoChannel`.
+    pub fn new(channel: C) -> TBufferedReadTransport<C> {
+        TBufferedReadTransport::with_capacity(READ_CAPACITY, channel)
     }
 
     /// Create a `TBufferedTransport` with an internal read buffer of size
-    /// `read_buffer_capacity` and an internal write buffer of size
-    /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize,
-                         inner: Rc<RefCell<Box<TTransport>>>)
-                         -> TBufferedTransport {
-        TBufferedTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            rpos: 0,
-            rcap: 0,
-            wbuf: Vec::with_capacity(write_buffer_capacity),
-            inner: inner,
+    /// `read_capacity` bytes that wraps the given channel, which may be any
+    /// type implementing `Read`.
+    pub fn with_capacity(read_capacity: usize, channel: C) -> TBufferedReadTransport<C> {
+        TBufferedReadTransport {
+            buf: vec![0; read_capacity].into_boxed_slice(),
+            pos: 0,
+            cap: 0,
+            chan: channel,
         }
     }
 
     fn get_bytes(&mut self) -> io::Result<&[u8]> {
-        if self.rcap - self.rpos == 0 {
-            self.rpos = 0;
-            self.rcap = self.inner.borrow_mut().read(&mut self.rbuf)?;
+        if self.cap - self.pos == 0 {
+            self.pos = 0;
+            self.cap = self.chan.read(&mut self.buf)?;
         }
 
-        Ok(&self.rbuf[self.rpos..self.rcap])
+        Ok(&self.buf[self.pos..self.cap])
     }
 
     fn consume(&mut self, consumed: usize) {
         // TODO: was a bug here += <-- test somehow
-        self.rpos = cmp::min(self.rcap, self.rpos + consumed);
+        self.pos = cmp::min(self.cap, self.pos + consumed);
     }
 }
 
-impl Read for TBufferedTransport {
+impl<C> Read for TBufferedReadTransport<C>
+where
+    C: Read,
+{
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
         let mut bytes_read = 0;
 
@@ -137,65 +127,127 @@
     }
 }
 
-impl Write for TBufferedTransport {
+/// Factory for creating instances of `TBufferedReadTransport`.
+#[derive(Default)]
+pub struct TBufferedReadTransportFactory;
+
+impl TBufferedReadTransportFactory {
+    pub fn new() -> TBufferedReadTransportFactory {
+        TBufferedReadTransportFactory {}
+    }
+}
+
+impl TReadTransportFactory for TBufferedReadTransportFactory {
+    /// Create a `TBufferedReadTransport`.
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+        Box::new(TBufferedReadTransport::new(channel))
+    }
+}
+
+/// Transport that writes messages via an internal buffer.
+///
+/// A `TBufferedWriteTransport` maintains a fixed-size internal write buffer.
+/// All writes are made to this buffer and are sent to the wrapped channel only
+/// when `TBufferedWriteTransport::flush()` is called. On a flush the buffered
+/// bytes are written to the wrapped channel, the channel itself is flushed,
+/// and the internal buffer is cleared.
+///
+/// # Examples
+///
+/// Create and use a `TBufferedWriteTransport`.
+///
+/// ```no_run
+/// use std::io::Write;
+/// use thrift::transport::{TBufferedWriteTransport, TTcpChannel};
+///
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
+///
+/// let mut t = TBufferedWriteTransport::new(c);
+///
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+#[derive(Debug)]
+pub struct TBufferedWriteTransport<C>
+where
+    C: Write,
+{
+    buf: Vec<u8>,
+    channel: C,
+}
+
+impl<C> TBufferedWriteTransport<C>
+where
+    C: Write,
+{
+    /// Create a `TBufferedWriteTransport` with a default-sized internal write
+    /// buffer that wraps the given channel.
+    pub fn new(channel: C) -> TBufferedWriteTransport<C> {
+        TBufferedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
+    }
+
+    /// Create a `TBufferedWriteTransport` with an internal write buffer of
+    /// capacity `write_capacity` bytes that wraps the given channel, which
+    /// may be any type implementing `Write`.
+    pub fn with_capacity(write_capacity: usize, channel: C) -> TBufferedWriteTransport<C> {
+        TBufferedWriteTransport {
+            buf: Vec::with_capacity(write_capacity),
+            channel: channel,
+        }
+    }
+}
+
+impl<C> Write for TBufferedWriteTransport<C>
+where
+    C: Write,
+{
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let avail_bytes = cmp::min(buf.len(), self.wbuf.capacity() - self.wbuf.len());
-        self.wbuf.extend_from_slice(&buf[..avail_bytes]);
-        assert!(self.wbuf.len() <= self.wbuf.capacity(),
-                "copy overflowed buffer");
+        let avail_bytes = cmp::min(buf.len(), self.buf.capacity() - self.buf.len());
+        self.buf.extend_from_slice(&buf[..avail_bytes]);
+        assert!(
+            self.buf.len() <= self.buf.capacity(),
+            "copy overflowed buffer"
+        );
         Ok(avail_bytes)
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        self.inner.borrow_mut().write_all(&self.wbuf)?;
-        self.inner.borrow_mut().flush()?;
-        self.wbuf.clear();
+        self.channel.write_all(&self.buf)?;
+        self.channel.flush()?;
+        self.buf.clear();
         Ok(())
     }
 }
 
-/// Factory for creating instances of `TBufferedTransport`
+/// Factory for creating instances of `TBufferedWriteTransport`.
 #[derive(Default)]
-pub struct TBufferedTransportFactory;
+pub struct TBufferedWriteTransportFactory;
 
-impl TBufferedTransportFactory {
-    /// Create a `TBufferedTransportFactory`.
-    pub fn new() -> TBufferedTransportFactory {
-        TBufferedTransportFactory {}
+impl TBufferedWriteTransportFactory {
+    pub fn new() -> TBufferedWriteTransportFactory {
+        TBufferedWriteTransportFactory {}
     }
 }
 
-impl TTransportFactory for TBufferedTransportFactory {
-    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
-        Box::new(TBufferedTransport::new(inner)) as Box<TTransport>
+impl TWriteTransportFactory for TBufferedWriteTransportFactory {
+    /// Create a `TBufferedWriteTransport`.
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+        Box::new(TBufferedWriteTransport::new(channel))
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::cell::RefCell;
     use std::io::{Read, Write};
-    use std::rc::Rc;
 
     use super::*;
-    use ::transport::{TPassThruTransport, TTransport};
-    use ::transport::mem::TBufferTransport;
-
-    macro_rules! new_transports {
-        ($wbc:expr, $rbc:expr) => (
-            {
-                let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity($wbc, $rbc))));
-                let thru: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
-                let thru = Rc::new(RefCell::new(thru));
-                (mem, thru)
-            }
-        );
-    }
+    use transport::TBufferChannel;
 
     #[test]
     fn must_return_zero_if_read_buffer_is_empty() {
-        let (_, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(10, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(10, mem);
 
         let mut b = vec![0; 10];
         let read_result = t.read(&mut b);
@@ -205,8 +257,8 @@
 
     #[test]
     fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() {
-        let (_, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(10, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(10, mem);
 
         let read_result = t.read(&mut []);
 
@@ -215,10 +267,10 @@
 
     #[test]
     fn must_return_zero_if_nothing_more_can_be_read() {
-        let (mem, thru) = new_transports!(4, 0);
-        let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+        let mem = TBufferChannel::with_capacity(4, 0);
+        let mut t = TBufferedReadTransport::with_capacity(4, mem);
 
-        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+        t.chan.set_readable_bytes(&[0, 1, 2, 3]);
 
         // read buffer is exactly the same size as bytes available
         let mut buf = vec![0u8; 4];
@@ -239,10 +291,10 @@
 
     #[test]
     fn must_fill_user_buffer_with_only_as_many_bytes_as_available() {
-        let (mem, thru) = new_transports!(4, 0);
-        let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+        let mem = TBufferChannel::with_capacity(4, 0);
+        let mut t = TBufferedReadTransport::with_capacity(4, mem);
 
-        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+        t.chan.set_readable_bytes(&[0, 1, 2, 3]);
 
         // read buffer is much larger than the bytes available
         let mut buf = vec![0u8; 8];
@@ -268,15 +320,16 @@
 
         // we have a much smaller buffer than the
         // underlying transport has bytes available
-        let (mem, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(2, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(2, mem);
 
         // fill the underlying transport's byte buffer
         let mut readable_bytes = [0u8; 10];
         for i in 0..10 {
             readable_bytes[i] = i as u8;
         }
-        mem.borrow_mut().set_readable_bytes(&readable_bytes);
+
+        t.chan.set_readable_bytes(&readable_bytes);
 
         // we ask to read into a buffer that's much larger
         // than the one the buffered transport has; as a result
@@ -312,8 +365,8 @@
 
     #[test]
     fn must_return_zero_if_nothing_can_be_written() {
-        let (_, thru) = new_transports!(0, 0);
-        let mut t = TBufferedTransport::with_capacity(0, 0, thru);
+        let mem = TBufferChannel::with_capacity(0, 0);
+        let mut t = TBufferedWriteTransport::with_capacity(0, mem);
 
         let b = vec![0; 10];
         let r = t.write(&b);
@@ -323,19 +376,20 @@
 
     #[test]
     fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
-        let (mem, thru) = new_transports!(0, 10);
-        let mut t = TBufferedTransport::with_capacity(0, 10, thru);
+        let mem = TBufferChannel::with_capacity(0, 10);
+        let mut t = TBufferedWriteTransport::with_capacity(10, mem);
 
         let r = t.write(&[]);
+        let expected: [u8; 0] = [];
 
         assert_eq!(r.unwrap(), 0);
-        assert_eq!(mem.borrow_mut().write_buffer_as_ref(), &[]);
+        assert_eq_transport_written_bytes!(t, expected);
     }
 
     #[test]
     fn must_return_zero_if_write_buffer_full() {
-        let (_, thru) = new_transports!(0, 0);
-        let mut t = TBufferedTransport::with_capacity(0, 4, thru);
+        let mem = TBufferChannel::with_capacity(0, 0);
+        let mut t = TBufferedWriteTransport::with_capacity(4, mem);
 
         let b = [0x00, 0x01, 0x02, 0x03];
 
@@ -350,26 +404,22 @@
 
     #[test]
     fn must_only_write_to_inner_transport_on_flush() {
-        let (mem, thru) = new_transports!(10, 10);
-        let mut t = TBufferedTransport::new(thru);
+        let mem = TBufferChannel::with_capacity(10, 10);
+        let mut t = TBufferedWriteTransport::new(mem);
 
         let b: [u8; 5] = [0, 1, 2, 3, 4];
         assert_eq!(t.write(&b).unwrap(), 5);
-        assert_eq!(mem.borrow_mut().write_buffer_as_ref().len(), 0);
+        assert_eq_transport_num_written_bytes!(t, 0);
 
         assert!(t.flush().is_ok());
 
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
     }
 
     #[test]
     fn must_write_successfully_after_flush() {
-        let (mem, thru) = new_transports!(0, 5);
-        let mut t = TBufferedTransport::with_capacity(0, 5, thru);
+        let mem = TBufferChannel::with_capacity(0, 5);
+        let mut t = TBufferedWriteTransport::with_capacity(5, mem);
 
         // write and flush
         let b: [u8; 5] = [0, 1, 2, 3, 4];
@@ -377,24 +427,16 @@
         assert!(t.flush().is_ok());
 
         // check the flushed bytes
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
 
         // reset our underlying transport
-        mem.borrow_mut().empty_write_buffer();
+        t.channel.empty_write_buffer();
 
         // write and flush again
         assert_eq!(t.write(&b).unwrap(), 5);
         assert!(t.flush().is_ok());
 
         // check the flushed bytes
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
     }
 }