THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/lib/rs/src/transport/buffered.rs b/lib/rs/src/transport/buffered.rs
index 3f240d8..b588ec1 100644
--- a/lib/rs/src/transport/buffered.rs
+++ b/lib/rs/src/transport/buffered.rs
@@ -15,104 +15,94 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cell::RefCell;
 use std::cmp;
 use std::io;
 use std::io::{Read, Write};
-use std::rc::Rc;
 
-use super::{TTransport, TTransportFactory};
+use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
 
 /// Default capacity of the read buffer in bytes.
-const DEFAULT_RBUFFER_CAPACITY: usize = 4096;
+const READ_CAPACITY: usize = 4096;
 
 /// Default capacity of the write buffer in bytes..
-const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+const WRITE_CAPACITY: usize = 4096;
 
-/// Transport that communicates with endpoints using a byte stream.
+/// Transport that reads messages via an internal buffer.
 ///
-/// A `TBufferedTransport` maintains a fixed-size internal write buffer. All
-/// writes are made to this buffer and are sent to the wrapped transport only
-/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
-/// count of the buffered bytes is written, followed by the bytes themselves.
-///
-/// A `TBufferedTransport` also maintains a fixed-size internal read buffer.
-/// On a call to `TTransport::read(...)` one full message - both fixed-length
-/// header and bytes - is read from the wrapped transport and buffered.
+/// A `TBufferedReadTransport` maintains a fixed-size internal read buffer.
+/// On a call to `TBufferedReadTransport::read(...)` one full message - both
+/// fixed-length header and bytes - is read from the wrapped channel and buffered.
 /// Subsequent read calls are serviced from the internal buffer until it is
 /// exhausted, at which point the next full message is read from the wrapped
-/// transport.
+/// channel.
 ///
 /// # Examples
 ///
-/// Create and use a `TBufferedTransport`.
+/// Create and use a `TBufferedReadTransport`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use std::io::{Read, Write};
-/// use thrift::transport::{TBufferedTransport, TTcpTransport, TTransport};
+/// use std::io::Read;
+/// use thrift::transport::{TBufferedReadTransport, TTcpChannel};
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
-/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
-/// let mut t = TBufferedTransport::new(t);
+/// let mut t = TBufferedReadTransport::new(c);
 ///
-/// // read
 /// t.read(&mut vec![0u8; 1]).unwrap();
-///
-/// // write
-/// t.write(&[0x00]).unwrap();
-/// t.flush().unwrap();
 /// ```
-pub struct TBufferedTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    rcap: usize,
-    wbuf: Vec<u8>,
-    inner: Rc<RefCell<Box<TTransport>>>,
+#[derive(Debug)]
+pub struct TBufferedReadTransport<C>
+where
+    C: Read,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+    chan: C,
 }
 
