THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/lib/rs/src/transport/framed.rs b/lib/rs/src/transport/framed.rs
index 75c12f4..d78d2f7 100644
--- a/lib/rs/src/transport/framed.rs
+++ b/lib/rs/src/transport/framed.rs
@@ -16,165 +16,242 @@
 // under the License.
 
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
-use std::cell::RefCell;
 use std::cmp;
 use std::io;
 use std::io::{ErrorKind, Read, Write};
-use std::rc::Rc;
 
-use super::{TTransport, TTransportFactory};
+use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
 
 /// Default capacity of the read buffer in bytes.
-const WRITE_BUFFER_CAPACITY: usize = 4096;
+const READ_CAPACITY: usize = 4096;
 
-/// Default capacity of the write buffer in bytes..
-const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+/// Default capacity of the write buffer in bytes.
+const WRITE_CAPACITY: usize = 4096;
 
-/// Transport that communicates with endpoints using framed messages.
+/// Transport that reads framed messages.
 ///
-/// A `TFramedTransport` maintains a fixed-size internal write buffer. All
-/// writes are made to this buffer and are sent to the wrapped transport only
-/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
-/// count of the buffered bytes is written, followed by the bytes themselves.
-///
-/// A `TFramedTransport` also maintains a fixed-size internal read buffer.
-/// On a call to `TTransport::read(...)` one full message - both fixed-length
-/// header and bytes - is read from the wrapped transport and buffered.
-/// Subsequent read calls are serviced from the internal buffer until it is
-/// exhausted, at which point the next full message is read from the wrapped
-/// transport.
+/// A `TFramedReadTransport` maintains a fixed-size internal read buffer.
+/// On a call to `TFramedReadTransport::read(...)` one full message - both
+/// fixed-length header and bytes - is read from the wrapped channel and
+/// buffered. Subsequent read calls are serviced from the internal buffer
+/// until it is exhausted, at which point the next full message is read
+/// from the wrapped channel.
 ///
 /// # Examples
 ///
-/// Create and use a `TFramedTransport`.
+/// Create and use a `TFramedReadTransport`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use std::io::{Read, Write};
-/// use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+/// use std::io::Read;
+/// use thrift::transport::{TFramedReadTransport, TTcpChannel};
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
-/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
-/// let mut t = TFramedTransport::new(t);
+/// let mut t = TFramedReadTransport::new(c);
 ///
-/// // read
 /// t.read(&mut vec![0u8; 1]).unwrap();
-///
-/// // write
-/// t.write(&[0x00]).unwrap();
-/// t.flush().unwrap();
 /// ```
-pub struct TFramedTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    rcap: usize,
-    wbuf: Box<[u8]>,
-    wpos: usize,
-    inner: Rc<RefCell<Box<TTransport>>>,
+#[derive(Debug)]
+pub struct TFramedReadTransport<C>
+where
+    C: Read,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+    chan: C,
 }
 