-impl TBufferedTransport {
+impl<C> TBufferedReadTransport<C>
+where
+    C: Read,
+{
     /// Create a `TBufferedTransport` with default-sized internal read and
-    /// write buffers that wraps an `inner` `TTransport`.
-    pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TBufferedTransport {
-        TBufferedTransport::with_capacity(DEFAULT_RBUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+    /// write buffers that wraps the given `TIoChannel`.
+    pub fn new(channel: C) -> TBufferedReadTransport<C> {
+        TBufferedReadTransport::with_capacity(READ_CAPACITY, channel)
     }
 
     /// Create a `TBufferedTransport` with an internal read buffer of size
-    /// `read_buffer_capacity` and an internal write buffer of size
-    /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize,
-                         inner: Rc<RefCell<Box<TTransport>>>)
-                         -> TBufferedTransport {
-        TBufferedTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            rpos: 0,
-            rcap: 0,
-            wbuf: Vec::with_capacity(write_buffer_capacity),
-            inner: inner,
+    /// `read_capacity` bytes that wraps the given channel. This type has no
+    /// write buffer; `with_capacity` takes only the read buffer size.
+    pub fn with_capacity(read_capacity: usize, channel: C) -> TBufferedReadTransport<C> {
+        TBufferedReadTransport {
+            buf: vec![0; read_capacity].into_boxed_slice(),
+            pos: 0,
+            cap: 0,
+            chan: channel,
         }
     }
 
     fn get_bytes(&mut self) -> io::Result<&[u8]> {
-        if self.rcap - self.rpos == 0 {
-            self.rpos = 0;
-            self.rcap = self.inner.borrow_mut().read(&mut self.rbuf)?;
+        if self.cap - self.pos == 0 {
+            self.pos = 0;
+            self.cap = self.chan.read(&mut self.buf)?;
         }
 
-        Ok(&self.rbuf[self.rpos..self.rcap])
+        Ok(&self.buf[self.pos..self.cap])
     }
 
     fn consume(&mut self, consumed: usize) {
         // TODO: was a bug here += <-- test somehow
-        self.rpos = cmp::min(self.rcap, self.rpos + consumed);
+        self.pos = cmp::min(self.cap, self.pos + consumed);
     }
 }
 
-impl Read for TBufferedTransport {
+impl<C> Read for TBufferedReadTransport<C>
+where
+    C: Read,
+{
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
         let mut bytes_read = 0;
 
@@ -137,65 +127,127 @@
     }
 }
 
-impl Write for TBufferedTransport {
+/// Factory for creating instances of `TBufferedReadTransport`.
+#[derive(Default)]
+pub struct TBufferedReadTransportFactory;
+
+impl TBufferedReadTransportFactory {
+    pub fn new() -> TBufferedReadTransportFactory {
+        TBufferedReadTransportFactory {}
+    }
+}
+
+impl TReadTransportFactory for TBufferedReadTransportFactory {
+    /// Create a `TBufferedReadTransport`.
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+        Box::new(TBufferedReadTransport::new(channel))
+    }
+}
+
+/// Transport that writes messages via an internal buffer.
+///
+/// A `TBufferedWriteTransport` maintains a fixed-size internal write buffer.
+/// All writes are made to this buffer and are sent to the wrapped channel only
+/// when `TBufferedWriteTransport::flush()` is called. On a flush the buffered
+/// bytes are written to the channel unmodified (no length header is added),
+/// the channel is flushed, and the internal buffer is cleared.
+///
+/// # Examples
+///
+/// Create and use a `TBufferedWriteTransport`.
+///
+/// ```no_run
+/// use std::io::Write;
+/// use thrift::transport::{TBufferedWriteTransport, TTcpChannel};
+///
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
+///
+/// let mut t = TBufferedWriteTransport::new(c);
+///
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+#[derive(Debug)]
+pub struct TBufferedWriteTransport<C>
+where
+    C: Write,
+{
+    buf: Vec<u8>,
+    channel: C,
+}
+
+impl<C> TBufferedWriteTransport<C>
+where
+    C: Write,
+{
+    /// Create a `TBufferedWriteTransport` with a default-sized internal write
+    /// buffer that wraps the given channel.
+    pub fn new(channel: C) -> TBufferedWriteTransport<C> {
+        TBufferedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
+    }
+
+    /// Create a `TBufferedWriteTransport` whose internal write buffer is
+    /// allocated with capacity `write_capacity` and that wraps the given channel.
+    /// `write` accepts only as many bytes as fit in the remaining capacity.
+    pub fn with_capacity(write_capacity: usize, channel: C) -> TBufferedWriteTransport<C> {
+        TBufferedWriteTransport {
+            buf: Vec::with_capacity(write_capacity),
+            channel: channel,
+        }
+    }
+}
+
+impl<C> Write for TBufferedWriteTransport<C>
+where
+    C: Write,
+{
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let avail_bytes = cmp::min(buf.len(), self.wbuf.capacity() - self.wbuf.len());
-        self.wbuf.extend_from_slice(&buf[..avail_bytes]);
-        assert!(self.wbuf.len() <= self.wbuf.capacity(),
-                "copy overflowed buffer");
+        let avail_bytes = cmp::min(buf.len(), self.buf.capacity() - self.buf.len());
+        self.buf.extend_from_slice(&buf[..avail_bytes]);
+        assert!(
+            self.buf.len() <= self.buf.capacity(),
+            "copy overflowed buffer"
+        );
         Ok(avail_bytes)
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        self.inner.borrow_mut().write_all(&self.wbuf)?;
-        self.inner.borrow_mut().flush()?;
-        self.wbuf.clear();
+        self.channel.write_all(&self.buf)?;
+        self.channel.flush()?;
+        self.buf.clear();
         Ok(())
     }
 }
 
-/// Factory for creating instances of `TBufferedTransport`
+/// Factory for creating instances of `TBufferedWriteTransport`.
 #[derive(Default)]
-pub struct TBufferedTransportFactory;
+pub struct TBufferedWriteTransportFactory;
 
-impl TBufferedTransportFactory {
-    /// Create a `TBufferedTransportFactory`.
-    pub fn new() -> TBufferedTransportFactory {
-        TBufferedTransportFactory {}
+impl TBufferedWriteTransportFactory {
+    pub fn new() -> TBufferedWriteTransportFactory {
+        TBufferedWriteTransportFactory {}
     }
 }
 
-impl TTransportFactory for TBufferedTransportFactory {
-    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
-        Box::new(TBufferedTransport::new(inner)) as Box<TTransport>
+impl TWriteTransportFactory for TBufferedWriteTransportFactory {
+    /// Create a `TBufferedWriteTransport`.
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+        Box::new(TBufferedWriteTransport::new(channel))
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::cell::RefCell;
     use std::io::{Read, Write};
-    use std::rc::Rc;
 
     use super::*;
-    use ::transport::{TPassThruTransport, TTransport};
-    use ::transport::mem::TBufferTransport;
-
-    macro_rules! new_transports {
-        ($wbc:expr, $rbc:expr) => (
-            {
-                let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity($wbc, $rbc))));
-                let thru: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
-                let thru = Rc::new(RefCell::new(thru));
-                (mem, thru)
-            }
-        );
-    }
+    use transport::TBufferChannel;
 
     #[test]
     fn must_return_zero_if_read_buffer_is_empty() {
-        let (_, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(10, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(10, mem);
 
         let mut b = vec![0; 10];
         let read_result = t.read(&mut b);
@@ -205,8 +257,8 @@
 
     #[test]
     fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() {
-        let (_, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(10, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(10, mem);
 
         let read_result = t.read(&mut []);
 
@@ -215,10 +267,10 @@
 
     #[test]
     fn must_return_zero_if_nothing_more_can_be_read() {
-        let (mem, thru) = new_transports!(4, 0);
-        let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+        let mem = TBufferChannel::with_capacity(4, 0);
+        let mut t = TBufferedReadTransport::with_capacity(4, mem);
 
-        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+        t.chan.set_readable_bytes(&[0, 1, 2, 3]);
 
         // read buffer is exactly the same size as bytes available
         let mut buf = vec![0u8; 4];
@@ -239,10 +291,10 @@
 
     #[test]
     fn must_fill_user_buffer_with_only_as_many_bytes_as_available() {
-        let (mem, thru) = new_transports!(4, 0);
-        let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+        let mem = TBufferChannel::with_capacity(4, 0);
+        let mut t = TBufferedReadTransport::with_capacity(4, mem);
 
-        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+        t.chan.set_readable_bytes(&[0, 1, 2, 3]);
 
         // read buffer is much larger than the bytes available
         let mut buf = vec![0u8; 8];
@@ -268,15 +320,16 @@
 
         // we have a much smaller buffer than the
         // underlying transport has bytes available
-        let (mem, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(2, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(2, mem);
 
         // fill the underlying transport's byte buffer
         let mut readable_bytes = [0u8; 10];
         for i in 0..10 {
             readable_bytes[i] = i as u8;
         }
-        mem.borrow_mut().set_readable_bytes(&readable_bytes);
+
+        t.chan.set_readable_bytes(&readable_bytes);
 
         // we ask to read into a buffer that's much larger
         // than the one the buffered transport has; as a result
@@ -312,8 +365,8 @@
 
     #[test]
     fn must_return_zero_if_nothing_can_be_written() {
-        let (_, thru) = new_transports!(0, 0);
-        let mut t = TBufferedTransport::with_capacity(0, 0, thru);
+        let mem = TBufferChannel::with_capacity(0, 0);
+        let mut t = TBufferedWriteTransport::with_capacity(0, mem);
 
         let b = vec![0; 10];
         let r = t.write(&b);
@@ -323,19 +376,20 @@
 
     #[test]
     fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
-        let (mem, thru) = new_transports!(0, 10);
-        let mut t = TBufferedTransport::with_capacity(0, 10, thru);
+        let mem = TBufferChannel::with_capacity(0, 10);
+        let mut t = TBufferedWriteTransport::with_capacity(10, mem);
 
         let r = t.write(&[]);
+        let expected: [u8; 0] = [];
 
         assert_eq!(r.unwrap(), 0);
-        assert_eq!(mem.borrow_mut().write_buffer_as_ref(), &[]);
+        assert_eq_transport_written_bytes!(t, expected);
     }
 
     #[test]
     fn must_return_zero_if_write_buffer_full() {
-        let (_, thru) = new_transports!(0, 0);
-        let mut t = TBufferedTransport::with_capacity(0, 4, thru);
+        let mem = TBufferChannel::with_capacity(0, 0);
+        let mut t = TBufferedWriteTransport::with_capacity(4, mem);
 
         let b = [0x00, 0x01, 0x02, 0x03];
 
@@ -350,26 +404,22 @@
 
     #[test]
     fn must_only_write_to_inner_transport_on_flush() {
-        let (mem, thru) = new_transports!(10, 10);
-        let mut t = TBufferedTransport::new(thru);
+        let mem = TBufferChannel::with_capacity(10, 10);
+        let mut t = TBufferedWriteTransport::new(mem);
 
         let b: [u8; 5] = [0, 1, 2, 3, 4];
         assert_eq!(t.write(&b).unwrap(), 5);
-        assert_eq!(mem.borrow_mut().write_buffer_as_ref().len(), 0);
+        assert_eq_transport_num_written_bytes!(t, 0);
 
         assert!(t.flush().is_ok());
 
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
     }
 
     #[test]
     fn must_write_successfully_after_flush() {
-        let (mem, thru) = new_transports!(0, 5);
-        let mut t = TBufferedTransport::with_capacity(0, 5, thru);
+        let mem = TBufferChannel::with_capacity(0, 5);
+        let mut t = TBufferedWriteTransport::with_capacity(5, mem);
 
         // write and flush
         let b: [u8; 5] = [0, 1, 2, 3, 4];
@@ -377,24 +427,16 @@
         assert!(t.flush().is_ok());
 
         // check the flushed bytes
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
 
         // reset our underlying transport
-        mem.borrow_mut().empty_write_buffer();
+        t.channel.empty_write_buffer();
 
         // write and flush again
         assert_eq!(t.write(&b).unwrap(), 5);
         assert!(t.flush().is_ok());
 
         // check the flushed bytes
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
     }
 }
diff --git a/lib/rs/src/transport/framed.rs b/lib/rs/src/transport/framed.rs
index 75c12f4..d78d2f7 100644
--- a/lib/rs/src/transport/framed.rs
+++ b/lib/rs/src/transport/framed.rs
@@ -16,165 +16,242 @@
 // under the License.
 
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
-use std::cell::RefCell;
 use std::cmp;
 use std::io;
 use std::io::{ErrorKind, Read, Write};
-use std::rc::Rc;
 
-use super::{TTransport, TTransportFactory};
+use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
 
 /// Default capacity of the read buffer in bytes.
-const WRITE_BUFFER_CAPACITY: usize = 4096;
+const READ_CAPACITY: usize = 4096;
 
-/// Default capacity of the write buffer in bytes..
-const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+/// Default capacity of the write buffer in bytes.
+const WRITE_CAPACITY: usize = 4096;
 
-/// Transport that communicates with endpoints using framed messages.
+/// Transport that reads framed messages.
 ///
-/// A `TFramedTransport` maintains a fixed-size internal write buffer. All
-/// writes are made to this buffer and are sent to the wrapped transport only
-/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
-/// count of the buffered bytes is written, followed by the bytes themselves.
-///
-/// A `TFramedTransport` also maintains a fixed-size internal read buffer.
-/// On a call to `TTransport::read(...)` one full message - both fixed-length
-/// header and bytes - is read from the wrapped transport and buffered.
-/// Subsequent read calls are serviced from the internal buffer until it is
-/// exhausted, at which point the next full message is read from the wrapped
-/// transport.
+/// A `TFramedReadTransport` maintains a fixed-size internal read buffer.
+/// On a call to `TFramedReadTransport::read(...)` one full message - both
+/// fixed-length header and bytes - is read from the wrapped channel and
+/// buffered. Subsequent read calls are serviced from the internal buffer
+/// until it is exhausted, at which point the next full message is read
+/// from the wrapped channel.
 ///
 /// # Examples
 ///
-/// Create and use a `TFramedTransport`.
+/// Create and use a `TFramedReadTransport`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use std::io::{Read, Write};
-/// use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+/// use std::io::Read;
+/// use thrift::transport::{TFramedReadTransport, TTcpChannel};
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
-/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
-/// let mut t = TFramedTransport::new(t);
+/// let mut t = TFramedReadTransport::new(c);
 ///
-/// // read
 /// t.read(&mut vec![0u8; 1]).unwrap();
-///
-/// // write
-/// t.write(&[0x00]).unwrap();
-/// t.flush().unwrap();
 /// ```
-pub struct TFramedTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    rcap: usize,
-    wbuf: Box<[u8]>,
-    wpos: usize,
-    inner: Rc<RefCell<Box<TTransport>>>,
+#[derive(Debug)]
+pub struct TFramedReadTransport<C>
+where
+    C: Read,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+    chan: C,
 }
 
-impl TFramedTransport {
+impl<C> TFramedReadTransport<C>
+where
+    C: Read,
+{
     /// Create a `TFramedTransport` with default-sized internal read and
-    /// write buffers that wraps an `inner` `TTransport`.
-    pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TFramedTransport {
-        TFramedTransport::with_capacity(WRITE_BUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+    /// write buffers that wraps the given `TIoChannel`.
+    pub fn new(channel: C) -> TFramedReadTransport<C> {
+        TFramedReadTransport::with_capacity(READ_CAPACITY, channel)
     }
 
     /// Create a `TFramedTransport` with an internal read buffer of size
-    /// `read_buffer_capacity` and an internal write buffer of size
-    /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize,
-                         inner: Rc<RefCell<Box<TTransport>>>)
-                         -> TFramedTransport {
-        TFramedTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            rpos: 0,
-            rcap: 0,
-            wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
-            wpos: 0,
-            inner: inner,
+    /// `read_capacity` bytes that wraps the given channel. A frame larger than
+    /// `read_capacity` is rejected with an error when it is read.
+    pub fn with_capacity(read_capacity: usize, channel: C) -> TFramedReadTransport<C> {
+        TFramedReadTransport {
+            buf: vec![0; read_capacity].into_boxed_slice(),
+            pos: 0,
+            cap: 0,
+            chan: channel,
         }
     }
 }
 
-impl Read for TFramedTransport {
+impl<C> Read for TFramedReadTransport<C>
+where
+    C: Read,
+{
     fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
-        if self.rcap - self.rpos == 0 {
-            let message_size = self.inner.borrow_mut().read_i32::<BigEndian>()? as usize;
-            if message_size > self.rbuf.len() {
-                return Err(io::Error::new(ErrorKind::Other,
-                                          format!("bytes to be read ({}) exceeds buffer \
+        if self.cap - self.pos == 0 {
+            let message_size = self.chan.read_i32::<BigEndian>()? as usize;
+            if message_size > self.buf.len() {
+                return Err(
+                    io::Error::new(
+                        ErrorKind::Other,
+                        format!(
+                            "bytes to be read ({}) exceeds buffer \
                                                    capacity ({})",
-                                                  message_size,
-                                                  self.rbuf.len())));
+                            message_size,
+                            self.buf.len()
+                        ),
+                    ),
+                );
             }
-            self.inner.borrow_mut().read_exact(&mut self.rbuf[..message_size])?;
-            self.rpos = 0;
-            self.rcap = message_size as usize;
+            self.chan.read_exact(&mut self.buf[..message_size])?;
+            self.pos = 0;
+            self.cap = message_size as usize;
         }
 
-        let nread = cmp::min(b.len(), self.rcap - self.rpos);
-        b[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
-        self.rpos += nread;
+        let nread = cmp::min(b.len(), self.cap - self.pos);
+        b[..nread].clone_from_slice(&self.buf[self.pos..self.pos + nread]);
+        self.pos += nread;
 
         Ok(nread)
     }
 }
 
-impl Write for TFramedTransport {
+/// Factory for creating instances of `TFramedReadTransport`.
+#[derive(Default)]
+pub struct TFramedReadTransportFactory;
+
+impl TFramedReadTransportFactory {
+    pub fn new() -> TFramedReadTransportFactory {
+        TFramedReadTransportFactory {}
+    }
+}
+
+impl TReadTransportFactory for TFramedReadTransportFactory {
+    /// Create a `TFramedReadTransport`.
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+        Box::new(TFramedReadTransport::new(channel))
+    }
+}
+
+/// Transport that writes framed messages.
+///
+/// A `TFramedWriteTransport` maintains a fixed-size internal write buffer. All
+/// writes are made to this buffer and are sent to the wrapped channel only
+/// when `TFramedWriteTransport::flush()` is called. On a flush a fixed-length
+/// header with a count of the buffered bytes is written, followed by the bytes
+/// themselves.
+///
+/// # Examples
+///
+/// Create and use a `TFramedWriteTransport`.
+///
+/// ```no_run
+/// use std::io::Write;
+/// use thrift::transport::{TFramedWriteTransport, TTcpChannel};
+///
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
+///
+/// let mut t = TFramedWriteTransport::new(c);
+///
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+#[derive(Debug)]
+pub struct TFramedWriteTransport<C>
+where
+    C: Write,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    channel: C,
+}
+
+impl<C> TFramedWriteTransport<C>
+where
+    C: Write,
+{
+    /// Create a `TFramedWriteTransport` with a default-sized internal write
+    /// buffer that wraps the given channel.
+    pub fn new(channel: C) -> TFramedWriteTransport<C> {
+        TFramedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
+    }
+
+    /// Create a `TFramedWriteTransport` with an internal write buffer of size
+    /// `write_capacity` that wraps the given channel. A `write` that does not
+    /// fit in the remaining buffer space fails with an error.
+    pub fn with_capacity(write_capacity: usize, channel: C) -> TFramedWriteTransport<C> {
+        TFramedWriteTransport {
+            buf: vec![0; write_capacity].into_boxed_slice(),
+            pos: 0,
+            channel: channel,
+        }
+    }
+}
+
+impl<C> Write for TFramedWriteTransport<C>
+where
+    C: Write,
+{
     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
-        if b.len() > (self.wbuf.len() - self.wpos) {
-            return Err(io::Error::new(ErrorKind::Other,
-                                      format!("bytes to be written ({}) exceeds buffer \
+        if b.len() > (self.buf.len() - self.pos) {
+            return Err(
+                io::Error::new(
+                    ErrorKind::Other,
+                    format!(
+                        "bytes to be written ({}) exceeds buffer \
                                                capacity ({})",
-                                              b.len(),
-                                              self.wbuf.len() - self.wpos)));
+                        b.len(),
+                        self.buf.len() - self.pos
+                    ),
+                ),
+            );
         }
 
         let nwrite = b.len(); // always less than available write buffer capacity
-        self.wbuf[self.wpos..(self.wpos + nwrite)].clone_from_slice(b);
-        self.wpos += nwrite;
+        self.buf[self.pos..(self.pos + nwrite)].clone_from_slice(b);
+        self.pos += nwrite;
         Ok(nwrite)
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        let message_size = self.wpos;
+        let message_size = self.pos;
 
         if let 0 = message_size {
             return Ok(());
         } else {
-            self.inner.borrow_mut().write_i32::<BigEndian>(message_size as i32)?;
+            self.channel
+                .write_i32::<BigEndian>(message_size as i32)?;
         }
 
         let mut byte_index = 0;
-        while byte_index < self.wpos {
-            let nwrite = self.inner.borrow_mut().write(&self.wbuf[byte_index..self.wpos])?;
-            byte_index = cmp::min(byte_index + nwrite, self.wpos);
+        while byte_index < self.pos {
+            let nwrite = self.channel.write(&self.buf[byte_index..self.pos])?;
+            byte_index = cmp::min(byte_index + nwrite, self.pos);
         }
 
-        self.wpos = 0;
-        self.inner.borrow_mut().flush()
+        self.pos = 0;
+        self.channel.flush()
     }
 }
 
-/// Factory for creating instances of `TFramedTransport`.
+/// Factory for creating instances of `TFramedWriteTransport`.
 #[derive(Default)]
-pub struct TFramedTransportFactory;
+pub struct TFramedWriteTransportFactory;
 
-impl TFramedTransportFactory {
-    // Create a `TFramedTransportFactory`.
-    pub fn new() -> TFramedTransportFactory {
-        TFramedTransportFactory {}
+impl TFramedWriteTransportFactory {
+    pub fn new() -> TFramedWriteTransportFactory {
+        TFramedWriteTransportFactory {}
     }
 }
 
-impl TTransportFactory for TFramedTransportFactory {
-    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
-        Box::new(TFramedTransport::new(inner)) as Box<TTransport>
+impl TWriteTransportFactory for TFramedWriteTransportFactory {
+    /// Create a `TFramedWriteTransport`.
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+        Box::new(TFramedWriteTransport::new(channel))
     }
 }
 
@@ -183,5 +260,5 @@
     //    use std::io::{Read, Write};
     //
     //    use super::*;
-    //    use ::transport::mem::TBufferTransport;
+    //    use ::transport::mem::TBufferChannel;
 }
diff --git a/lib/rs/src/transport/mem.rs b/lib/rs/src/transport/mem.rs
index 97ec503..86ac6bb 100644
--- a/lib/rs/src/transport/mem.rs
+++ b/lib/rs/src/transport/mem.rs
@@ -17,9 +17,11 @@
 
 use std::cmp;
 use std::io;
+use std::sync::{Arc, Mutex};
 
-/// Simple transport that contains both a fixed-length internal read buffer and
-/// a fixed-length internal write buffer.
+use super::{ReadHalf, TIoChannel, WriteHalf};
+
+/// In-memory read and write channel with fixed-size read and write buffers.
 ///
 /// On a `write` bytes are written to the internal write buffer. Writes are no
 /// longer accepted once this buffer is full. Callers must `empty_write_buffer()`
@@ -29,37 +31,61 @@
 /// `set_readable_bytes(...)`. Callers can then read until the buffer is
 /// depleted. No further reads are accepted until the internal read buffer is
 /// replenished again.
-pub struct TBufferTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    ridx: usize,
-    rcap: usize,
-    wbuf: Box<[u8]>,
-    wpos: usize,
-    wcap: usize,
+#[derive(Debug)]
+pub struct TBufferChannel {
+    read: Arc<Mutex<ReadData>>,
+    write: Arc<Mutex<WriteData>>,
 }
 
-impl TBufferTransport {
-    /// Constructs a new, empty `TBufferTransport` with the given
+#[derive(Debug)]
+struct ReadData {
+    buf: Box<[u8]>,
+    pos: usize,
+    idx: usize,
+    cap: usize,
+}
+
+#[derive(Debug)]
+struct WriteData {
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+}
+
+impl TBufferChannel {
+    /// Constructs a new, empty `TBufferChannel` with the given
     /// read buffer capacity and write buffer capacity.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize)
-                         -> TBufferTransport {
-        TBufferTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            ridx: 0,
-            rpos: 0,
-            rcap: read_buffer_capacity,
-            wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
-            wpos: 0,
-            wcap: write_buffer_capacity,
+    pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
+        TBufferChannel {
+            read: Arc::new(
+                Mutex::new(
+                    ReadData {
+                        buf: vec![0; read_capacity].into_boxed_slice(),
+                        idx: 0,
+                        pos: 0,
+                        cap: read_capacity,
+                    },
+                ),
+            ),
+            write: Arc::new(
+                Mutex::new(
+                    WriteData {
+                        buf: vec![0; write_capacity].into_boxed_slice(),
+                        pos: 0,
+                        cap: write_capacity,
+                    },
+                ),
+            ),
         }
     }
 
-    /// Return a slice containing the bytes held by the internal read buffer.
-    /// Returns an empty slice if no readable bytes are present.
-    pub fn read_buffer(&self) -> &[u8] {
-        &self.rbuf[..self.ridx]
+    /// Return a copy of the bytes held by the internal read buffer.
+    /// Returns an empty vector if no readable bytes are present.
+    pub fn read_bytes(&self) -> Vec<u8> {
+        let rdata = self.read.as_ref().lock().unwrap();
+        let mut buf = vec![0u8; rdata.idx];
+        buf.copy_from_slice(&rdata.buf[..rdata.idx]);
+        buf
     }
 
     // FIXME: do I really need this API call?
@@ -68,8 +94,9 @@
     ///
     /// Subsequent calls to `read` will return nothing.
     pub fn empty_read_buffer(&mut self) {
-        self.rpos = 0;
-        self.ridx = 0;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        rdata.pos = 0;
+        rdata.idx = 0;
     }
 
     /// Copy bytes from the source buffer `buf` into the internal read buffer,
@@ -77,37 +104,36 @@
     /// which is `min(buf.len(), internal_read_buf.len())`.
     pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
         self.empty_read_buffer();
-        let max_bytes = cmp::min(self.rcap, buf.len());
-        self.rbuf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
-        self.ridx = max_bytes;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        let max_bytes = cmp::min(rdata.cap, buf.len());
+        rdata.buf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
+        rdata.idx = max_bytes;
         max_bytes
     }
 
-    /// Return a slice containing the bytes held by the internal write buffer.
-    /// Returns an empty slice if no bytes were written.
-    pub fn write_buffer_as_ref(&self) -> &[u8] {
-        &self.wbuf[..self.wpos]
-    }
-
-    /// Return a vector with a copy of the bytes held by the internal write buffer.
+    /// Return a copy of the bytes held by the internal write buffer.
     /// Returns an empty vector if no bytes were written.
-    pub fn write_buffer_to_vec(&self) -> Vec<u8> {
-        let mut buf = vec![0u8; self.wpos];
-        buf.copy_from_slice(&self.wbuf[..self.wpos]);
+    pub fn write_bytes(&self) -> Vec<u8> {
+        let wdata = self.write.as_ref().lock().unwrap();
+        let mut buf = vec![0u8; wdata.pos];
+        buf.copy_from_slice(&wdata.buf[..wdata.pos]);
         buf
     }
 
     /// Resets the internal write buffer, making it seem like no bytes were
-    /// written. Calling `write_buffer` after this returns an empty slice.
+    /// written. Calling `write_bytes` after this returns an empty vector.
     pub fn empty_write_buffer(&mut self) {
-        self.wpos = 0;
+        let mut wdata = self.write.as_ref().lock().unwrap();
+        wdata.pos = 0;
     }
 
     /// Overwrites the contents of the read buffer with the contents of the
     /// write buffer. The write buffer is emptied after this operation.
     pub fn copy_write_buffer_to_read_buffer(&mut self) {
+        // FIXME: redo this entire method
         let buf = {
-            let b = self.write_buffer_as_ref();
+            let wdata = self.write.as_ref().lock().unwrap();
+            let b = &wdata.buf[..wdata.pos];
             let mut b_ret = vec![0; b.len()];
             b_ret.copy_from_slice(b);
             b_ret
@@ -120,20 +146,45 @@
     }
 }
 
-impl io::Read for TBufferTransport {
+impl TIoChannel for TBufferChannel {
+    fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
+    where
+        Self: Sized,
+    {
+        Ok(
+            (ReadHalf {
+                 handle: TBufferChannel {
+                     read: self.read.clone(),
+                     write: self.write.clone(),
+                 },
+             },
+             WriteHalf {
+                 handle: TBufferChannel {
+                     read: self.read.clone(),
+                     write: self.write.clone(),
+                 },
+             }),
+        )
+    }
+}
+
+impl io::Read for TBufferChannel {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let nread = cmp::min(buf.len(), self.ridx - self.rpos);
-        buf[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
-        self.rpos += nread;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
+        buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
+        rdata.pos += nread;
         Ok(nread)
     }
 }
 
-impl io::Write for TBufferTransport {
+impl io::Write for TBufferChannel {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let nwrite = cmp::min(buf.len(), self.wcap - self.wpos);
-        self.wbuf[self.wpos..self.wpos + nwrite].clone_from_slice(&buf[..nwrite]);
-        self.wpos += nwrite;
+        let mut wdata = self.write.as_ref().lock().unwrap();
+        let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
+        let (start, end) = (wdata.pos, wdata.pos + nwrite);
+        wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
+        wdata.pos += nwrite;
         Ok(nwrite)
     }
 
@@ -146,68 +197,68 @@
 mod tests {
     use std::io::{Read, Write};
 
-    use super::TBufferTransport;
+    use super::TBufferChannel;
 
     #[test]
     fn must_empty_write_buffer() {
-        let mut t = TBufferTransport::with_capacity(0, 1);
+        let mut t = TBufferChannel::with_capacity(0, 1);
 
         let bytes_to_write: [u8; 1] = [0x01];
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 1);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
 
         t.empty_write_buffer();
-        assert_eq!(t.write_buffer_as_ref().len(), 0);
+        assert_eq!(t.write_bytes().len(), 0);
     }
 
     #[test]
     fn must_accept_writes_after_buffer_emptied() {
-        let mut t = TBufferTransport::with_capacity(0, 2);
+        let mut t = TBufferChannel::with_capacity(0, 2);
 
         let bytes_to_write: [u8; 2] = [0x01, 0x02];
 
         // first write (all bytes written)
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 2);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
 
         // try write again (nothing should be written)
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 0);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write); // still the same as before
+        assert_eq!(&t.write_bytes(), &bytes_to_write); // still the same as before
 
         // now reset the buffer
         t.empty_write_buffer();
-        assert_eq!(t.write_buffer_as_ref().len(), 0);
+        assert_eq!(t.write_bytes().len(), 0);
 
         // now try write again - the write should succeed
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 2);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
     }
 
     #[test]
     fn must_accept_multiple_writes_until_buffer_is_full() {
-        let mut t = TBufferTransport::with_capacity(0, 10);
+        let mut t = TBufferChannel::with_capacity(0, 10);
 
         // first write (all bytes written)
         let bytes_to_write_0: [u8; 2] = [0x01, 0x41];
         let write_0_result = t.write(&bytes_to_write_0);
         assert_eq!(write_0_result.unwrap(), 2);
-        assert_eq!(t.write_buffer_as_ref(), &bytes_to_write_0);
+        assert_eq!(t.write_bytes(), &bytes_to_write_0);
 
         // second write (all bytes written, starting at index 2)
         let bytes_to_write_1: [u8; 7] = [0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
         let write_1_result = t.write(&bytes_to_write_1);
         assert_eq!(write_1_result.unwrap(), 7);
-        assert_eq!(&t.write_buffer_as_ref()[2..], &bytes_to_write_1);
+        assert_eq!(&t.write_bytes()[2..], &bytes_to_write_1);
 
         // third write (only 1 byte written - that's all we have space for)
         let bytes_to_write_2: [u8; 3] = [0xBF, 0xDA, 0x98];
         let write_2_result = t.write(&bytes_to_write_2);
         assert_eq!(write_2_result.unwrap(), 1);
-        assert_eq!(&t.write_buffer_as_ref()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?!
+        assert_eq!(&t.write_bytes()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?!
 
         // fourth write (no writes are accepted)
         let bytes_to_write_3: [u8; 3] = [0xBF, 0xAA, 0xFD];
@@ -219,50 +270,50 @@
         expected.extend_from_slice(&bytes_to_write_0);
         expected.extend_from_slice(&bytes_to_write_1);
         expected.extend_from_slice(&bytes_to_write_2[0..1]);
-        assert_eq!(t.write_buffer_as_ref(), &expected[..]);
+        assert_eq!(t.write_bytes(), &expected[..]);
     }
 
     #[test]
     fn must_empty_read_buffer() {
-        let mut t = TBufferTransport::with_capacity(1, 0);
+        let mut t = TBufferChannel::with_capacity(1, 0);
 
         let bytes_to_read: [u8; 1] = [0x01];
         let result = t.set_readable_bytes(&bytes_to_read);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read);
+        assert_eq!(t.read_bytes(), &bytes_to_read);
 
         t.empty_read_buffer();
-        assert_eq!(t.read_buffer().len(), 0);
+        assert_eq!(t.read_bytes().len(), 0);
     }
 
     #[test]
     fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
-        let mut t = TBufferTransport::with_capacity(1, 0);
+        let mut t = TBufferChannel::with_capacity(1, 0);
 
         let bytes_to_read_0: [u8; 1] = [0x01];
         let result = t.set_readable_bytes(&bytes_to_read_0);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read_0);
+        assert_eq!(t.read_bytes(), &bytes_to_read_0);
 
         t.empty_read_buffer();
-        assert_eq!(t.read_buffer().len(), 0);
+        assert_eq!(t.read_bytes().len(), 0);
 
         let bytes_to_read_1: [u8; 1] = [0x02];
         let result = t.set_readable_bytes(&bytes_to_read_1);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read_1);
+        assert_eq!(t.read_bytes(), &bytes_to_read_1);
     }
 
     #[test]
     fn must_accept_multiple_reads_until_all_bytes_read() {
-        let mut t = TBufferTransport::with_capacity(10, 0);
+        let mut t = TBufferChannel::with_capacity(10, 0);
 
         let readable_bytes: [u8; 10] = [0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];
 
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes);
         assert_eq!(result, 10);
-        assert_eq!(&t.read_buffer(), &readable_bytes);
+        assert_eq!(t.read_bytes(), &readable_bytes);
 
         // first read
         let mut read_buf_0 = vec![0; 5];
@@ -300,21 +351,21 @@
 
     #[test]
     fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
-        let mut t = TBufferTransport::with_capacity(3, 0);
+        let mut t = TBufferChannel::with_capacity(3, 0);
 
         let readable_bytes_0: [u8; 3] = [0x02, 0xAB, 0x33];
 
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes_0);
         assert_eq!(result, 3);
-        assert_eq!(&t.read_buffer(), &readable_bytes_0);
+        assert_eq!(t.read_bytes(), &readable_bytes_0);
 
         let mut read_buf = vec![0; 4];
 
         // drain the read buffer
         let read_result = t.read(&mut read_buf);
         assert_eq!(read_result.unwrap(), 3);
-        assert_eq!(t.read_buffer(), &read_buf[0..3]);
+        assert_eq!(t.read_bytes(), &read_buf[0..3]);
 
         // check that a subsequent read fails
         let read_result = t.read(&mut read_buf);
@@ -332,11 +383,11 @@
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes_1);
         assert_eq!(result, 2);
-        assert_eq!(&t.read_buffer(), &readable_bytes_1);
+        assert_eq!(t.read_bytes(), &readable_bytes_1);
 
         // read again
         let read_result = t.read(&mut read_buf);
         assert_eq!(read_result.unwrap(), 2);
-        assert_eq!(t.read_buffer(), &read_buf[0..2]);
+        assert_eq!(t.read_bytes(), &read_buf[0..2]);
     }
 }
diff --git a/lib/rs/src/transport/mod.rs b/lib/rs/src/transport/mod.rs
index 1c39f50..9392786 100644
--- a/lib/rs/src/transport/mod.rs
+++ b/lib/rs/src/transport/mod.rs
@@ -15,37 +15,266 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Types required to send and receive bytes over an I/O channel.
+//! Types used to send and receive bytes over an I/O channel.
 //!
-//! The core type is the `TTransport` trait, through which a `TProtocol` can
-//! send and receive primitives over the wire. While `TProtocol` instances deal
-//! with primitive types, `TTransport` instances understand only bytes.
+//! The core types are the `TReadTransport`, `TWriteTransport` and the
+//! `TIoChannel` traits, through which `TInputProtocol` or
+//! `TOutputProtocol` can receive and send primitives over the wire. While
+//! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives,
+//! the types in this module understand only bytes.
 
-use std::cell::RefCell;
 use std::io;
-use std::rc::Rc;
+use std::io::{Read, Write};
+use std::ops::{Deref, DerefMut};
+
+#[cfg(test)]
+macro_rules! assert_eq_transport_num_written_bytes {
+    ($transport:ident, $num_written_bytes:expr) => {
+        {
+            assert_eq!($transport.channel.write_bytes().len(), $num_written_bytes);
+        }
+    };
+}
+
+
+#[cfg(test)]
+macro_rules! assert_eq_transport_written_bytes {
+    ($transport:ident, $expected_bytes:ident) => {
+        {
+            assert_eq!($transport.channel.write_bytes(), &$expected_bytes);
+        }
+    };
+}
 
 mod buffered;
 mod framed;
-mod passthru;
 mod socket;
+mod mem;
 
-pub mod mem;
+pub use self::buffered::{TBufferedReadTransport, TBufferedReadTransportFactory,
+                         TBufferedWriteTransport, TBufferedWriteTransportFactory};
+pub use self::framed::{TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
+                       TFramedWriteTransportFactory};
+pub use self::mem::TBufferChannel;
+pub use self::socket::TTcpChannel;
 
-pub use self::mem::TBufferTransport;
-pub use self::buffered::{TBufferedTransport, TBufferedTransportFactory};
-pub use self::framed::{TFramedTransport, TFramedTransportFactory};
-pub use self::passthru::TPassThruTransport;
-pub use self::socket::TTcpTransport;
+/// Identifies a transport used by a `TInputProtocol` to receive bytes.
+pub trait TReadTransport: Read {}
 
-/// Identifies an I/O channel that can be used to send and receive bytes.
-pub trait TTransport: io::Read + io::Write {}
-impl<I: io::Read + io::Write> TTransport for I {}
+/// Helper type used by a server to create `TReadTransport` instances for
+/// accepted client connections.
+pub trait TReadTransportFactory {
+    /// Create a `TReadTransport` that wraps a channel over which bytes are to be read.
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send>;
+}
 
-/// Helper type used by servers to create `TTransport` instances for accepted
-/// client connections.
-pub trait TTransportFactory {
-    /// Create a `TTransport` that wraps an `inner` transport, thus creating
-    /// a transport stack.
-    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport>;
+/// Identifies a transport used by `TOutputProtocol` to send bytes.
+pub trait TWriteTransport: Write {}
+
+/// Helper type used by a server to create `TWriteTransport` instances for
+/// accepted client connections.
+pub trait TWriteTransportFactory {
+    /// Create a `TWriteTransport` that wraps a channel over which bytes are to be sent.
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send>;
+}
+
+impl<T> TReadTransport for T
+where
+    T: Read,
+{
+}
+
+impl<T> TWriteTransport for T
+where
+    T: Write,
+{
+}
+
+// FIXME: implement the Debug trait for boxed transports
+
+impl<T> TReadTransportFactory for Box<T>
+where
+    T: TReadTransportFactory + ?Sized,
+{
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+        (**self).create(channel)
+    }
+}
+
+impl<T> TWriteTransportFactory for Box<T>
+where
+    T: TWriteTransportFactory + ?Sized,
+{
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+        (**self).create(channel)
+    }
+}
+
+/// Identifies a splittable bidirectional I/O channel used to send and receive bytes.
+pub trait TIoChannel: Read + Write {
+    /// Split the channel into a readable half and a writable half, where the
+    /// readable half implements `io::Read` and the writable half implements
+    /// `io::Write`. Returns an error if the channel was not initialized, or if it
+    /// cannot be split safely.
+    ///
+    /// Returned halves may share the underlying OS channel or buffer resources.
+    /// Implementations **should ensure** that these two halves can be safely
+    /// used independently by concurrent threads.
+    fn split(self) -> ::Result<(::transport::ReadHalf<Self>, ::transport::WriteHalf<Self>)>
+    where
+        Self: Sized;
+}
+
+/// The readable half of an object returned from `TIoChannel::split`.
+#[derive(Debug)]
+pub struct ReadHalf<C>
+where
+    C: Read,
+{
+    handle: C,
+}
+
+/// The writable half of an object returned from `TIoChannel::split`.
+#[derive(Debug)]
+pub struct WriteHalf<C>
+where
+    C: Write,
+{
+    handle: C,
+}
+
+impl<C> Read for ReadHalf<C>
+where
+    C: Read,
+{
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.handle.read(buf)
+    }
+}
+
+impl<C> Write for WriteHalf<C>
+where
+    C: Write,
+{
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.handle.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.handle.flush()
+    }
+}
+
+impl<C> Deref for ReadHalf<C>
+where
+    C: Read,
+{
+    type Target = C;
+
+    fn deref(&self) -> &Self::Target {
+        &self.handle
+    }
+}
+
+impl<C> DerefMut for ReadHalf<C>
+where
+    C: Read,
+{
+    fn deref_mut(&mut self) -> &mut C {
+        &mut self.handle
+    }
+}
+
+impl<C> Deref for WriteHalf<C>
+where
+    C: Write,
+{
+    type Target = C;
+
+    fn deref(&self) -> &Self::Target {
+        &self.handle
+    }
+}
+
+impl<C> DerefMut for WriteHalf<C>
+where
+    C: Write,
+{
+    fn deref_mut(&mut self) -> &mut C {
+        &mut self.handle
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::io::Cursor;
+
+    use super::*;
+
+    #[test]
+    fn must_create_usable_read_channel_from_concrete_read_type() {
+        let r = Cursor::new([0, 1, 2]);
+        let _ = TBufferedReadTransport::new(r);
+    }
+
+    #[test]
+    fn must_create_usable_read_channel_from_boxed_read() {
+        let r: Box<Read> = Box::new(Cursor::new([0, 1, 2]));
+        let _ = TBufferedReadTransport::new(r);
+    }
+
+    #[test]
+    fn must_create_usable_write_channel_from_concrete_write_type() {
+        let w = vec![0u8; 10];
+        let _ = TBufferedWriteTransport::new(w);
+    }
+
+    #[test]
+    fn must_create_usable_write_channel_from_boxed_write() {
+        let w: Box<Write> = Box::new(vec![0u8; 10]);
+        let _ = TBufferedWriteTransport::new(w);
+    }
+
+    #[test]
+    fn must_create_usable_read_transport_from_concrete_read_transport() {
+        let r = Cursor::new([0, 1, 2]);
+        let mut t = TBufferedReadTransport::new(r);
+        takes_read_transport(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_read_transport_from_boxed_read() {
+        let r = Cursor::new([0, 1, 2]);
+        let mut t: Box<TReadTransport> = Box::new(TBufferedReadTransport::new(r));
+        takes_read_transport(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_write_transport_from_concrete_write_transport() {
+        let w = vec![0u8; 10];
+        let mut t = TBufferedWriteTransport::new(w);
+        takes_write_transport(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_write_transport_from_boxed_write() {
+        let w = vec![0u8; 10];
+        let mut t: Box<TWriteTransport> = Box::new(TBufferedWriteTransport::new(w));
+        takes_write_transport(&mut t)
+    }
+
+    fn takes_read_transport<R>(t: &mut R)
+    where
+        R: TReadTransport,
+    {
+        t.bytes();
+    }
+
+    fn takes_write_transport<W>(t: &mut W)
+    where
+        W: TWriteTransport,
+    {
+        t.flush().unwrap();
+    }
 }
diff --git a/lib/rs/src/transport/passthru.rs b/lib/rs/src/transport/passthru.rs
deleted file mode 100644
index 60dc3a6..0000000
--- a/lib/rs/src/transport/passthru.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::cell::RefCell;
-use std::rc::Rc;
-use std::io;
-use std::io::{Read, Write};
-
-use super::TTransport;
-
-/// Proxy that wraps an inner `TTransport` and delegates all calls to it.
-///
-/// Unlike other `TTransport` wrappers, `TPassThruTransport` is generic with
-/// regards to the wrapped transport. This allows callers to use methods
-/// specific to the type being wrapped instead of being constrained to methods
-/// on the `TTransport` trait.
-///
-/// # Examples
-///
-/// Create and use a `TPassThruTransport`.
-///
-/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use thrift::transport::{TPassThruTransport, TTcpTransport};
-///
-/// let t = TTcpTransport::new();
-/// let t = TPassThruTransport::new(Rc::new(RefCell::new(Box::new(t))));
-///
-/// // since the type parameter is maintained, we are able
-/// // to use functions specific to `TTcpTransport`
-/// t.inner.borrow_mut().open("localhost:9090").unwrap();
-/// ```
-pub struct TPassThruTransport<I: TTransport> {
-    pub inner: Rc<RefCell<Box<I>>>,
-}
-
-impl<I: TTransport> TPassThruTransport<I> {
-    /// Create a `TPassThruTransport` that wraps an `inner` TTransport.
-    pub fn new(inner: Rc<RefCell<Box<I>>>) -> TPassThruTransport<I> {
-        TPassThruTransport { inner: inner }
-    }
-}
-
-impl<I: TTransport> Read for TPassThruTransport<I> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.inner.borrow_mut().read(buf)
-    }
-}
-
-impl<I: TTransport> Write for TPassThruTransport<I> {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        self.inner.borrow_mut().write(buf)
-    }
-
-    fn flush(&mut self) -> io::Result<()> {
-        self.inner.borrow_mut().flush()
-    }
-}
diff --git a/lib/rs/src/transport/socket.rs b/lib/rs/src/transport/socket.rs
index 9f2b8ba..16b59ef 100644
--- a/lib/rs/src/transport/socket.rs
+++ b/lib/rs/src/transport/socket.rs
@@ -21,69 +21,74 @@
 use std::net::{Shutdown, TcpStream};
 use std::ops::Drop;
 
-use ::{TransportError, TransportErrorKind};
+use {TransportErrorKind, new_transport_error};
+use super::{ReadHalf, TIoChannel, WriteHalf};
 
-/// Communicate with a Thrift service over a TCP socket.
+/// Bidirectional TCP/IP channel.
 ///
 /// # Examples
 ///
-/// Create a `TTcpTransport`.
+/// Create a `TTcpChannel`.
 ///
 /// ```no_run
 /// use std::io::{Read, Write};
-/// use thrift::transport::TTcpTransport;
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
 /// let mut buf = vec![0u8; 4];
-/// t.read(&mut buf).unwrap();
-/// t.write(&vec![0, 1, 2]).unwrap();
+/// c.read(&mut buf).unwrap();
+/// c.write(&vec![0, 1, 2]).unwrap();
 /// ```
 ///
-/// Create a `TTcpTransport` by wrapping an existing `TcpStream`.
+/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
 ///
 /// ```no_run
 /// use std::io::{Read, Write};
 /// use std::net::TcpStream;
-/// use thrift::transport::TTcpTransport;
+/// use thrift::transport::TTcpChannel;
 ///
 /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
-/// let mut t = TTcpTransport::with_stream(stream);
 ///
-/// // no need to call t.open() since we've already connected above
+/// // no need to call c.open() since we've already connected above
+/// let mut c = TTcpChannel::with_stream(stream);
 ///
 /// let mut buf = vec![0u8; 4];
-/// t.read(&mut buf).unwrap();
-/// t.write(&vec![0, 1, 2]).unwrap();
+/// c.read(&mut buf).unwrap();
+/// c.write(&vec![0, 1, 2]).unwrap();
 /// ```
-#[derive(Default)]
-pub struct TTcpTransport {
+#[derive(Debug, Default)]
+pub struct TTcpChannel {
     stream: Option<TcpStream>,
 }
 
-impl TTcpTransport {
-    /// Create an uninitialized `TTcpTransport`.
+impl TTcpChannel {
+    /// Create an uninitialized `TTcpChannel`.
     ///
-    /// The returned instance must be opened using `TTcpTransport::open(...)`
+    /// The returned instance must be opened using `TTcpChannel::open(...)`
     /// before it can be used.
-    pub fn new() -> TTcpTransport {
-        TTcpTransport { stream: None }
+    pub fn new() -> TTcpChannel {
+        TTcpChannel { stream: None }
     }
 
-    /// Create a `TTcpTransport` that wraps an existing `TcpStream`.
+    /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
     ///
     /// The passed-in stream is assumed to have been opened before being wrapped
-    /// by the created `TTcpTransport` instance.
-    pub fn with_stream(stream: TcpStream) -> TTcpTransport {
-        TTcpTransport { stream: Some(stream) }
+    /// by the created `TTcpChannel` instance.
+    pub fn with_stream(stream: TcpStream) -> TTcpChannel {
+        TTcpChannel { stream: Some(stream) }
     }
 
     /// Connect to `remote_address`, which should have the form `host:port`.
     pub fn open(&mut self, remote_address: &str) -> ::Result<()> {
         if self.stream.is_some() {
-            Err(::Error::Transport(TransportError::new(TransportErrorKind::AlreadyOpen,
-                                                       "transport previously opened")))
+            Err(
+                new_transport_error(
+                    TransportErrorKind::AlreadyOpen,
+                    "tcp connection previously opened",
+                ),
+            )
         } else {
             match TcpStream::connect(&remote_address) {
                 Ok(s) => {
@@ -95,33 +100,62 @@
         }
     }
 
-    /// Shutdown this transport.
+    /// Shut down this channel.
     ///
     /// Both send and receive halves are closed, and this instance can no
     /// longer be used to communicate with another endpoint.
     pub fn close(&mut self) -> ::Result<()> {
-        self.if_set(|s| s.shutdown(Shutdown::Both)).map_err(From::from)
+        self.if_set(|s| s.shutdown(Shutdown::Both))
+            .map_err(From::from)
     }
 
     fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
-        where F: FnMut(&mut TcpStream) -> io::Result<T>
+    where
+        F: FnMut(&mut TcpStream) -> io::Result<T>,
     {
 
         if let Some(ref mut s) = self.stream {
             stream_operation(s)
         } else {
-            Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"))
+            Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),)
         }
     }
 }
 
-impl Read for TTcpTransport {
+impl TIoChannel for TTcpChannel {
+    fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
+    where
+        Self: Sized,
+    {
+        let mut s = self;
+
+        s.stream
+            .as_mut()
+            .and_then(|s| s.try_clone().ok())
+            .map(
+                |cloned| {
+                    (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } },
+                     WriteHalf { handle: TTcpChannel { stream: Some(cloned) } })
+                },
+            )
+            .ok_or_else(
+                || {
+                    new_transport_error(
+                        TransportErrorKind::Unknown,
+                        "cannot clone underlying tcp stream",
+                    )
+                },
+            )
+    }
+}
+
+impl Read for TTcpChannel {
     fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
         self.if_set(|s| s.read(b))
     }
 }
 
-impl Write for TTcpTransport {
+impl Write for TTcpChannel {
     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
         self.if_set(|s| s.write(b))
     }
@@ -131,11 +165,11 @@
     }
 }
 
-// Do I have to implement the Drop trait? TcpStream closes the socket on drop.
-impl Drop for TTcpTransport {
+// FIXME: Do I have to implement the Drop trait? TcpStream closes the socket on drop.
+impl Drop for TTcpChannel {
     fn drop(&mut self) {
         if let Err(e) = self.close() {
-            warn!("error while closing socket transport: {:?}", e)
+            warn!("error while closing socket: {:?}", e)
         }
     }
 }