-impl TFramedTransport {
+impl<C> TFramedReadTransport<C>
+where
+    C: Read,
+{
     /// Create a `TFramedTransport` with default-sized internal read and
-    /// write buffers that wraps an `inner` `TTransport`.
-    pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TFramedTransport {
-        TFramedTransport::with_capacity(WRITE_BUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+    /// write buffers that wraps the given `TIoChannel`.
+    pub fn new(channel: C) -> TFramedReadTransport<C> {
+        TFramedReadTransport::with_capacity(READ_CAPACITY, channel)
     }
 
     /// Create a `TFramedTransport` with an internal read buffer of size
-    /// `read_buffer_capacity` and an internal write buffer of size
-    /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize,
-                         inner: Rc<RefCell<Box<TTransport>>>)
-                         -> TFramedTransport {
-        TFramedTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            rpos: 0,
-            rcap: 0,
-            wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
-            wpos: 0,
-            inner: inner,
+    /// `read_capacity` bytes (this transport has no write buffer) that
+    /// wraps the given channel, which may be any `C: Read`.
+    pub fn with_capacity(read_capacity: usize, channel: C) -> TFramedReadTransport<C> {
+        TFramedReadTransport {
+            buf: vec![0; read_capacity].into_boxed_slice(),
+            pos: 0,
+            cap: 0,
+            chan: channel,
         }
     }
 }
 
-impl Read for TFramedTransport {
+impl<C> Read for TFramedReadTransport<C>
+where
+    C: Read,
+{
     fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
-        if self.rcap - self.rpos == 0 {
-            let message_size = self.inner.borrow_mut().read_i32::<BigEndian>()? as usize;
-            if message_size > self.rbuf.len() {
-                return Err(io::Error::new(ErrorKind::Other,
-                                          format!("bytes to be read ({}) exceeds buffer \
+        if self.cap - self.pos == 0 {
+            let message_size = self.chan.read_i32::<BigEndian>()? as usize;
+            if message_size > self.buf.len() {
+                return Err(
+                    io::Error::new(
+                        ErrorKind::Other,
+                        format!(
+                            "bytes to be read ({}) exceeds buffer \
                                                    capacity ({})",
-                                                  message_size,
-                                                  self.rbuf.len())));
+                            message_size,
+                            self.buf.len()
+                        ),
+                    ),
+                );
             }
-            self.inner.borrow_mut().read_exact(&mut self.rbuf[..message_size])?;
-            self.rpos = 0;
-            self.rcap = message_size as usize;
+            self.chan.read_exact(&mut self.buf[..message_size])?;
+            self.pos = 0;
+            self.cap = message_size as usize;
         }
 
-        let nread = cmp::min(b.len(), self.rcap - self.rpos);
-        b[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
-        self.rpos += nread;
+        let nread = cmp::min(b.len(), self.cap - self.pos);
+        b[..nread].clone_from_slice(&self.buf[self.pos..self.pos + nread]);
+        self.pos += nread;
 
         Ok(nread)
     }
 }
 
-impl Write for TFramedTransport {
+/// Factory for creating instances of `TFramedReadTransport`.
+#[derive(Default)]
+pub struct TFramedReadTransportFactory;
+
+impl TFramedReadTransportFactory {
+    pub fn new() -> TFramedReadTransportFactory {
+        TFramedReadTransportFactory {}
+    }
+}
+
+impl TReadTransportFactory for TFramedReadTransportFactory {
+    /// Create a `TFramedReadTransport`.
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+        Box::new(TFramedReadTransport::new(channel))
+    }
+}
+
+/// Transport that writes framed messages.
+///
+/// A `TFramedWriteTransport` maintains a fixed-size internal write buffer. All
+/// writes are made to this buffer and are sent to the wrapped channel only
+/// when `TFramedWriteTransport::flush()` is called. On a flush a fixed-length
+/// header with a count of the buffered bytes is written, followed by the bytes
+/// themselves.
+///
+/// # Examples
+///
+/// Create and use a `TFramedWriteTransport`.
+///
+/// ```no_run
+/// use std::io::Write;
+/// use thrift::transport::{TFramedWriteTransport, TTcpChannel};
+///
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
+///
+/// let mut t = TFramedWriteTransport::new(c);
+///
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+#[derive(Debug)]
+pub struct TFramedWriteTransport<C>
+where
+    C: Write,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    channel: C,
+}
+
+impl<C> TFramedWriteTransport<C>
+where
+    C: Write,
+{
+    /// Create a `TFramedWriteTransport` with a default-sized internal write
+    /// buffer that wraps the given channel, which may be any `C: Write`.
+    pub fn new(channel: C) -> TFramedWriteTransport<C> {
+        TFramedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
+    }
+
+    /// Create a `TFramedWriteTransport` with an internal write buffer of
+    /// `write_capacity` bytes that wraps the given channel, which may be
+    /// any `C: Write`.
+    pub fn with_capacity(write_capacity: usize, channel: C) -> TFramedWriteTransport<C> {
+        TFramedWriteTransport {
+            buf: vec![0; write_capacity].into_boxed_slice(),
+            pos: 0,
+            channel: channel,
+        }
+    }
+}
+
+impl<C> Write for TFramedWriteTransport<C>
+where
+    C: Write,
+{
     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
-        if b.len() > (self.wbuf.len() - self.wpos) {
-            return Err(io::Error::new(ErrorKind::Other,
-                                      format!("bytes to be written ({}) exceeds buffer \
+        if b.len() > (self.buf.len() - self.pos) {
+            return Err(
+                io::Error::new(
+                    ErrorKind::Other,
+                    format!(
+                        "bytes to be written ({}) exceeds buffer \
                                                capacity ({})",
-                                              b.len(),
-                                              self.wbuf.len() - self.wpos)));
+                        b.len(),
+                        self.buf.len() - self.pos
+                    ),
+                ),
+            );
         }
 
         let nwrite = b.len(); // always less than available write buffer capacity
-        self.wbuf[self.wpos..(self.wpos + nwrite)].clone_from_slice(b);
-        self.wpos += nwrite;
+        self.buf[self.pos..(self.pos + nwrite)].clone_from_slice(b);
+        self.pos += nwrite;
         Ok(nwrite)
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        let message_size = self.wpos;
+        let message_size = self.pos;
 
         if let 0 = message_size {
             return Ok(());
         } else {
-            self.inner.borrow_mut().write_i32::<BigEndian>(message_size as i32)?;
+            self.channel
+                .write_i32::<BigEndian>(message_size as i32)?;
         }
 
         let mut byte_index = 0;
-        while byte_index < self.wpos {
-            let nwrite = self.inner.borrow_mut().write(&self.wbuf[byte_index..self.wpos])?;
-            byte_index = cmp::min(byte_index + nwrite, self.wpos);
+        while byte_index < self.pos {
+            let nwrite = self.channel.write(&self.buf[byte_index..self.pos])?;
+            byte_index = cmp::min(byte_index + nwrite, self.pos);
         }
 
-        self.wpos = 0;
-        self.inner.borrow_mut().flush()
+        self.pos = 0;
+        self.channel.flush()
     }
 }
 
-/// Factory for creating instances of `TFramedTransport`.
+/// Factory for creating instances of `TFramedWriteTransport`.
 #[derive(Default)]
-pub struct TFramedTransportFactory;
+pub struct TFramedWriteTransportFactory;
 
-impl TFramedTransportFactory {
-    // Create a `TFramedTransportFactory`.
-    pub fn new() -> TFramedTransportFactory {
-        TFramedTransportFactory {}
+impl TFramedWriteTransportFactory {
+    pub fn new() -> TFramedWriteTransportFactory {
+        TFramedWriteTransportFactory {}
     }
 }
 
-impl TTransportFactory for TFramedTransportFactory {
-    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
-        Box::new(TFramedTransport::new(inner)) as Box<TTransport>
+impl TWriteTransportFactory for TFramedWriteTransportFactory {
+    /// Create a `TFramedWriteTransport`.
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+        Box::new(TFramedWriteTransport::new(channel))
     }
 }
 
@@ -183,5 +260,5 @@
     //    use std::io::{Read, Write};
     //
     //    use super::*;
-    //    use ::transport::mem::TBufferTransport;
+    //    use ::transport::mem::TBufferChannel;
 }