THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/lib/rs/Cargo.toml b/lib/rs/Cargo.toml
index 07c5e67..be34785 100644
--- a/lib/rs/Cargo.toml
+++ b/lib/rs/Cargo.toml
@@ -11,8 +11,9 @@
 keywords = ["thrift"]
 
 [dependencies]
+byteorder = "0.5.3"
 integer-encoding = "1.0.3"
 log = "~0.3.6"
-byteorder = "0.5.3"
+threadpool = "1.0"
 try_from = "0.2.0"
 
diff --git a/lib/rs/src/autogen.rs b/lib/rs/src/autogen.rs
index 289c7be..54d4080 100644
--- a/lib/rs/src/autogen.rs
+++ b/lib/rs/src/autogen.rs
@@ -22,7 +22,7 @@
 //! to implement required functionality. Users should never have to use code
 //! in this module directly.
 
-use ::protocol::{TInputProtocol, TOutputProtocol};
+use protocol::{TInputProtocol, TOutputProtocol};
 
 /// Specifies the minimum functionality an auto-generated client should provide
 /// to communicate with a Thrift server.
diff --git a/lib/rs/src/errors.rs b/lib/rs/src/errors.rs
index a6049d5..e36cb3b 100644
--- a/lib/rs/src/errors.rs
+++ b/lib/rs/src/errors.rs
@@ -21,7 +21,7 @@
 use std::{error, fmt, io, string};
 use try_from::TryFrom;
 
-use ::protocol::{TFieldIdentifier, TInputProtocol, TOutputProtocol, TStructIdentifier, TType};
+use protocol::{TFieldIdentifier, TInputProtocol, TOutputProtocol, TStructIdentifier, TType};
 
 // FIXME: should all my error structs impl error::Error as well?
 // FIXME: should all fields in TransportError, ProtocolError and ApplicationError be optional?
@@ -198,8 +198,8 @@
     /// Create an `ApplicationError` from its wire representation.
     ///
     /// Application code **should never** call this method directly.
-    pub fn read_application_error_from_in_protocol(i: &mut TInputProtocol)
-                                                   -> ::Result<ApplicationError> {
+    pub fn read_application_error_from_in_protocol(i: &mut TInputProtocol,)
+        -> ::Result<ApplicationError> {
         let mut message = "general remote error".to_owned();
         let mut kind = ApplicationErrorKind::Unknown;
 
@@ -212,7 +212,9 @@
                 break;
             }
 
-            let id = field_ident.id.expect("sender should always specify id for non-STOP field");
+            let id = field_ident
+                .id
+                .expect("sender should always specify id for non-STOP field");
 
             match id {
                 1 => {
@@ -222,8 +224,9 @@
                 }
                 2 => {
                     let remote_type_as_int = i.read_i32()?;
-                    let remote_kind: ApplicationErrorKind = TryFrom::try_from(remote_type_as_int)
-                        .unwrap_or(ApplicationErrorKind::Unknown);
+                    let remote_kind: ApplicationErrorKind =
+                        TryFrom::try_from(remote_type_as_int)
+                            .unwrap_or(ApplicationErrorKind::Unknown);
                     i.read_field_end()?;
                     kind = remote_kind;
                 }
@@ -235,20 +238,23 @@
 
         i.read_struct_end()?;
 
-        Ok(ApplicationError {
-            kind: kind,
-            message: message,
-        })
+        Ok(
+            ApplicationError {
+                kind: kind,
+                message: message,
+            },
+        )
     }
 
     /// Convert an `ApplicationError` into its wire representation and write
     /// it to the remote.
     ///
     /// Application code **should never** call this method directly.
-    pub fn write_application_error_to_out_protocol(e: &ApplicationError,
-                                                   o: &mut TOutputProtocol)
-                                                   -> ::Result<()> {
-        o.write_struct_begin(&TStructIdentifier { name: "TApplicationException".to_owned() })?;
+    pub fn write_application_error_to_out_protocol(
+        e: &ApplicationError,
+        o: &mut TOutputProtocol,
+    ) -> ::Result<()> {
+        o.write_struct_begin(&TStructIdentifier { name: "TApplicationException".to_owned() },)?;
 
         let message_field = TFieldIdentifier::new("message", TType::String, 1);
         let type_field = TFieldIdentifier::new("type", TType::I32, 2);
@@ -303,19 +309,23 @@
 
 impl From<String> for Error {
     fn from(s: String) -> Self {
-        Error::Application(ApplicationError {
-            kind: ApplicationErrorKind::Unknown,
-            message: s,
-        })
+        Error::Application(
+            ApplicationError {
+                kind: ApplicationErrorKind::Unknown,
+                message: s,
+            },
+        )
     }
 }
 
 impl<'a> From<&'a str> for Error {
     fn from(s: &'a str) -> Self {
-        Error::Application(ApplicationError {
-            kind: ApplicationErrorKind::Unknown,
-            message: String::from(s),
-        })
+        Error::Application(
+            ApplicationError {
+                kind: ApplicationErrorKind::Unknown,
+                message: String::from(s),
+            },
+        )
     }
 }
 
@@ -418,10 +428,14 @@
             5 => Ok(TransportErrorKind::NegativeSize),
             6 => Ok(TransportErrorKind::SizeLimit),
             _ => {
-                Err(Error::Protocol(ProtocolError {
-                    kind: ProtocolErrorKind::Unknown,
-                    message: format!("cannot convert {} to TransportErrorKind", from),
-                }))
+                Err(
+                    Error::Protocol(
+                        ProtocolError {
+                            kind: ProtocolErrorKind::Unknown,
+                            message: format!("cannot convert {} to TransportErrorKind", from),
+                        },
+                    ),
+                )
             }
         }
     }
@@ -433,34 +447,44 @@
             io::ErrorKind::ConnectionReset |
             io::ErrorKind::ConnectionRefused |
             io::ErrorKind::NotConnected => {
-                Error::Transport(TransportError {
-                    kind: TransportErrorKind::NotOpen,
-                    message: err.description().to_owned(),
-                })
+                Error::Transport(
+                    TransportError {
+                        kind: TransportErrorKind::NotOpen,
+                        message: err.description().to_owned(),
+                    },
+                )
             }
             io::ErrorKind::AlreadyExists => {
-                Error::Transport(TransportError {
-                    kind: TransportErrorKind::AlreadyOpen,
-                    message: err.description().to_owned(),
-                })
+                Error::Transport(
+                    TransportError {
+                        kind: TransportErrorKind::AlreadyOpen,
+                        message: err.description().to_owned(),
+                    },
+                )
             }
             io::ErrorKind::TimedOut => {
-                Error::Transport(TransportError {
-                    kind: TransportErrorKind::TimedOut,
-                    message: err.description().to_owned(),
-                })
+                Error::Transport(
+                    TransportError {
+                        kind: TransportErrorKind::TimedOut,
+                        message: err.description().to_owned(),
+                    },
+                )
             }
             io::ErrorKind::UnexpectedEof => {
-                Error::Transport(TransportError {
-                    kind: TransportErrorKind::EndOfFile,
-                    message: err.description().to_owned(),
-                })
+                Error::Transport(
+                    TransportError {
+                        kind: TransportErrorKind::EndOfFile,
+                        message: err.description().to_owned(),
+                    },
+                )
             }
             _ => {
-                Error::Transport(TransportError {
-                    kind: TransportErrorKind::Unknown,
-                    message: err.description().to_owned(), // FIXME: use io error's debug string
-                })
+                Error::Transport(
+                    TransportError {
+                        kind: TransportErrorKind::Unknown,
+                        message: err.description().to_owned(), // FIXME: use io error's debug string
+                    },
+                )
             }
         }
     }
@@ -468,10 +492,12 @@
 
 impl From<string::FromUtf8Error> for Error {
     fn from(err: string::FromUtf8Error) -> Self {
-        Error::Protocol(ProtocolError {
-            kind: ProtocolErrorKind::InvalidData,
-            message: err.description().to_owned(), // FIXME: use fmt::Error's debug string
-        })
+        Error::Protocol(
+            ProtocolError {
+                kind: ProtocolErrorKind::InvalidData,
+                message: err.description().to_owned(), // FIXME: use fmt::Error's debug string
+            },
+        )
     }
 }
 
@@ -558,10 +584,14 @@
             5 => Ok(ProtocolErrorKind::NotImplemented),
             6 => Ok(ProtocolErrorKind::DepthLimit),
             _ => {
-                Err(Error::Protocol(ProtocolError {
-                    kind: ProtocolErrorKind::Unknown,
-                    message: format!("cannot convert {} to ProtocolErrorKind", from),
-                }))
+                Err(
+                    Error::Protocol(
+                        ProtocolError {
+                            kind: ProtocolErrorKind::Unknown,
+                            message: format!("cannot convert {} to ProtocolErrorKind", from),
+                        },
+                    ),
+                )
             }
         }
     }
@@ -668,10 +698,14 @@
             9 => Ok(ApplicationErrorKind::InvalidProtocol),
             10 => Ok(ApplicationErrorKind::UnsupportedClientType),
             _ => {
-                Err(Error::Application(ApplicationError {
-                    kind: ApplicationErrorKind::Unknown,
-                    message: format!("cannot convert {} to ApplicationErrorKind", from),
-                }))
+                Err(
+                    Error::Application(
+                        ApplicationError {
+                            kind: ApplicationErrorKind::Unknown,
+                            message: format!("cannot convert {} to ApplicationErrorKind", from),
+                        },
+                    ),
+                )
             }
         }
     }
diff --git a/lib/rs/src/lib.rs b/lib/rs/src/lib.rs
index ad18721..7ebb10c 100644
--- a/lib/rs/src/lib.rs
+++ b/lib/rs/src/lib.rs
@@ -26,11 +26,12 @@
 //! 4. server
 //! 5. autogen
 //!
-//! The modules are layered as shown in the diagram below. The `generated`
+//! The modules are layered as shown in the diagram below. The `autogen'd`
 //! layer is generated by the Thrift compiler's Rust plugin. It uses the
 //! types and functions defined in this crate to serialize and deserialize
 //! messages and implement RPC. Users interact with these types and services
-//! by writing their own code on top.
+//! by writing their own code that uses the auto-generated clients and
+//! servers.
 //!
 //! ```text
 //! +-----------+
@@ -49,6 +50,7 @@
 
 extern crate byteorder;
 extern crate integer_encoding;
+extern crate threadpool;
 extern crate try_from;
 
 #[macro_use]
diff --git a/lib/rs/src/protocol/binary.rs b/lib/rs/src/protocol/binary.rs
index 54613a5..e03ec94 100644
--- a/lib/rs/src/protocol/binary.rs
+++ b/lib/rs/src/protocol/binary.rs
@@ -16,14 +16,11 @@
 // under the License.
 
 use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
-use std::cell::RefCell;
 use std::convert::From;
-use std::io::{Read, Write};
-use std::rc::Rc;
 use try_from::TryFrom;
 
-use ::{ProtocolError, ProtocolErrorKind};
-use ::transport::TTransport;
+use {ProtocolError, ProtocolErrorKind};
+use transport::{TReadTransport, TWriteTransport};
 use super::{TFieldIdentifier, TInputProtocol, TInputProtocolFactory, TListIdentifier,
             TMapIdentifier, TMessageIdentifier, TMessageType};
 use super::{TOutputProtocol, TOutputProtocolFactory, TSetIdentifier, TStructIdentifier, TType};
@@ -41,32 +38,35 @@
 /// Create and use a `TBinaryInputProtocol`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TBinaryInputProtocol, TInputProtocol};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("localhost:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("localhost:9090").unwrap();
 ///
-/// let mut i_prot = TBinaryInputProtocol::new(transport, true);
+/// let mut protocol = TBinaryInputProtocol::new(channel, true);
 ///
-/// let recvd_bool = i_prot.read_bool().unwrap();
-/// let recvd_string = i_prot.read_string().unwrap();
+/// let recvd_bool = protocol.read_bool().unwrap();
+/// let recvd_string = protocol.read_string().unwrap();
 /// ```
-pub struct TBinaryInputProtocol<'a> {
+#[derive(Debug)]
+pub struct TBinaryInputProtocol<T>
+where
+    T: TReadTransport,
+{
     strict: bool,
-    transport: Rc<RefCell<Box<TTransport + 'a>>>,
+    transport: T,
 }
 
-impl<'a> TBinaryInputProtocol<'a> {
+impl<'a, T> TBinaryInputProtocol<T>
+where
+    T: TReadTransport,
+{
     /// Create a `TBinaryInputProtocol` that reads bytes from `transport`.
     ///
     /// Set `strict` to `true` if all incoming messages contain the protocol
     /// version number in the protocol header.
-    pub fn new(transport: Rc<RefCell<Box<TTransport + 'a>>>,
-               strict: bool) -> TBinaryInputProtocol<'a> {
+    pub fn new(transport: T, strict: bool) -> TBinaryInputProtocol<T> {
         TBinaryInputProtocol {
             strict: strict,
             transport: transport,
@@ -74,11 +74,14 @@
     }
 }
 
-impl<'a> TInputProtocol for TBinaryInputProtocol<'a> {
+impl<T> TInputProtocol for TBinaryInputProtocol<T>
+where
+    T: TReadTransport,
+{
     #[cfg_attr(feature = "cargo-clippy", allow(collapsible_if))]
     fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier> {
         let mut first_bytes = vec![0; 4];
-        self.transport.borrow_mut().read_exact(&mut first_bytes[..])?;
+        self.transport.read_exact(&mut first_bytes[..])?;
 
         // the thrift version header is intentionally negative
         // so the first check we'll do is see if the sign bit is set
@@ -87,10 +90,14 @@
             // apparently we got a protocol-version header - check
             // it, and if it matches, read the rest of the fields
             if first_bytes[0..2] != [0x80, 0x01] {
-                Err(::Error::Protocol(ProtocolError {
-                    kind: ProtocolErrorKind::BadVersion,
-                    message: format!("received bad version: {:?}", &first_bytes[0..2]),
-                }))
+                Err(
+                    ::Error::Protocol(
+                        ProtocolError {
+                            kind: ProtocolErrorKind::BadVersion,
+                            message: format!("received bad version: {:?}", &first_bytes[0..2]),
+                        },
+                    ),
+                )
             } else {
                 let message_type: TMessageType = TryFrom::try_from(first_bytes[3])?;
                 let name = self.read_string()?;
@@ -103,17 +110,21 @@
             if self.strict {
                 // we're in strict mode however, and that always
                 // requires the protocol-version header to be written first
-                Err(::Error::Protocol(ProtocolError {
-                    kind: ProtocolErrorKind::BadVersion,
-                    message: format!("received bad version: {:?}", &first_bytes[0..2]),
-                }))
+                Err(
+                    ::Error::Protocol(
+                        ProtocolError {
+                            kind: ProtocolErrorKind::BadVersion,
+                            message: format!("received bad version: {:?}", &first_bytes[0..2]),
+                        },
+                    ),
+                )
             } else {
                 // in the non-strict version the first message field
                 // is the message name. strings (byte arrays) are length-prefixed,
                 // so we've just read the length in the first 4 bytes
                 let name_size = BigEndian::read_i32(&first_bytes) as usize;
                 let mut name_buf: Vec<u8> = Vec::with_capacity(name_size);
-                self.transport.borrow_mut().read_exact(&mut name_buf)?;
+                self.transport.read_exact(&mut name_buf)?;
                 let name = String::from_utf8(name_buf)?;
 
                 // read the rest of the fields
@@ -143,7 +154,7 @@
             TType::Stop => Ok(0),
             _ => self.read_i16(),
         }?;
-        Ok(TFieldIdentifier::new::<Option<String>, String, i16>(None, field_type, id))
+        Ok(TFieldIdentifier::new::<Option<String>, String, i16>(None, field_type, id),)
     }
 
     fn read_field_end(&mut self) -> ::Result<()> {
@@ -151,9 +162,12 @@
     }
 
     fn read_bytes(&mut self) -> ::Result<Vec<u8>> {
-        let num_bytes = self.transport.borrow_mut().read_i32::<BigEndian>()? as usize;
+        let num_bytes = self.transport.read_i32::<BigEndian>()? as usize;
         let mut buf = vec![0u8; num_bytes];
-        self.transport.borrow_mut().read_exact(&mut buf).map(|_| buf).map_err(From::from)
+        self.transport
+            .read_exact(&mut buf)
+            .map(|_| buf)
+            .map_err(From::from)
     }
 
     fn read_bool(&mut self) -> ::Result<bool> {
@@ -165,23 +179,31 @@
     }
 
     fn read_i8(&mut self) -> ::Result<i8> {
-        self.transport.borrow_mut().read_i8().map_err(From::from)
+        self.transport.read_i8().map_err(From::from)
     }
 
     fn read_i16(&mut self) -> ::Result<i16> {
-        self.transport.borrow_mut().read_i16::<BigEndian>().map_err(From::from)
+        self.transport
+            .read_i16::<BigEndian>()
+            .map_err(From::from)
     }
 
     fn read_i32(&mut self) -> ::Result<i32> {
-        self.transport.borrow_mut().read_i32::<BigEndian>().map_err(From::from)
+        self.transport
+            .read_i32::<BigEndian>()
+            .map_err(From::from)
     }
 
     fn read_i64(&mut self) -> ::Result<i64> {
-        self.transport.borrow_mut().read_i64::<BigEndian>().map_err(From::from)
+        self.transport
+            .read_i64::<BigEndian>()
+            .map_err(From::from)
     }
 
     fn read_double(&mut self) -> ::Result<f64> {
-        self.transport.borrow_mut().read_f64::<BigEndian>().map_err(From::from)
+        self.transport
+            .read_f64::<BigEndian>()
+            .map_err(From::from)
     }
 
     fn read_string(&mut self) -> ::Result<String> {
@@ -224,7 +246,7 @@
     //
 
     fn read_byte(&mut self) -> ::Result<u8> {
-        self.transport.borrow_mut().read_u8().map_err(From::from)
+        self.transport.read_u8().map_err(From::from)
     }
 }
 
@@ -240,8 +262,8 @@
 }
 
 impl TInputProtocolFactory for TBinaryInputProtocolFactory {
-    fn create<'a>(&mut self, transport: Rc<RefCell<Box<TTransport + 'a>>>) -> Box<TInputProtocol + 'a> {
-        Box::new(TBinaryInputProtocol::new(transport, true)) as Box<TInputProtocol + 'a>
+    fn create(&self, transport: Box<TReadTransport + Send>) -> Box<TInputProtocol + Send> {
+        Box::new(TBinaryInputProtocol::new(transport, true))
     }
 }
 
@@ -256,32 +278,35 @@
 /// Create and use a `TBinaryOutputProtocol`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TBinaryOutputProtocol, TOutputProtocol};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("localhost:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("localhost:9090").unwrap();
 ///
-/// let mut o_prot = TBinaryOutputProtocol::new(transport, true);
+/// let mut protocol = TBinaryOutputProtocol::new(channel, true);
 ///
-/// o_prot.write_bool(true).unwrap();
-/// o_prot.write_string("test_string").unwrap();
+/// protocol.write_bool(true).unwrap();
+/// protocol.write_string("test_string").unwrap();
 /// ```
-pub struct TBinaryOutputProtocol<'a> {
+#[derive(Debug)]
+pub struct TBinaryOutputProtocol<T>
+where
+    T: TWriteTransport,
+{
     strict: bool,
-    transport: Rc<RefCell<Box<TTransport + 'a>>>,
+    pub transport: T, // FIXME: do not make public; only public for testing!
 }
 
-impl<'a> TBinaryOutputProtocol<'a> {
+impl<T> TBinaryOutputProtocol<T>
+where
+    T: TWriteTransport,
+{
     /// Create a `TBinaryOutputProtocol` that writes bytes to `transport`.
     ///
     /// Set `strict` to `true` if all outgoing messages should contain the
     /// protocol version number in the protocol header.
-    pub fn new(transport: Rc<RefCell<Box<TTransport + 'a>>>,
-               strict: bool) -> TBinaryOutputProtocol<'a> {
+    pub fn new(transport: T, strict: bool) -> TBinaryOutputProtocol<T> {
         TBinaryOutputProtocol {
             strict: strict,
             transport: transport,
@@ -289,16 +314,22 @@
     }
 
     fn write_transport(&mut self, buf: &[u8]) -> ::Result<()> {
-        self.transport.borrow_mut().write(buf).map(|_| ()).map_err(From::from)
+        self.transport
+            .write(buf)
+            .map(|_| ())
+            .map_err(From::from)
     }
 }
 
-impl<'a> TOutputProtocol for TBinaryOutputProtocol<'a> {
+impl<T> TOutputProtocol for TBinaryOutputProtocol<T>
+where
+    T: TWriteTransport,
+{
     fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
         if self.strict {
             let message_type: u8 = identifier.message_type.into();
             let header = BINARY_PROTOCOL_VERSION_1 | (message_type as u32);
-            self.transport.borrow_mut().write_u32::<BigEndian>(header)?;
+            self.transport.write_u32::<BigEndian>(header)?;
             self.write_string(&identifier.name)?;
             self.write_i32(identifier.sequence_number)
         } else {
@@ -322,11 +353,17 @@
 
     fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> {
         if identifier.id.is_none() && identifier.field_type != TType::Stop {
-            return Err(::Error::Protocol(ProtocolError {
-                kind: ProtocolErrorKind::Unknown,
-                message: format!("cannot write identifier {:?} without sequence number",
-                                 &identifier),
-            }));
+            return Err(
+                ::Error::Protocol(
+                    ProtocolError {
+                        kind: ProtocolErrorKind::Unknown,
+                        message: format!(
+                            "cannot write identifier {:?} without sequence number",
+                            &identifier
+                        ),
+                    },
+                ),
+            );
         }
 
         self.write_byte(field_type_to_u8(identifier.field_type))?;
@@ -359,23 +396,31 @@
     }
 
     fn write_i8(&mut self, i: i8) -> ::Result<()> {
-        self.transport.borrow_mut().write_i8(i).map_err(From::from)
+        self.transport.write_i8(i).map_err(From::from)
     }
 
     fn write_i16(&mut self, i: i16) -> ::Result<()> {
-        self.transport.borrow_mut().write_i16::<BigEndian>(i).map_err(From::from)
+        self.transport
+            .write_i16::<BigEndian>(i)
+            .map_err(From::from)
     }
 
     fn write_i32(&mut self, i: i32) -> ::Result<()> {
-        self.transport.borrow_mut().write_i32::<BigEndian>(i).map_err(From::from)
+        self.transport
+            .write_i32::<BigEndian>(i)
+            .map_err(From::from)
     }
 
     fn write_i64(&mut self, i: i64) -> ::Result<()> {
-        self.transport.borrow_mut().write_i64::<BigEndian>(i).map_err(From::from)
+        self.transport
+            .write_i64::<BigEndian>(i)
+            .map_err(From::from)
     }
 
     fn write_double(&mut self, d: f64) -> ::Result<()> {
-        self.transport.borrow_mut().write_f64::<BigEndian>(d).map_err(From::from)
+        self.transport
+            .write_f64::<BigEndian>(d)
+            .map_err(From::from)
     }
 
     fn write_string(&mut self, s: &str) -> ::Result<()> {
@@ -401,10 +446,12 @@
     }
 
     fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> {
-        let key_type = identifier.key_type
+        let key_type = identifier
+            .key_type
             .expect("map identifier to write should contain key type");
         self.write_byte(field_type_to_u8(key_type))?;
-        let val_type = identifier.value_type
+        let val_type = identifier
+            .value_type
             .expect("map identifier to write should contain value type");
         self.write_byte(field_type_to_u8(val_type))?;
         self.write_i32(identifier.size)
@@ -415,14 +462,14 @@
     }
 
     fn flush(&mut self) -> ::Result<()> {
-        self.transport.borrow_mut().flush().map_err(From::from)
+        self.transport.flush().map_err(From::from)
     }
 
     // utility
     //
 
     fn write_byte(&mut self, b: u8) -> ::Result<()> {
-        self.transport.borrow_mut().write_u8(b).map_err(From::from)
+        self.transport.write_u8(b).map_err(From::from)
     }
 }
 
@@ -438,8 +485,8 @@
 }
 
 impl TOutputProtocolFactory for TBinaryOutputProtocolFactory {
-    fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TOutputProtocol> {
-        Box::new(TBinaryOutputProtocol::new(transport, true)) as Box<TOutputProtocol>
+    fn create(&self, transport: Box<TWriteTransport + Send>) -> Box<TOutputProtocol + Send> {
+        Box::new(TBinaryOutputProtocol::new(transport, true))
     }
 }
 
@@ -481,10 +528,14 @@
         0x10 => Ok(TType::Utf8),
         0x11 => Ok(TType::Utf16),
         unkn => {
-            Err(::Error::Protocol(ProtocolError {
-                kind: ProtocolErrorKind::InvalidData,
-                message: format!("cannot convert {} to TType", unkn),
-            }))
+            Err(
+                ::Error::Protocol(
+                    ProtocolError {
+                        kind: ProtocolErrorKind::InvalidData,
+                        message: format!("cannot convert {} to TType", unkn),
+                    },
+                ),
+            )
         }
     }
 }
@@ -492,56 +543,79 @@
 #[cfg(test)]
 mod tests {
 
-    use std::rc::Rc;
-    use std::cell::RefCell;
-
-    use ::protocol::{TFieldIdentifier, TMessageIdentifier, TMessageType, TInputProtocol,
-                     TListIdentifier, TMapIdentifier, TOutputProtocol, TSetIdentifier,
-                     TStructIdentifier, TType};
-    use ::transport::{TPassThruTransport, TTransport};
-    use ::transport::mem::TBufferTransport;
+    use protocol::{TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier,
+                   TMessageIdentifier, TMessageType, TOutputProtocol, TSetIdentifier,
+                   TStructIdentifier, TType};
+    use transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf};
 
     use super::*;
 
     #[test]
     fn must_write_message_call_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         let ident = TMessageIdentifier::new("test", TMessageType::Call, 1);
         assert!(o_prot.write_message_begin(&ident).is_ok());
 
-        let buf = trans.borrow().write_buffer_to_vec();
+        let expected: [u8; 16] = [
+            0x80,
+            0x01,
+            0x00,
+            0x01,
+            0x00,
+            0x00,
+            0x00,
+            0x04,
+            0x74,
+            0x65,
+            0x73,
+            0x74,
+            0x00,
+            0x00,
+            0x00,
+            0x01,
+        ];
 
-        let expected: [u8; 16] = [0x80, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65,
-                                  0x73, 0x74, 0x00, 0x00, 0x00, 0x01];
-
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
-
     #[test]
     fn must_write_message_reply_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         let ident = TMessageIdentifier::new("test", TMessageType::Reply, 10);
         assert!(o_prot.write_message_begin(&ident).is_ok());
 
-        let buf = trans.borrow().write_buffer_to_vec();
+        let expected: [u8; 16] = [
+            0x80,
+            0x01,
+            0x00,
+            0x02,
+            0x00,
+            0x00,
+            0x00,
+            0x04,
+            0x74,
+            0x65,
+            0x73,
+            0x74,
+            0x00,
+            0x00,
+            0x00,
+            0x0A,
+        ];
 
-        let expected: [u8; 16] = [0x80, 0x01, 0x00, 0x02, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65,
-                                  0x73, 0x74, 0x00, 0x00, 0x00, 0x0A];
-
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_strict_message_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let sent_ident = TMessageIdentifier::new("test", TMessageType::Call, 1);
         assert!(o_prot.write_message_begin(&sent_ident).is_ok());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let received_ident = assert_success!(i_prot.read_message_begin());
         assert_eq!(&received_ident, &sent_ident);
@@ -564,24 +638,26 @@
 
     #[test]
     fn must_write_field_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
-        assert!(o_prot.write_field_begin(&TFieldIdentifier::new("some_field", TType::String, 22))
-            .is_ok());
+        assert!(
+            o_prot
+                .write_field_begin(&TFieldIdentifier::new("some_field", TType::String, 22))
+                .is_ok()
+        );
 
         let expected: [u8; 3] = [0x0B, 0x00, 0x16];
-        let buf = trans.borrow().write_buffer_to_vec();
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_field_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let sent_field_ident = TFieldIdentifier::new("foo", TType::I64, 20);
         assert!(o_prot.write_field_begin(&sent_field_ident).is_ok());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let expected_ident = TFieldIdentifier {
             name: None,
@@ -594,22 +670,21 @@
 
     #[test]
     fn must_write_stop_field() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert!(o_prot.write_field_stop().is_ok());
 
         let expected: [u8; 1] = [0x00];
-        let buf = trans.borrow().write_buffer_to_vec();
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_field_stop() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         assert!(o_prot.write_field_stop().is_ok());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let expected_ident = TFieldIdentifier {
             name: None,
@@ -628,23 +703,26 @@
 
     #[test]
     fn must_write_list_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
-        assert!(o_prot.write_list_begin(&TListIdentifier::new(TType::Bool, 5)).is_ok());
+        assert!(
+            o_prot
+                .write_list_begin(&TListIdentifier::new(TType::Bool, 5))
+                .is_ok()
+        );
 
         let expected: [u8; 5] = [0x02, 0x00, 0x00, 0x00, 0x05];
-        let buf = trans.borrow().write_buffer_to_vec();
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_list_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TListIdentifier::new(TType::List, 900);
         assert!(o_prot.write_list_begin(&ident).is_ok());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let received_ident = assert_success!(i_prot.read_list_begin());
         assert_eq!(&received_ident, &ident);
@@ -657,23 +735,26 @@
 
     #[test]
     fn must_write_set_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
-        assert!(o_prot.write_set_begin(&TSetIdentifier::new(TType::I16, 7)).is_ok());
+        assert!(
+            o_prot
+                .write_set_begin(&TSetIdentifier::new(TType::I16, 7))
+                .is_ok()
+        );
 
         let expected: [u8; 5] = [0x06, 0x00, 0x00, 0x00, 0x07];
-        let buf = trans.borrow().write_buffer_to_vec();
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_set_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TSetIdentifier::new(TType::I64, 2000);
         assert!(o_prot.write_set_begin(&ident).is_ok());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let received_ident_result = i_prot.read_set_begin();
         assert!(received_ident_result.is_ok());
@@ -687,24 +768,26 @@
 
     #[test]
     fn must_write_map_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
-        assert!(o_prot.write_map_begin(&TMapIdentifier::new(TType::I64, TType::Struct, 32))
-            .is_ok());
+        assert!(
+            o_prot
+                .write_map_begin(&TMapIdentifier::new(TType::I64, TType::Struct, 32))
+                .is_ok()
+        );
 
         let expected: [u8; 6] = [0x0A, 0x0C, 0x00, 0x00, 0x00, 0x20];
-        let buf = trans.borrow().write_buffer_to_vec();
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_map_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TMapIdentifier::new(TType::Map, TType::Set, 100);
         assert!(o_prot.write_map_begin(&ident).is_ok());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let received_ident = assert_success!(i_prot.read_map_begin());
         assert_eq!(&received_ident, &ident);
@@ -717,31 +800,29 @@
 
     #[test]
     fn must_write_bool_true() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert!(o_prot.write_bool(true).is_ok());
 
         let expected: [u8; 1] = [0x01];
-        let buf = trans.borrow().write_buffer_to_vec();
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_write_bool_false() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert!(o_prot.write_bool(false).is_ok());
 
         let expected: [u8; 1] = [0x00];
-        let buf = trans.borrow().write_buffer_to_vec();
-        assert_eq!(&expected, buf.as_slice());
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_read_bool_true() {
-        let (trans, mut i_prot, _) = test_objects();
+        let (mut i_prot, _) = test_objects();
 
-        trans.borrow_mut().set_readable_bytes(&[0x01]);
+        set_readable_bytes!(i_prot, &[0x01]);
 
         let read_bool = assert_success!(i_prot.read_bool());
         assert_eq!(read_bool, true);
@@ -749,9 +830,9 @@
 
     #[test]
     fn must_read_bool_false() {
-        let (trans, mut i_prot, _) = test_objects();
+        let (mut i_prot, _) = test_objects();
 
-        trans.borrow_mut().set_readable_bytes(&[0x00]);
+        set_readable_bytes!(i_prot, &[0x00]);
 
         let read_bool = assert_success!(i_prot.read_bool());
         assert_eq!(read_bool, false);
@@ -759,9 +840,9 @@
 
     #[test]
     fn must_allow_any_non_zero_value_to_be_interpreted_as_bool_true() {
-        let (trans, mut i_prot, _) = test_objects();
+        let (mut i_prot, _) = test_objects();
 
-        trans.borrow_mut().set_readable_bytes(&[0xAC]);
+        set_readable_bytes!(i_prot, &[0xAC]);
 
         let read_bool = assert_success!(i_prot.read_bool());
         assert_eq!(read_bool, true);
@@ -769,52 +850,77 @@
 
     #[test]
     fn must_write_bytes() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         let bytes: [u8; 10] = [0x0A, 0xCC, 0xD1, 0x84, 0x99, 0x12, 0xAB, 0xBB, 0x45, 0xDF];
 
         assert!(o_prot.write_bytes(&bytes).is_ok());
 
-        let buf = trans.borrow().write_buffer_to_vec();
+        let buf = o_prot.transport.write_bytes();
         assert_eq!(&buf[0..4], [0x00, 0x00, 0x00, 0x0A]); // length
         assert_eq!(&buf[4..], bytes); // actual bytes
     }
 
     #[test]
     fn must_round_trip_bytes() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
-        let bytes: [u8; 25] = [0x20, 0xFD, 0x18, 0x84, 0x99, 0x12, 0xAB, 0xBB, 0x45, 0xDF, 0x34,
-                               0xDC, 0x98, 0xA4, 0x6D, 0xF3, 0x99, 0xB4, 0xB7, 0xD4, 0x9C, 0xA5,
-                               0xB3, 0xC9, 0x88];
+        let bytes: [u8; 25] = [
+            0x20,
+            0xFD,
+            0x18,
+            0x84,
+            0x99,
+            0x12,
+            0xAB,
+            0xBB,
+            0x45,
+            0xDF,
+            0x34,
+            0xDC,
+            0x98,
+            0xA4,
+            0x6D,
+            0xF3,
+            0x99,
+            0xB4,
+            0xB7,
+            0xD4,
+            0x9C,
+            0xA5,
+            0xB3,
+            0xC9,
+            0x88,
+        ];
 
         assert!(o_prot.write_bytes(&bytes).is_ok());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let received_bytes = assert_success!(i_prot.read_bytes());
         assert_eq!(&received_bytes, &bytes);
     }
 
-    fn test_objects<'a>
-        ()
-        -> (Rc<RefCell<Box<TBufferTransport>>>, TBinaryInputProtocol<'a>, TBinaryOutputProtocol<'a>)
+    fn test_objects()
+        -> (TBinaryInputProtocol<ReadHalf<TBufferChannel>>,
+            TBinaryOutputProtocol<WriteHalf<TBufferChannel>>)
     {
+        let mem = TBufferChannel::with_capacity(40, 40);
 
-        let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity(40, 40))));
+        let (r_mem, w_mem) = mem.split().unwrap();
 
-        let inner: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
-        let inner = Rc::new(RefCell::new(inner));
+        let i_prot = TBinaryInputProtocol::new(r_mem, true);
+        let o_prot = TBinaryOutputProtocol::new(w_mem, true);
 
-        let i_prot = TBinaryInputProtocol::new(inner.clone(), true);
-        let o_prot = TBinaryOutputProtocol::new(inner.clone(), true);
-
-        (mem, i_prot, o_prot)
+        (i_prot, o_prot)
     }
 
-    fn assert_no_write<F: FnMut(&mut TBinaryOutputProtocol) -> ::Result<()>>(mut write_fn: F) {
-        let (trans, _, mut o_prot) = test_objects();
+    fn assert_no_write<F>(mut write_fn: F)
+    where
+        F: FnMut(&mut TBinaryOutputProtocol<WriteHalf<TBufferChannel>>) -> ::Result<()>,
+    {
+        let (_, mut o_prot) = test_objects();
         assert!(write_fn(&mut o_prot).is_ok());
-        assert_eq!(trans.borrow().write_buffer_as_ref().len(), 0);
+        assert_eq!(o_prot.transport.write_bytes().len(), 0);
     }
 }
diff --git a/lib/rs/src/protocol/compact.rs b/lib/rs/src/protocol/compact.rs
index 353514d..dfe11f8 100644
--- a/lib/rs/src/protocol/compact.rs
+++ b/lib/rs/src/protocol/compact.rs
@@ -17,15 +17,12 @@
 
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use integer_encoding::{VarIntReader, VarIntWriter};
-use std::cell::RefCell;
 use std::convert::From;
-use std::rc::Rc;
-use std::io::{Read, Write};
 use try_from::TryFrom;
 
-use ::transport::TTransport;
-use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType,
-            TInputProtocol, TInputProtocolFactory};
+use transport::{TReadTransport, TWriteTransport};
+use super::{TFieldIdentifier, TInputProtocol, TInputProtocolFactory, TListIdentifier,
+            TMapIdentifier, TMessageIdentifier, TMessageType};
 use super::{TOutputProtocol, TOutputProtocolFactory, TSetIdentifier, TStructIdentifier, TType};
 
 const COMPACT_PROTOCOL_ID: u8 = 0x82;
@@ -39,21 +36,22 @@
 /// Create and use a `TCompactInputProtocol`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TCompactInputProtocol, TInputProtocol};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("localhost:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("localhost:9090").unwrap();
 ///
-/// let mut i_prot = TCompactInputProtocol::new(transport);
+/// let mut protocol = TCompactInputProtocol::new(channel);
 ///
-/// let recvd_bool = i_prot.read_bool().unwrap();
-/// let recvd_string = i_prot.read_string().unwrap();
+/// let recvd_bool = protocol.read_bool().unwrap();
+/// let recvd_string = protocol.read_string().unwrap();
 /// ```
-pub struct TCompactInputProtocol<'a> {
+#[derive(Debug)]
+pub struct TCompactInputProtocol<T>
+where
+    T: TReadTransport,
+{
     // Identifier of the last field deserialized for a struct.
     last_read_field_id: i16,
     // Stack of the last read field ids (a new entry is added each time a nested struct is read).
@@ -63,12 +61,15 @@
     // and reading the field only occurs after the field id is read.
     pending_read_bool_value: Option<bool>,
     // Underlying transport used for byte-level operations.
-    transport: Rc<RefCell<Box<TTransport + 'a>>>,
+    transport: T,
 }
 
-impl<'a> TCompactInputProtocol<'a> {
+impl<T> TCompactInputProtocol<T>
+where
+    T: TReadTransport,
+{
     /// Create a `TCompactInputProtocol` that reads bytes from `transport`.
-    pub fn new(transport: Rc<RefCell<Box<TTransport + 'a>>>) -> TCompactInputProtocol<'a> {
+    pub fn new(transport: T) -> TCompactInputProtocol<T> {
         TCompactInputProtocol {
             last_read_field_id: 0,
             read_field_id_stack: Vec::new(),
@@ -87,21 +88,28 @@
             // high bits set high if count and type encoded separately
             element_count = possible_element_count as i32;
         } else {
-            element_count = self.transport.borrow_mut().read_varint::<u32>()? as i32;
+            element_count = self.transport.read_varint::<u32>()? as i32;
         }
 
         Ok((element_type, element_count))
     }
 }
 
-impl<'a> TInputProtocol for TCompactInputProtocol<'a> {
+impl<T> TInputProtocol for TCompactInputProtocol<T>
+where
+    T: TReadTransport,
+{
     fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier> {
         let compact_id = self.read_byte()?;
         if compact_id != COMPACT_PROTOCOL_ID {
-            Err(::Error::Protocol(::ProtocolError {
-                kind: ::ProtocolErrorKind::BadVersion,
-                message: format!("invalid compact protocol header {:?}", compact_id),
-            }))
+            Err(
+                ::Error::Protocol(
+                    ::ProtocolError {
+                        kind: ::ProtocolErrorKind::BadVersion,
+                        message: format!("invalid compact protocol header {:?}", compact_id),
+                    },
+                ),
+            )
         } else {
             Ok(())
         }?;
@@ -109,11 +117,17 @@
         let type_and_byte = self.read_byte()?;
         let received_version = type_and_byte & COMPACT_VERSION_MASK;
         if received_version != COMPACT_VERSION {
-            Err(::Error::Protocol(::ProtocolError {
-                kind: ::ProtocolErrorKind::BadVersion,
-                message: format!("cannot process compact protocol version {:?}",
-                                 received_version),
-            }))
+            Err(
+                ::Error::Protocol(
+                    ::ProtocolError {
+                        kind: ::ProtocolErrorKind::BadVersion,
+                        message: format!(
+                            "cannot process compact protocol version {:?}",
+                            received_version
+                        ),
+                    },
+                ),
+            )
         } else {
             Ok(())
         }?;
@@ -125,7 +139,7 @@
 
         self.last_read_field_id = 0;
 
-        Ok(TMessageIdentifier::new(service_call_name, message_type, sequence_number))
+        Ok(TMessageIdentifier::new(service_call_name, message_type, sequence_number),)
     }
 
     fn read_message_end(&mut self) -> ::Result<()> {
@@ -165,9 +179,13 @@
 
         match field_type {
             TType::Stop => {
-                Ok(TFieldIdentifier::new::<Option<String>, String, Option<i16>>(None,
-                                                                                TType::Stop,
-                                                                                None))
+                Ok(
+                    TFieldIdentifier::new::<Option<String>, String, Option<i16>>(
+                        None,
+                        TType::Stop,
+                        None,
+                    ),
+                )
             }
             _ => {
                 if field_delta != 0 {
@@ -176,11 +194,13 @@
                     self.last_read_field_id = self.read_i16()?;
                 };
 
-                Ok(TFieldIdentifier {
-                    name: None,
-                    field_type: field_type,
-                    id: Some(self.last_read_field_id),
-                })
+                Ok(
+                    TFieldIdentifier {
+                        name: None,
+                        field_type: field_type,
+                        id: Some(self.last_read_field_id),
+                    },
+                )
             }
         }
     }
@@ -198,10 +218,14 @@
                     0x01 => Ok(true),
                     0x02 => Ok(false),
                     unkn => {
-                        Err(::Error::Protocol(::ProtocolError {
-                            kind: ::ProtocolErrorKind::InvalidData,
-                            message: format!("cannot convert {} into bool", unkn),
-                        }))
+                        Err(
+                            ::Error::Protocol(
+                                ::ProtocolError {
+                                    kind: ::ProtocolErrorKind::InvalidData,
+                                    message: format!("cannot convert {} into bool", unkn),
+                                },
+                            ),
+                        )
                     }
                 }
             }
@@ -209,9 +233,12 @@
     }
 
     fn read_bytes(&mut self) -> ::Result<Vec<u8>> {
-        let len = self.transport.borrow_mut().read_varint::<u32>()?;
+        let len = self.transport.read_varint::<u32>()?;
         let mut buf = vec![0u8; len as usize];
-        self.transport.borrow_mut().read_exact(&mut buf).map_err(From::from).map(|_| buf)
+        self.transport
+            .read_exact(&mut buf)
+            .map_err(From::from)
+            .map(|_| buf)
     }
 
     fn read_i8(&mut self) -> ::Result<i8> {
@@ -219,19 +246,21 @@
     }
 
     fn read_i16(&mut self) -> ::Result<i16> {
-        self.transport.borrow_mut().read_varint::<i16>().map_err(From::from)
+        self.transport.read_varint::<i16>().map_err(From::from)
     }
 
     fn read_i32(&mut self) -> ::Result<i32> {
-        self.transport.borrow_mut().read_varint::<i32>().map_err(From::from)
+        self.transport.read_varint::<i32>().map_err(From::from)
     }
 
     fn read_i64(&mut self) -> ::Result<i64> {
-        self.transport.borrow_mut().read_varint::<i64>().map_err(From::from)
+        self.transport.read_varint::<i64>().map_err(From::from)
     }
 
     fn read_double(&mut self) -> ::Result<f64> {
-        self.transport.borrow_mut().read_f64::<BigEndian>().map_err(From::from)
+        self.transport
+            .read_f64::<BigEndian>()
+            .map_err(From::from)
     }
 
     fn read_string(&mut self) -> ::Result<String> {
@@ -258,7 +287,7 @@
     }
 
     fn read_map_begin(&mut self) -> ::Result<TMapIdentifier> {
-        let element_count = self.transport.borrow_mut().read_varint::<u32>()? as i32;
+        let element_count = self.transport.read_varint::<u32>()? as i32;
         if element_count == 0 {
             Ok(TMapIdentifier::new(None, None, 0))
         } else {
@@ -278,7 +307,10 @@
 
     fn read_byte(&mut self) -> ::Result<u8> {
         let mut buf = [0u8; 1];
-        self.transport.borrow_mut().read_exact(&mut buf).map_err(From::from).map(|_| buf[0])
+        self.transport
+            .read_exact(&mut buf)
+            .map_err(From::from)
+            .map(|_| buf[0])
     }
 }
 
@@ -294,8 +326,8 @@
 }
 
 impl TInputProtocolFactory for TCompactInputProtocolFactory {
-    fn create<'a>(&mut self, transport: Rc<RefCell<Box<TTransport + 'a>>>) -> Box<TInputProtocol + 'a> {
-        Box::new(TCompactInputProtocol::new(transport)) as Box<TInputProtocol + 'a>
+    fn create(&self, transport: Box<TReadTransport + Send>) -> Box<TInputProtocol + Send> {
+        Box::new(TCompactInputProtocol::new(transport))
     }
 }
 
@@ -306,35 +338,39 @@
 /// Create and use a `TCompactOutputProtocol`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("localhost:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("localhost:9090").unwrap();
 ///
-/// let mut o_prot = TCompactOutputProtocol::new(transport);
+/// let mut protocol = TCompactOutputProtocol::new(channel);
 ///
-/// o_prot.write_bool(true).unwrap();
-/// o_prot.write_string("test_string").unwrap();
+/// protocol.write_bool(true).unwrap();
+/// protocol.write_string("test_string").unwrap();
 /// ```
-pub struct TCompactOutputProtocol<'a> {
+#[derive(Debug)]
+pub struct TCompactOutputProtocol<T>
+where
+    T: TWriteTransport,
+{
     // Identifier of the last field serialized for a struct.
     last_write_field_id: i16,
-    // Stack of the last written field ids (a new entry is added each time a nested struct is written).
+    // Stack of the last written field ids (new entry added each time a nested struct is written).
     write_field_id_stack: Vec<i16>,
     // Field identifier of the boolean field to be written.
     // Saved because boolean fields and their value are encoded in a single byte
     pending_write_bool_field_identifier: Option<TFieldIdentifier>,
     // Underlying transport used for byte-level operations.
-    transport: Rc<RefCell<Box<TTransport + 'a>>>,
+    transport: T,
 }
 
-impl<'a> TCompactOutputProtocol<'a> {
+impl<T> TCompactOutputProtocol<T>
+where
+    T: TWriteTransport,
+{
     /// Create a `TCompactOutputProtocol` that writes bytes to `transport`.
-    pub fn new(transport: Rc<RefCell<Box<TTransport + 'a>>>) -> TCompactOutputProtocol<'a> {
+    pub fn new(transport: T) -> TCompactOutputProtocol<T> {
         TCompactOutputProtocol {
             last_write_field_id: 0,
             write_field_id_stack: Vec::new(),
@@ -365,7 +401,6 @@
             let header = 0xF0 | elem_identifier;
             self.write_byte(header)?;
             self.transport
-                .borrow_mut()
                 .write_varint(element_count as u32)
                 .map_err(From::from)
                 .map(|_| ())
@@ -379,7 +414,10 @@
     }
 }
 
-impl<'a> TOutputProtocol for TCompactOutputProtocol<'a> {
+impl<T> TOutputProtocol for TCompactOutputProtocol<T>
+where
+    T: TWriteTransport,
+{
     fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
         self.write_byte(COMPACT_PROTOCOL_ID)?;
         self.write_byte((u8::from(identifier.message_type) << 5) | COMPACT_VERSION)?;
@@ -401,8 +439,9 @@
 
     fn write_struct_end(&mut self) -> ::Result<()> {
         self.assert_no_pending_bool_write();
-        self.last_write_field_id =
-            self.write_field_id_stack.pop().expect("should have previous field ids");
+        self.last_write_field_id = self.write_field_id_stack
+            .pop()
+            .expect("should have previous field ids");
         Ok(())
     }
 
@@ -410,16 +449,20 @@
         match identifier.field_type {
             TType::Bool => {
                 if self.pending_write_bool_field_identifier.is_some() {
-                    panic!("should not have a pending bool while writing another bool with id: \
+                    panic!(
+                        "should not have a pending bool while writing another bool with id: \
                             {:?}",
-                           identifier)
+                        identifier
+                    )
                 }
                 self.pending_write_bool_field_identifier = Some(identifier.clone());
                 Ok(())
             }
             _ => {
                 let field_type = type_to_u8(identifier.field_type);
-                let field_id = identifier.id.expect("non-stop field should have field id");
+                let field_id = identifier
+                    .id
+                    .expect("non-stop field should have field id");
                 self.write_field_header(field_type, field_id)
             }
         }
@@ -453,8 +496,8 @@
     }
 
     fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
-        self.transport.borrow_mut().write_varint(b.len() as u32)?;
-        self.transport.borrow_mut().write_all(b).map_err(From::from)
+        self.transport.write_varint(b.len() as u32)?;
+        self.transport.write_all(b).map_err(From::from)
     }
 
     fn write_i8(&mut self, i: i8) -> ::Result<()> {
@@ -462,19 +505,30 @@
     }
 
     fn write_i16(&mut self, i: i16) -> ::Result<()> {
-        self.transport.borrow_mut().write_varint(i).map_err(From::from).map(|_| ())
+        self.transport
+            .write_varint(i)
+            .map_err(From::from)
+            .map(|_| ())
     }
 
     fn write_i32(&mut self, i: i32) -> ::Result<()> {
-        self.transport.borrow_mut().write_varint(i).map_err(From::from).map(|_| ())
+        self.transport
+            .write_varint(i)
+            .map_err(From::from)
+            .map(|_| ())
     }
 
     fn write_i64(&mut self, i: i64) -> ::Result<()> {
-        self.transport.borrow_mut().write_varint(i).map_err(From::from).map(|_| ())
+        self.transport
+            .write_varint(i)
+            .map_err(From::from)
+            .map(|_| ())
     }
 
     fn write_double(&mut self, d: f64) -> ::Result<()> {
-        self.transport.borrow_mut().write_f64::<BigEndian>(d).map_err(From::from)
+        self.transport
+            .write_f64::<BigEndian>(d)
+            .map_err(From::from)
     }
 
     fn write_string(&mut self, s: &str) -> ::Result<()> {
@@ -501,13 +555,15 @@
         if identifier.size == 0 {
             self.write_byte(0)
         } else {
-            self.transport.borrow_mut().write_varint(identifier.size as u32)?;
+            self.transport.write_varint(identifier.size as u32)?;
 
-            let key_type = identifier.key_type
+            let key_type = identifier
+                .key_type
                 .expect("map identifier to write should contain key type");
             let key_type_byte = collection_type_to_u8(key_type) << 4;
 
-            let val_type = identifier.value_type
+            let val_type = identifier
+                .value_type
                 .expect("map identifier to write should contain value type");
             let val_type_byte = collection_type_to_u8(val_type);
 
@@ -521,14 +577,17 @@
     }
 
     fn flush(&mut self) -> ::Result<()> {
-        self.transport.borrow_mut().flush().map_err(From::from)
+        self.transport.flush().map_err(From::from)
     }
 
     // utility
     //
 
     fn write_byte(&mut self, b: u8) -> ::Result<()> {
-        self.transport.borrow_mut().write(&[b]).map_err(From::from).map(|_| ())
+        self.transport
+            .write(&[b])
+            .map_err(From::from)
+            .map(|_| ())
     }
 }
 
@@ -544,8 +603,8 @@
 }
 
 impl TOutputProtocolFactory for TCompactOutputProtocolFactory {
-    fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TOutputProtocol> {
-        Box::new(TCompactOutputProtocol::new(transport)) as Box<TOutputProtocol>
+    fn create(&self, transport: Box<TWriteTransport + Send>) -> Box<TOutputProtocol + Send> {
+        Box::new(TCompactOutputProtocol::new(transport))
     }
 }
 
@@ -594,10 +653,14 @@
         0x0B => Ok(TType::Map),
         0x0C => Ok(TType::Struct),
         unkn => {
-            Err(::Error::Protocol(::ProtocolError {
-                kind: ::ProtocolErrorKind::InvalidData,
-                message: format!("cannot convert {} into TType", unkn),
-            }))
+            Err(
+                ::Error::Protocol(
+                    ::ProtocolError {
+                        kind: ::ProtocolErrorKind::InvalidData,
+                        message: format!("cannot convert {} into TType", unkn),
+                    },
+                ),
+            )
         }
     }
 }
@@ -605,54 +668,65 @@
 #[cfg(test)]
 mod tests {
 
-    use std::rc::Rc;
-    use std::cell::RefCell;
-
-    use ::protocol::{TFieldIdentifier, TMessageIdentifier, TMessageType, TInputProtocol,
-                     TListIdentifier, TMapIdentifier, TOutputProtocol, TSetIdentifier,
-                     TStructIdentifier, TType};
-    use ::transport::{TPassThruTransport, TTransport};
-    use ::transport::mem::TBufferTransport;
+    use protocol::{TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier,
+                   TMessageIdentifier, TMessageType, TOutputProtocol, TSetIdentifier,
+                   TStructIdentifier, TType};
+    use transport::{ReadHalf, TBufferChannel, TIoChannel, WriteHalf};
 
     use super::*;
 
     #[test]
     fn must_write_message_begin_0() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert_success!(o_prot.write_message_begin(&TMessageIdentifier::new("foo", TMessageType::Call, 431)));
 
-        let expected: [u8; 8] =
-            [0x82 /* protocol ID */, 0x21 /* message type | protocol version */, 0xDE,
-             0x06 /* zig-zag varint sequence number */, 0x03 /* message-name length */,
-             0x66, 0x6F, 0x6F /* "foo" */];
+        let expected: [u8; 8] = [
+            0x82, /* protocol ID */
+            0x21, /* message type | protocol version */
+            0xDE,
+            0x06, /* zig-zag varint sequence number */
+            0x03, /* message-name length */
+            0x66,
+            0x6F,
+            0x6F /* "foo" */,
+        ];
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_write_message_begin_1() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
-        assert_success!(o_prot.write_message_begin(&TMessageIdentifier::new("bar", TMessageType::Reply, 991828)));
+        assert_success!(
+            o_prot.write_message_begin(&TMessageIdentifier::new("bar", TMessageType::Reply, 991828))
+        );
 
-        let expected: [u8; 9] =
-            [0x82 /* protocol ID */, 0x41 /* message type | protocol version */, 0xA8,
-             0x89, 0x79 /* zig-zag varint sequence number */,
-             0x03 /* message-name length */, 0x62, 0x61, 0x72 /* "bar" */];
+        let expected: [u8; 9] = [
+            0x82, /* protocol ID */
+            0x41, /* message type | protocol version */
+            0xA8,
+            0x89,
+            0x79, /* zig-zag varint sequence number */
+            0x03, /* message-name length */
+            0x62,
+            0x61,
+            0x72 /* "bar" */,
+        ];
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_message_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TMessageIdentifier::new("service_call", TMessageType::Call, 1283948);
 
         assert_success!(o_prot.write_message_begin(&ident));
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let res = assert_success!(i_prot.read_message_begin());
         assert_eq!(&res, &ident);
@@ -668,7 +742,7 @@
 
     #[test]
     fn must_write_struct_with_delta_fields() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // no bytes should be written however
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -692,20 +766,20 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 5] = [
+            0x03, /* field type */
+            0x00, /* first field id */
+            0x44, /* field delta (4) | field type */
+            0x59, /* field delta (5) | field type */
+            0x00 /* field stop */,
+        ];
 
-        let expected: [u8; 5] = [0x03 /* field type */, 0x00 /* first field id */,
-                                 0x44 /* field delta (4) | field type */,
-                                 0x59 /* field delta (5) | field type */,
-                                 0x00 /* field stop */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_struct_with_delta_fields() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // no bytes should be written however
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -732,40 +806,57 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read the struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
 
         assert_success!(i_prot.read_struct_end());
     }
 
     #[test]
     fn must_write_struct_with_non_zero_initial_field_and_delta_fields() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // no bytes should be written however
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -789,20 +880,19 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 4] = [
+            0x15, /* field delta (1) | field type */
+            0x1A, /* field delta (1) | field type */
+            0x48, /* field delta (4) | field type */
+            0x00 /* field stop */,
+        ];
 
-        let expected: [u8; 4] = [0x15 /* field delta (1) | field type */,
-                                 0x1A /* field delta (1) | field type */,
-                                 0x48 /* field delta (4) | field type */,
-                                 0x00 /* field stop */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_struct_with_non_zero_initial_field_and_delta_fields() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // no bytes should be written however
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -829,40 +919,57 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read the struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
 
         assert_success!(i_prot.read_struct_end());
     }
 
     #[test]
     fn must_write_struct_with_long_fields() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // no bytes should be written however
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -885,21 +992,23 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 8] = [
+            0x05, /* field type */
+            0x00, /* first field id */
+            0x06, /* field type */
+            0x20, /* zig-zag varint field id */
+            0x0A, /* field type */
+            0xC6,
+            0x01, /* zig-zag varint field id */
+            0x00 /* field stop */,
+        ];
 
-        let expected: [u8; 8] =
-            [0x05 /* field type */, 0x00 /* first field id */,
-             0x06 /* field type */, 0x20 /* zig-zag varint field id */,
-             0x0A /* field type */, 0xC6, 0x01 /* zig-zag varint field id */,
-             0x00 /* field stop */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_struct_with_long_fields() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // no bytes should be written however
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -925,40 +1034,57 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read the struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
 
         assert_success!(i_prot.read_struct_end());
     }
 
     #[test]
     fn must_write_struct_with_mix_of_long_and_delta_fields() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // no bytes should be written however
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -989,22 +1115,25 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 10] = [
+            0x16, /* field delta (1) | field type */
+            0x85, /* field delta (8) | field type */
+            0x0A, /* field type */
+            0xD0,
+            0x0F, /* zig-zag varint field id */
+            0x0A, /* field type */
+            0xA2,
+            0x1F, /* zig-zag varint field id */
+            0x3A, /* field delta (3) | field type */
+            0x00 /* field stop */,
+        ];
 
-        let expected: [u8; 10] =
-            [0x16 /* field delta (1) | field type */,
-             0x85 /* field delta (8) | field type */, 0x0A /* field type */, 0xD0,
-             0x0F /* zig-zag varint field id */, 0x0A /* field type */, 0xA2,
-             0x1F /* zig-zag varint field id */,
-             0x3A /* field delta (3) | field type */, 0x00 /* field stop */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_struct_with_mix_of_long_and_delta_fields() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // no bytes should be written however
         let struct_ident = TStructIdentifier::new("foo");
@@ -1041,43 +1170,70 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read the struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier { name: None, ..field_ident_4 });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_4
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_5 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_5,
-                   TFieldIdentifier { name: None, ..field_ident_5 });
+        assert_eq!(
+            read_ident_5,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_5
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_6 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_6,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_6,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
 
         assert_success!(i_prot.read_struct_end());
     }
@@ -1087,7 +1243,7 @@
         // last field of the containing struct is a delta
         // first field of the the contained struct is a delta
 
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // start containing struct
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1123,17 +1279,17 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 7] = [
+            0x16, /* field delta (1) | field type */
+            0x85, /* field delta (8) | field type */
+            0x73, /* field delta (7) | field type */
+            0x07, /* field type */
+            0x30, /* zig-zag varint field id */
+            0x00, /* field stop - contained */
+            0x00 /* field stop - containing */,
+        ];
 
-        let expected: [u8; 7] =
-            [0x16 /* field delta (1) | field type */,
-             0x85 /* field delta (8) | field type */,
-             0x73 /* field delta (7) | field type */, 0x07 /* field type */,
-             0x30 /* zig-zag varint field id */, 0x00 /* field stop - contained */,
-             0x00 /* field stop - containing */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
@@ -1141,7 +1297,7 @@
         // last field of the containing struct is a delta
         // first field of the the contained struct is a delta
 
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // start containing struct
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1181,52 +1337,76 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read containing struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         // read contained struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier { name: None, ..field_ident_4 });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_4
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         // end contained struct
         let read_ident_6 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_6,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_6,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
         assert_success!(i_prot.read_struct_end());
 
         // end containing struct
         let read_ident_7 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_7,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_7,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
         assert_success!(i_prot.read_struct_end());
     }
 
@@ -1235,7 +1415,7 @@
         // last field of the containing struct is a delta
         // first field of the the contained struct is a full write
 
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // start containing struct
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1271,17 +1451,17 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 7] = [
+            0x16, /* field delta (1) | field type */
+            0x85, /* field delta (8) | field type */
+            0x07, /* field type */
+            0x30, /* zig-zag varint field id */
+            0x33, /* field delta (3) | field type */
+            0x00, /* field stop - contained */
+            0x00 /* field stop - containing */,
+        ];
 
-        let expected: [u8; 7] =
-            [0x16 /* field delta (1) | field type */,
-             0x85 /* field delta (8) | field type */, 0x07 /* field type */,
-             0x30 /* zig-zag varint field id */,
-             0x33 /* field delta (3) | field type */, 0x00 /* field stop - contained */,
-             0x00 /* field stop - containing */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
@@ -1289,7 +1469,7 @@
         // last field of the containing struct is a delta
         // first field of the the contained struct is a full write
 
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // start containing struct
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1329,52 +1509,76 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read containing struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         // read contained struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier { name: None, ..field_ident_4 });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_4
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         // end contained struct
         let read_ident_6 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_6,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_6,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
         assert_success!(i_prot.read_struct_end());
 
         // end containing struct
         let read_ident_7 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_7,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_7,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
         assert_success!(i_prot.read_struct_end());
     }
 
@@ -1383,7 +1587,7 @@
         // last field of the containing struct is a full write
         // first field of the the contained struct is a delta write
 
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // start containing struct
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1419,21 +1623,22 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 7] = [
+            0x16, /* field delta (1) | field type */
+            0x08, /* field type */
+            0x2A, /* zig-zag varint field id */
+            0x77, /* field delta(7) | field type */
+            0x33, /* field delta (3) | field type */
+            0x00, /* field stop - contained */
+            0x00 /* field stop - containing */,
+        ];
 
-        let expected: [u8; 7] =
-            [0x16 /* field delta (1) | field type */, 0x08 /* field type */,
-             0x2A /* zig-zag varint field id */, 0x77 /* field delta(7) | field type */,
-             0x33 /* field delta (3) | field type */, 0x00 /* field stop - contained */,
-             0x00 /* field stop - containing */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_nested_structs_2() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // start containing struct
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1473,52 +1678,76 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read containing struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         // read contained struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier { name: None, ..field_ident_4 });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_4
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         // end contained struct
         let read_ident_6 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_6,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_6,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
         assert_success!(i_prot.read_struct_end());
 
         // end containing struct
         let read_ident_7 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_7,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_7,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
         assert_success!(i_prot.read_struct_end());
     }
 
@@ -1527,7 +1756,7 @@
         // last field of the containing struct is a full write
         // first field of the the contained struct is a full write
 
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // start containing struct
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1563,17 +1792,18 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 8] = [
+            0x16, /* field delta (1) | field type */
+            0x08, /* field type */
+            0x2A, /* zig-zag varint field id */
+            0x07, /* field type */
+            0x2A, /* zig-zag varint field id */
+            0x63, /* field delta (6) | field type */
+            0x00, /* field stop - contained */
+            0x00 /* field stop - containing */,
+        ];
 
-        let expected: [u8; 8] =
-            [0x16 /* field delta (1) | field type */, 0x08 /* field type */,
-             0x2A /* zig-zag varint field id */, 0x07 /* field type */,
-             0x2A /* zig-zag varint field id */,
-             0x63 /* field delta (6) | field type */, 0x00 /* field stop - contained */,
-             0x00 /* field stop - containing */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
@@ -1581,7 +1811,7 @@
         // last field of the containing struct is a full write
         // first field of the the contained struct is a full write
 
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // start containing struct
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1621,58 +1851,82 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read containing struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         // read contained struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier { name: None, ..field_ident_4 });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_4
+            }
+        );
         assert_success!(i_prot.read_field_end());
 
         // end contained struct
         let read_ident_6 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_6,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_6,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
         assert_success!(i_prot.read_struct_end());
 
         // end containing struct
         let read_ident_7 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_7,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_7,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
         assert_success!(i_prot.read_struct_end());
     }
 
     #[test]
     fn must_write_bool_field() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         // no bytes should be written however
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
@@ -1703,20 +1957,22 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        // get bytes written
-        let buf = trans.borrow_mut().write_buffer_to_vec();
+        let expected: [u8; 7] = [
+            0x11, /* field delta (1) | true */
+            0x82, /* field delta (8) | false */
+            0x01, /* true */
+            0x34, /* field id */
+            0x02, /* false */
+            0x5A, /* field id */
+            0x00 /* stop field */,
+        ];
 
-        let expected: [u8; 7] = [0x11 /* field delta (1) | true */,
-                                 0x82 /* field delta (8) | false */, 0x01 /* true */,
-                                 0x34 /* field id */, 0x02 /* false */,
-                                 0x5A /* field id */, 0x00 /* stop field */];
-
-        assert_eq!(&buf, &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_bool_field() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         // no bytes should be written however
         let struct_ident = TStructIdentifier::new("foo");
@@ -1752,46 +2008,68 @@
         assert_success!(o_prot.write_field_stop());
         assert_success!(o_prot.write_struct_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // read the struct back
         assert_success!(i_prot.read_struct_begin());
 
         let read_ident_1 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_1,
-                   TFieldIdentifier { name: None, ..field_ident_1 });
+        assert_eq!(
+            read_ident_1,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_1
+            }
+        );
         let read_value_1 = assert_success!(i_prot.read_bool());
         assert_eq!(read_value_1, true);
         assert_success!(i_prot.read_field_end());
 
         let read_ident_2 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_2,
-                   TFieldIdentifier { name: None, ..field_ident_2 });
+        assert_eq!(
+            read_ident_2,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_2
+            }
+        );
         let read_value_2 = assert_success!(i_prot.read_bool());
         assert_eq!(read_value_2, false);
         assert_success!(i_prot.read_field_end());
 
         let read_ident_3 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_3,
-                   TFieldIdentifier { name: None, ..field_ident_3 });
+        assert_eq!(
+            read_ident_3,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_3
+            }
+        );
         let read_value_3 = assert_success!(i_prot.read_bool());
         assert_eq!(read_value_3, true);
         assert_success!(i_prot.read_field_end());
 
         let read_ident_4 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_4,
-                   TFieldIdentifier { name: None, ..field_ident_4 });
+        assert_eq!(
+            read_ident_4,
+            TFieldIdentifier {
+                name: None,
+                ..field_ident_4
+            }
+        );
         let read_value_4 = assert_success!(i_prot.read_bool());
         assert_eq!(read_value_4, false);
         assert_success!(i_prot.read_field_end());
 
         let read_ident_5 = assert_success!(i_prot.read_field_begin());
-        assert_eq!(read_ident_5,
-                   TFieldIdentifier {
-                       name: None,
-                       field_type: TType::Stop,
-                       id: None,
-                   });
+        assert_eq!(
+            read_ident_5,
+            TFieldIdentifier {
+                name: None,
+                field_type: TType::Stop,
+                id: None,
+            }
+        );
 
         assert_success!(i_prot.read_struct_end());
     }
@@ -1799,7 +2077,7 @@
     #[test]
     #[should_panic]
     fn must_fail_if_write_field_end_without_writing_bool_value() {
-        let (_, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
         assert_success!(o_prot.write_field_begin(&TFieldIdentifier::new("foo", TType::Bool, 1)));
         o_prot.write_field_end().unwrap();
@@ -1808,7 +2086,7 @@
     #[test]
     #[should_panic]
     fn must_fail_if_write_stop_field_without_writing_bool_value() {
-        let (_, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
         assert_success!(o_prot.write_field_begin(&TFieldIdentifier::new("foo", TType::Bool, 1)));
         o_prot.write_field_stop().unwrap();
@@ -1817,7 +2095,7 @@
     #[test]
     #[should_panic]
     fn must_fail_if_write_struct_end_without_writing_bool_value() {
-        let (_, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
         assert_success!(o_prot.write_struct_begin(&TStructIdentifier::new("foo")));
         assert_success!(o_prot.write_field_begin(&TFieldIdentifier::new("foo", TType::Bool, 1)));
         o_prot.write_struct_end().unwrap();
@@ -1826,7 +2104,7 @@
     #[test]
     #[should_panic]
     fn must_fail_if_write_struct_end_without_any_fields() {
-        let (_, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
         o_prot.write_struct_end().unwrap();
     }
 
@@ -1837,24 +2115,24 @@
 
     #[test]
     fn must_write_small_sized_list_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert_success!(o_prot.write_list_begin(&TListIdentifier::new(TType::I64, 4)));
 
         let expected: [u8; 1] = [0x46 /* size | elem_type */];
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_small_sized_list_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TListIdentifier::new(TType::I08, 10);
 
         assert_success!(o_prot.write_list_begin(&ident));
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let res = assert_success!(i_prot.read_list_begin());
         assert_eq!(&res, &ident);
@@ -1862,26 +2140,29 @@
 
     #[test]
     fn must_write_large_sized_list_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         let res = o_prot.write_list_begin(&TListIdentifier::new(TType::List, 9999));
         assert!(res.is_ok());
 
-        let expected: [u8; 3] = [0xF9 /* 0xF0 | elem_type */, 0x8F,
-                                 0x4E /* size as varint */];
+        let expected: [u8; 3] = [
+            0xF9, /* 0xF0 | elem_type */
+            0x8F,
+            0x4E /* size as varint */,
+        ];
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_large_sized_list_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TListIdentifier::new(TType::Set, 47381);
 
         assert_success!(o_prot.write_list_begin(&ident));
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let res = assert_success!(i_prot.read_list_begin());
         assert_eq!(&res, &ident);
@@ -1894,24 +2175,24 @@
 
     #[test]
     fn must_write_small_sized_set_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert_success!(o_prot.write_set_begin(&TSetIdentifier::new(TType::Struct, 2)));
 
         let expected: [u8; 1] = [0x2C /* size | elem_type */];
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_small_sized_set_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TSetIdentifier::new(TType::I16, 7);
 
         assert_success!(o_prot.write_set_begin(&ident));
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let res = assert_success!(i_prot.read_set_begin());
         assert_eq!(&res, &ident);
@@ -1919,25 +2200,29 @@
 
     #[test]
     fn must_write_large_sized_set_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert_success!(o_prot.write_set_begin(&TSetIdentifier::new(TType::Double, 23891)));
 
-        let expected: [u8; 4] = [0xF7 /* 0xF0 | elem_type */, 0xD3, 0xBA,
-                                 0x01 /* size as varint */];
+        let expected: [u8; 4] = [
+            0xF7, /* 0xF0 | elem_type */
+            0xD3,
+            0xBA,
+            0x01 /* size as varint */,
+        ];
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_large_sized_set_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TSetIdentifier::new(TType::Map, 3928429);
 
         assert_success!(o_prot.write_set_begin(&ident));
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let res = assert_success!(i_prot.read_set_begin());
         assert_eq!(&res, &ident);
@@ -1950,53 +2235,58 @@
 
     #[test]
     fn must_write_zero_sized_map_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert_success!(o_prot.write_map_begin(&TMapIdentifier::new(TType::String, TType::I32, 0)));
 
         let expected: [u8; 1] = [0x00]; // since size is zero we don't write anything
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_read_zero_sized_map_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         assert_success!(o_prot.write_map_begin(&TMapIdentifier::new(TType::Double, TType::I32, 0)));
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let res = assert_success!(i_prot.read_map_begin());
-        assert_eq!(&res,
-                   &TMapIdentifier {
-                       key_type: None,
-                       value_type: None,
-                       size: 0,
-                   });
+        assert_eq!(
+            &res,
+            &TMapIdentifier {
+                 key_type: None,
+                 value_type: None,
+                 size: 0,
+             }
+        );
     }
 
     #[test]
     fn must_write_map_begin() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert_success!(o_prot.write_map_begin(&TMapIdentifier::new(TType::Double, TType::String, 238)));
 
-        let expected: [u8; 3] = [0xEE, 0x01 /* size as varint */,
-                                 0x78 /* key type | val type */];
+        let expected: [u8; 3] = [
+            0xEE,
+            0x01, /* size as varint */
+            0x78 /* key type | val type */,
+        ];
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_map_begin() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let ident = TMapIdentifier::new(TType::Map, TType::List, 1928349);
 
         assert_success!(o_prot.write_map_begin(&ident));
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         let res = assert_success!(i_prot.read_map_begin());
         assert_eq!(&res, &ident);
@@ -2009,23 +2299,26 @@
 
     #[test]
     fn must_write_map_with_bool_key_and_value() {
-        let (trans, _, mut o_prot) = test_objects();
+        let (_, mut o_prot) = test_objects();
 
         assert_success!(o_prot.write_map_begin(&TMapIdentifier::new(TType::Bool, TType::Bool, 1)));
         assert_success!(o_prot.write_bool(true));
         assert_success!(o_prot.write_bool(false));
         assert_success!(o_prot.write_map_end());
 
-        let expected: [u8; 4] = [0x01 /* size as varint */,
-                                 0x11 /* key type | val type */, 0x01 /* key: true */,
-                                 0x02 /* val: false */];
+        let expected: [u8; 4] = [
+            0x01, /* size as varint */
+            0x11, /* key type | val type */
+            0x01, /* key: true */
+            0x02 /* val: false */,
+        ];
 
-        assert_eq!(trans.borrow().write_buffer_as_ref(), &expected);
+        assert_eq_written_bytes!(o_prot, expected);
     }
 
     #[test]
     fn must_round_trip_map_with_bool_value() {
-        let (trans, mut i_prot, mut o_prot) = test_objects();
+        let (mut i_prot, mut o_prot) = test_objects();
 
         let map_ident = TMapIdentifier::new(TType::Bool, TType::Bool, 2);
         assert_success!(o_prot.write_map_begin(&map_ident));
@@ -2035,7 +2328,7 @@
         assert_success!(o_prot.write_bool(true));
         assert_success!(o_prot.write_map_end());
 
-        trans.borrow_mut().copy_write_buffer_to_read_buffer();
+        copy_write_buffer_to_read_buffer!(o_prot);
 
         // map header
         let rcvd_ident = assert_success!(i_prot.read_map_begin());
@@ -2058,28 +2351,30 @@
 
     #[test]
     fn must_read_map_end() {
-        let (_, mut i_prot, _) = test_objects();
+        let (mut i_prot, _) = test_objects();
         assert!(i_prot.read_map_end().is_ok()); // will blow up if we try to read from empty buffer
     }
 
-    fn test_objects<'a>
-        ()
-        -> (Rc<RefCell<Box<TBufferTransport>>>, TCompactInputProtocol<'a>, TCompactOutputProtocol<'a>)
+    fn test_objects()
+        -> (TCompactInputProtocol<ReadHalf<TBufferChannel>>,
+            TCompactOutputProtocol<WriteHalf<TBufferChannel>>)
     {
-        let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity(80, 80))));
+        let mem = TBufferChannel::with_capacity(80, 80);
 
-        let inner: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
-        let inner = Rc::new(RefCell::new(inner));
+        let (r_mem, w_mem) = mem.split().unwrap();
 
-        let i_prot = TCompactInputProtocol::new(inner.clone());
-        let o_prot = TCompactOutputProtocol::new(inner.clone());
+        let i_prot = TCompactInputProtocol::new(r_mem);
+        let o_prot = TCompactOutputProtocol::new(w_mem);
 
-        (mem, i_prot, o_prot)
+        (i_prot, o_prot)
     }
 
-    fn assert_no_write<F: FnMut(&mut TCompactOutputProtocol) -> ::Result<()>>(mut write_fn: F) {
-        let (trans, _, mut o_prot) = test_objects();
+    fn assert_no_write<F>(mut write_fn: F)
+    where
+        F: FnMut(&mut TCompactOutputProtocol<WriteHalf<TBufferChannel>>) -> ::Result<()>,
+    {
+        let (_, mut o_prot) = test_objects();
         assert!(write_fn(&mut o_prot).is_ok());
-        assert_eq!(trans.borrow().write_buffer_as_ref().len(), 0);
+        assert_eq!(o_prot.transport.write_bytes().len(), 0);
     }
 }
diff --git a/lib/rs/src/protocol/mod.rs b/lib/rs/src/protocol/mod.rs
index b230d63..4f13914 100644
--- a/lib/rs/src/protocol/mod.rs
+++ b/lib/rs/src/protocol/mod.rs
@@ -19,59 +19,77 @@
 //!
 //! # Examples
 //!
-//! Create and use a `TOutputProtocol`.
-//!
-//! ```no_run
-//! use std::cell::RefCell;
-//! use std::rc::Rc;
-//! use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType};
-//! use thrift::transport::{TTcpTransport, TTransport};
-//!
-//! // create the I/O channel
-//! let mut transport = TTcpTransport::new();
-//! transport.open("127.0.0.1:9090").unwrap();
-//! let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
-//!
-//! // create the protocol to encode types into bytes
-//! let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
-//!
-//! // write types
-//! o_prot.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
-//! o_prot.write_string("foo").unwrap();
-//! o_prot.write_field_end().unwrap();
-//! ```
-//!
 //! Create and use a `TInputProtocol`.
 //!
 //! ```no_run
-//! use std::cell::RefCell;
-//! use std::rc::Rc;
 //! use thrift::protocol::{TBinaryInputProtocol, TInputProtocol};
-//! use thrift::transport::{TTcpTransport, TTransport};
+//! use thrift::transport::TTcpChannel;
 //!
 //! // create the I/O channel
-//! let mut transport = TTcpTransport::new();
-//! transport.open("127.0.0.1:9090").unwrap();
-//! let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+//! let mut channel = TTcpChannel::new();
+//! channel.open("127.0.0.1:9090").unwrap();
 //!
 //! // create the protocol to decode bytes into types
-//! let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
+//! let mut protocol = TBinaryInputProtocol::new(channel, true);
 //!
 //! // read types from the wire
-//! let field_identifier = i_prot.read_field_begin().unwrap();
-//! let field_contents = i_prot.read_string().unwrap();
-//! let field_end = i_prot.read_field_end().unwrap();
+//! let field_identifier = protocol.read_field_begin().unwrap();
+//! let field_contents = protocol.read_string().unwrap();
+//! let field_end = protocol.read_field_end().unwrap();
+//! ```
+//!
+//! Create and use a `TOutputProtocol`.
+//!
+//! ```no_run
+//! use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType};
+//! use thrift::transport::TTcpChannel;
+//!
+//! // create the I/O channel
+//! let mut channel = TTcpChannel::new();
+//! channel.open("127.0.0.1:9090").unwrap();
+//!
+//! // create the protocol to encode types into bytes
+//! let mut protocol = TBinaryOutputProtocol::new(channel, true);
+//!
+//! // write types
+//! protocol.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
+//! protocol.write_string("foo").unwrap();
+//! protocol.write_field_end().unwrap();
 //! ```
 
-use std::cell::RefCell;
 use std::fmt;
 use std::fmt::{Display, Formatter};
 use std::convert::From;
-use std::rc::Rc;
 use try_from::TryFrom;
 
-use ::{ProtocolError, ProtocolErrorKind};
-use ::transport::TTransport;
+use {ProtocolError, ProtocolErrorKind};
+use transport::{TReadTransport, TWriteTransport};
+
+#[cfg(test)]
+macro_rules! assert_eq_written_bytes {
+    ($o_prot:ident, $expected_bytes:ident) => {
+        {
+            assert_eq!($o_prot.transport.write_bytes(), &$expected_bytes);
+        }
+    };
+}
+
+// FIXME: should take both read and write
+#[cfg(test)]
+macro_rules! copy_write_buffer_to_read_buffer {
+    ($o_prot:ident) => {
+        {
+            $o_prot.transport.copy_write_buffer_to_read_buffer();
+        }
+    };
+}
+
+#[cfg(test)]
+macro_rules! set_readable_bytes {
+    ($i_prot:ident, $bytes:expr) => {
+        $i_prot.transport.set_readable_bytes($bytes);
+    }
+}
 
 mod binary;
 mod compact;
@@ -107,20 +125,17 @@
 /// Create and use a `TInputProtocol`
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TBinaryInputProtocol, TInputProtocol};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("127.0.0.1:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("127.0.0.1:9090").unwrap();
 ///
-/// let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
+/// let mut protocol = TBinaryInputProtocol::new(channel, true);
 ///
-/// let field_identifier = i_prot.read_field_begin().unwrap();
-/// let field_contents = i_prot.read_string().unwrap();
-/// let field_end = i_prot.read_field_end().unwrap();
+/// let field_identifier = protocol.read_field_begin().unwrap();
+/// let field_contents = protocol.read_string().unwrap();
+/// let field_end = protocol.read_field_end().unwrap();
 /// ```
 pub trait TInputProtocol {
     /// Read the beginning of a Thrift message.
@@ -171,10 +186,14 @@
     /// Skip a field with type `field_type` recursively up to `depth` levels.
     fn skip_till_depth(&mut self, field_type: TType, depth: i8) -> ::Result<()> {
         if depth == 0 {
-            return Err(::Error::Protocol(ProtocolError {
-                kind: ProtocolErrorKind::DepthLimit,
-                message: format!("cannot parse past {:?}", field_type),
-            }));
+            return Err(
+                ::Error::Protocol(
+                    ProtocolError {
+                        kind: ProtocolErrorKind::DepthLimit,
+                        message: format!("cannot parse past {:?}", field_type),
+                    },
+                ),
+            );
         }
 
         match field_type {
@@ -213,9 +232,11 @@
             TType::Map => {
                 let map_ident = self.read_map_begin()?;
                 for _ in 0..map_ident.size {
-                    let key_type = map_ident.key_type
+                    let key_type = map_ident
+                        .key_type
                         .expect("non-zero sized map should contain key type");
-                    let val_type = map_ident.value_type
+                    let val_type = map_ident
+                        .value_type
                         .expect("non-zero sized map should contain value type");
                     self.skip_till_depth(key_type, depth - 1)?;
                     self.skip_till_depth(val_type, depth - 1)?;
@@ -223,10 +244,14 @@
                 self.read_map_end()
             }
             u => {
-                Err(::Error::Protocol(ProtocolError {
-                    kind: ProtocolErrorKind::Unknown,
-                    message: format!("cannot skip field type {:?}", &u),
-                }))
+                Err(
+                    ::Error::Protocol(
+                        ProtocolError {
+                            kind: ProtocolErrorKind::Unknown,
+                            message: format!("cannot skip field type {:?}", &u),
+                        },
+                    ),
+                )
             }
         }
     }
@@ -259,20 +284,17 @@
 /// Create and use a `TOutputProtocol`
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("127.0.0.1:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("127.0.0.1:9090").unwrap();
 ///
-/// let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
+/// let mut protocol = TBinaryOutputProtocol::new(channel, true);
 ///
-/// o_prot.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
-/// o_prot.write_string("foo").unwrap();
-/// o_prot.write_field_end().unwrap();
+/// protocol.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
+/// protocol.write_string("foo").unwrap();
+/// protocol.write_field_end().unwrap();
 /// ```
 pub trait TOutputProtocol {
     /// Write the beginning of a Thrift message.
@@ -330,6 +352,192 @@
     fn write_byte(&mut self, b: u8) -> ::Result<()>; // FIXME: REMOVE
 }
 
+impl<P> TInputProtocol for Box<P>
+where
+    P: TInputProtocol + ?Sized,
+{
+    fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier> {
+        (**self).read_message_begin()
+    }
+
+    fn read_message_end(&mut self) -> ::Result<()> {
+        (**self).read_message_end()
+    }
+
+    fn read_struct_begin(&mut self) -> ::Result<Option<TStructIdentifier>> {
+        (**self).read_struct_begin()
+    }
+
+    fn read_struct_end(&mut self) -> ::Result<()> {
+        (**self).read_struct_end()
+    }
+
+    fn read_field_begin(&mut self) -> ::Result<TFieldIdentifier> {
+        (**self).read_field_begin()
+    }
+
+    fn read_field_end(&mut self) -> ::Result<()> {
+        (**self).read_field_end()
+    }
+
+    fn read_bool(&mut self) -> ::Result<bool> {
+        (**self).read_bool()
+    }
+
+    fn read_bytes(&mut self) -> ::Result<Vec<u8>> {
+        (**self).read_bytes()
+    }
+
+    fn read_i8(&mut self) -> ::Result<i8> {
+        (**self).read_i8()
+    }
+
+    fn read_i16(&mut self) -> ::Result<i16> {
+        (**self).read_i16()
+    }
+
+    fn read_i32(&mut self) -> ::Result<i32> {
+        (**self).read_i32()
+    }
+
+    fn read_i64(&mut self) -> ::Result<i64> {
+        (**self).read_i64()
+    }
+
+    fn read_double(&mut self) -> ::Result<f64> {
+        (**self).read_double()
+    }
+
+    fn read_string(&mut self) -> ::Result<String> {
+        (**self).read_string()
+    }
+
+    fn read_list_begin(&mut self) -> ::Result<TListIdentifier> {
+        (**self).read_list_begin()
+    }
+
+    fn read_list_end(&mut self) -> ::Result<()> {
+        (**self).read_list_end()
+    }
+
+    fn read_set_begin(&mut self) -> ::Result<TSetIdentifier> {
+        (**self).read_set_begin()
+    }
+
+    fn read_set_end(&mut self) -> ::Result<()> {
+        (**self).read_set_end()
+    }
+
+    fn read_map_begin(&mut self) -> ::Result<TMapIdentifier> {
+        (**self).read_map_begin()
+    }
+
+    fn read_map_end(&mut self) -> ::Result<()> {
+        (**self).read_map_end()
+    }
+
+    fn read_byte(&mut self) -> ::Result<u8> {
+        (**self).read_byte()
+    }
+}
+
+impl<P> TOutputProtocol for Box<P>
+where
+    P: TOutputProtocol + ?Sized,
+{
+    fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
+        (**self).write_message_begin(identifier)
+    }
+
+    fn write_message_end(&mut self) -> ::Result<()> {
+        (**self).write_message_end()
+    }
+
+    fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()> {
+        (**self).write_struct_begin(identifier)
+    }
+
+    fn write_struct_end(&mut self) -> ::Result<()> {
+        (**self).write_struct_end()
+    }
+
+    fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> {
+        (**self).write_field_begin(identifier)
+    }
+
+    fn write_field_end(&mut self) -> ::Result<()> {
+        (**self).write_field_end()
+    }
+
+    fn write_field_stop(&mut self) -> ::Result<()> {
+        (**self).write_field_stop()
+    }
+
+    fn write_bool(&mut self, b: bool) -> ::Result<()> {
+        (**self).write_bool(b)
+    }
+
+    fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
+        (**self).write_bytes(b)
+    }
+
+    fn write_i8(&mut self, i: i8) -> ::Result<()> {
+        (**self).write_i8(i)
+    }
+
+    fn write_i16(&mut self, i: i16) -> ::Result<()> {
+        (**self).write_i16(i)
+    }
+
+    fn write_i32(&mut self, i: i32) -> ::Result<()> {
+        (**self).write_i32(i)
+    }
+
+    fn write_i64(&mut self, i: i64) -> ::Result<()> {
+        (**self).write_i64(i)
+    }
+
+    fn write_double(&mut self, d: f64) -> ::Result<()> {
+        (**self).write_double(d)
+    }
+
+    fn write_string(&mut self, s: &str) -> ::Result<()> {
+        (**self).write_string(s)
+    }
+
+    fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()> {
+        (**self).write_list_begin(identifier)
+    }
+
+    fn write_list_end(&mut self) -> ::Result<()> {
+        (**self).write_list_end()
+    }
+
+    fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()> {
+        (**self).write_set_begin(identifier)
+    }
+
+    fn write_set_end(&mut self) -> ::Result<()> {
+        (**self).write_set_end()
+    }
+
+    fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> {
+        (**self).write_map_begin(identifier)
+    }
+
+    fn write_map_end(&mut self) -> ::Result<()> {
+        (**self).write_map_end()
+    }
+
+    fn flush(&mut self) -> ::Result<()> {
+        (**self).flush()
+    }
+
+    fn write_byte(&mut self, b: u8) -> ::Result<()> {
+        (**self).write_byte(b)
+    }
+}
+
 /// Helper type used by servers to create `TInputProtocol` instances for
 /// accepted client connections.
 ///
@@ -338,21 +546,27 @@
 /// Create a `TInputProtocolFactory` and use it to create a `TInputProtocol`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TBinaryInputProtocolFactory, TInputProtocolFactory};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("127.0.0.1:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("127.0.0.1:9090").unwrap();
 ///
-/// let mut i_proto_factory = TBinaryInputProtocolFactory::new();
-/// let i_prot = i_proto_factory.create(transport);
+/// let factory = TBinaryInputProtocolFactory::new();
+/// let protocol = factory.create(Box::new(channel));
 /// ```
 pub trait TInputProtocolFactory {
-    /// Create a `TInputProtocol` that reads bytes from `transport`.
-    fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TInputProtocol>;
+    // Create a `TInputProtocol` that reads bytes from `transport`.
+    fn create(&self, transport: Box<TReadTransport + Send>) -> Box<TInputProtocol + Send>;
+}
+
+impl<T> TInputProtocolFactory for Box<T>
+where
+    T: TInputProtocolFactory + ?Sized,
+{
+    fn create(&self, transport: Box<TReadTransport + Send>) -> Box<TInputProtocol + Send> {
+        (**self).create(transport)
+    }
 }
 
 /// Helper type used by servers to create `TOutputProtocol` instances for
@@ -363,21 +577,27 @@
 /// Create a `TOutputProtocolFactory` and use it to create a `TOutputProtocol`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TBinaryOutputProtocolFactory, TOutputProtocolFactory};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("127.0.0.1:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("127.0.0.1:9090").unwrap();
 ///
-/// let mut o_proto_factory = TBinaryOutputProtocolFactory::new();
-/// let o_prot = o_proto_factory.create(transport);
+/// let factory = TBinaryOutputProtocolFactory::new();
+/// let protocol = factory.create(Box::new(channel));
 /// ```
 pub trait TOutputProtocolFactory {
     /// Create a `TOutputProtocol` that writes bytes to `transport`.
-    fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TOutputProtocol>;
+    fn create(&self, transport: Box<TWriteTransport + Send>) -> Box<TOutputProtocol + Send>;
+}
+
+impl<T> TOutputProtocolFactory for Box<T>
+where
+    T: TOutputProtocolFactory + ?Sized,
+{
+    fn create(&self, transport: Box<TWriteTransport + Send>) -> Box<TOutputProtocol + Send> {
+        (**self).create(transport)
+    }
 }
 
 /// Thrift message identifier.
@@ -394,10 +614,11 @@
 impl TMessageIdentifier {
     /// Create a `TMessageIdentifier` for a Thrift service-call named `name`
     /// with message type `message_type` and sequence number `sequence_number`.
-    pub fn new<S: Into<String>>(name: S,
-                                message_type: TMessageType,
-                                sequence_number: i32)
-                                -> TMessageIdentifier {
+    pub fn new<S: Into<String>>(
+        name: S,
+        message_type: TMessageType,
+        sequence_number: i32,
+    ) -> TMessageIdentifier {
         TMessageIdentifier {
             name: name.into(),
             message_type: message_type,
@@ -443,9 +664,10 @@
     ///
     /// `id` should be `None` if `field_type` is `TType::Stop`.
     pub fn new<N, S, I>(name: N, field_type: TType, id: I) -> TFieldIdentifier
-        where N: Into<Option<S>>,
-              S: Into<String>,
-              I: Into<Option<i16>>
+    where
+        N: Into<Option<S>>,
+        S: Into<String>,
+        I: Into<Option<i16>>,
     {
         TFieldIdentifier {
             name: name.into().map(|n| n.into()),
@@ -510,8 +732,9 @@
     /// Create a `TMapIdentifier` for a map with `size` entries of type
     /// `key_type -> value_type`.
     pub fn new<K, V>(key_type: K, value_type: V, size: i32) -> TMapIdentifier
-        where K: Into<Option<TType>>,
-              V: Into<Option<TType>>
+    where
+        K: Into<Option<TType>>,
+        V: Into<Option<TType>>,
     {
         TMapIdentifier {
             key_type: key_type.into(),
@@ -565,10 +788,14 @@
             0x03 => Ok(TMessageType::Exception),
             0x04 => Ok(TMessageType::OneWay),
             unkn => {
-                Err(::Error::Protocol(ProtocolError {
-                    kind: ProtocolErrorKind::InvalidData,
-                    message: format!("cannot convert {} to TMessageType", unkn),
-                }))
+                Err(
+                    ::Error::Protocol(
+                        ProtocolError {
+                            kind: ProtocolErrorKind::InvalidData,
+                            message: format!("cannot convert {} to TMessageType", unkn),
+                        },
+                    ),
+                )
             }
         }
     }
@@ -642,10 +869,14 @@
     if expected == actual {
         Ok(())
     } else {
-        Err(::Error::Application(::ApplicationError {
-            kind: ::ApplicationErrorKind::BadSequenceId,
-            message: format!("expected {} got {}", expected, actual),
-        }))
+        Err(
+            ::Error::Application(
+                ::ApplicationError {
+                    kind: ::ApplicationErrorKind::BadSequenceId,
+                    message: format!("expected {} got {}", expected, actual),
+                },
+            ),
+        )
     }
 }
 
@@ -657,10 +888,14 @@
     if expected == actual {
         Ok(())
     } else {
-        Err(::Error::Application(::ApplicationError {
-            kind: ::ApplicationErrorKind::WrongMethodName,
-            message: format!("expected {} got {}", expected, actual),
-        }))
+        Err(
+            ::Error::Application(
+                ::ApplicationError {
+                    kind: ::ApplicationErrorKind::WrongMethodName,
+                    message: format!("expected {} got {}", expected, actual),
+                },
+            ),
+        )
     }
 }
 
@@ -672,10 +907,14 @@
     if expected == actual {
         Ok(())
     } else {
-        Err(::Error::Application(::ApplicationError {
-            kind: ::ApplicationErrorKind::InvalidMessageType,
-            message: format!("expected {} got {}", expected, actual),
-        }))
+        Err(
+            ::Error::Application(
+                ::ApplicationError {
+                    kind: ::ApplicationErrorKind::InvalidMessageType,
+                    message: format!("expected {} got {}", expected, actual),
+                },
+            ),
+        )
     }
 }
 
@@ -686,10 +925,14 @@
     match *field {
         Some(_) => Ok(()),
         None => {
-            Err(::Error::Protocol(::ProtocolError {
-                kind: ::ProtocolErrorKind::Unknown,
-                message: format!("missing required field {}", field_name),
-            }))
+            Err(
+                ::Error::Protocol(
+                    ::ProtocolError {
+                        kind: ::ProtocolErrorKind::Unknown,
+                        message: format!("missing required field {}", field_name),
+                    },
+                ),
+            )
         }
     }
 }
@@ -700,10 +943,67 @@
 ///
 /// Return `TFieldIdentifier.id` if an id exists, `Err` otherwise.
 pub fn field_id(field_ident: &TFieldIdentifier) -> ::Result<i16> {
-    field_ident.id.ok_or_else(|| {
-        ::Error::Protocol(::ProtocolError {
-            kind: ::ProtocolErrorKind::Unknown,
-            message: format!("missing field in in {:?}", field_ident),
-        })
-    })
+    field_ident
+        .id
+        .ok_or_else(
+            || {
+                ::Error::Protocol(
+                    ::ProtocolError {
+                        kind: ::ProtocolErrorKind::Unknown,
+                        message: format!("missing field in in {:?}", field_ident),
+                    },
+                )
+            },
+        )
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::io::Cursor;
+
+    use super::*;
+    use transport::{TReadTransport, TWriteTransport};
+
+    #[test]
+    fn must_create_usable_input_protocol_from_concrete_input_protocol() {
+        let r: Box<TReadTransport> = Box::new(Cursor::new([0, 1, 2]));
+        let mut t = TCompactInputProtocol::new(r);
+        takes_input_protocol(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_input_protocol_from_boxed_input() {
+        let r: Box<TReadTransport> = Box::new(Cursor::new([0, 1, 2]));
+        let mut t: Box<TInputProtocol> = Box::new(TCompactInputProtocol::new(r));
+        takes_input_protocol(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_output_protocol_from_concrete_output_protocol() {
+        let w: Box<TWriteTransport> = Box::new(vec![0u8; 10]);
+        let mut t = TCompactOutputProtocol::new(w);
+        takes_output_protocol(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_output_protocol_from_boxed_output() {
+        let w: Box<TWriteTransport> = Box::new(vec![0u8; 10]);
+        let mut t: Box<TOutputProtocol> = Box::new(TCompactOutputProtocol::new(w));
+        takes_output_protocol(&mut t)
+    }
+
+    fn takes_input_protocol<R>(t: &mut R)
+    where
+        R: TInputProtocol,
+    {
+        t.read_byte().unwrap();
+    }
+
+    fn takes_output_protocol<W>(t: &mut W)
+    where
+        W: TOutputProtocol,
+    {
+        t.flush().unwrap();
+    }
 }
diff --git a/lib/rs/src/protocol/multiplexed.rs b/lib/rs/src/protocol/multiplexed.rs
index a30aca8..db08027 100644
--- a/lib/rs/src/protocol/multiplexed.rs
+++ b/lib/rs/src/protocol/multiplexed.rs
@@ -37,33 +37,37 @@
 /// Create and use a `TMultiplexedOutputProtocol`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
 /// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("localhost:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("localhost:9090").unwrap();
 ///
-/// let o_prot = TBinaryOutputProtocol::new(transport, true);
-/// let mut o_prot = TMultiplexedOutputProtocol::new("service_name", Box::new(o_prot));
+/// let protocol = TBinaryOutputProtocol::new(channel, true);
+/// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol);
 ///
 /// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
-/// o_prot.write_message_begin(&ident).unwrap();
+/// protocol.write_message_begin(&ident).unwrap();
 /// ```
-pub struct TMultiplexedOutputProtocol<'a> {
+#[derive(Debug)]
+pub struct TMultiplexedOutputProtocol<P>
+where
+    P: TOutputProtocol,
+{
     service_name: String,
-    inner: Box<TOutputProtocol + 'a>,
+    inner: P,
 }
 
-impl<'a> TMultiplexedOutputProtocol<'a> {
+impl<P> TMultiplexedOutputProtocol<P>
+where
+    P: TOutputProtocol,
+{
     /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages
     /// as originating from a service named `service_name` and sends them over
     /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent
     /// by `wrapped`, not by this instance.
-    pub fn new(service_name: &str, wrapped: Box<TOutputProtocol + 'a>) -> TMultiplexedOutputProtocol<'a> {
+    pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> {
         TMultiplexedOutputProtocol {
             service_name: service_name.to_owned(),
             inner: wrapped,
@@ -72,7 +76,10 @@
 }
 
 // FIXME: avoid passthrough methods
-impl<'a> TOutputProtocol for TMultiplexedOutputProtocol<'a> {
+impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P>
+where
+    P: TOutputProtocol,
+{
     fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
         match identifier.message_type { // FIXME: is there a better way to override identifier here?
             TMessageType::Call | TMessageType::OneWay => {
@@ -181,39 +188,50 @@
 #[cfg(test)]
 mod tests {
 
-    use std::cell::RefCell;
-    use std::rc::Rc;
-
-    use ::protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
-    use ::transport::{TPassThruTransport, TTransport};
-    use ::transport::mem::TBufferTransport;
+    use protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
+    use transport::{TBufferChannel, TIoChannel, WriteHalf};
 
     use super::*;
 
     #[test]
     fn must_write_message_begin_with_prefixed_service_name() {
-        let (trans, mut o_prot) = test_objects();
+        let mut o_prot = test_objects();
 
         let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2);
         assert_success!(o_prot.write_message_begin(&ident));
 
-        let expected: [u8; 19] =
-            [0x80, 0x01 /* protocol identifier */, 0x00, 0x01 /* message type */, 0x00,
-             0x00, 0x00, 0x07, 0x66, 0x6F, 0x6F /* "foo" */, 0x3A /* ":" */, 0x62, 0x61,
-             0x72 /* "bar" */, 0x00, 0x00, 0x00, 0x02 /* sequence number */];
+        let expected: [u8; 19] = [
+            0x80,
+            0x01, /* protocol identifier */
+            0x00,
+            0x01, /* message type */
+            0x00,
+            0x00,
+            0x00,
+            0x07,
+            0x66,
+            0x6F,
+            0x6F, /* "foo" */
+            0x3A, /* ":" */
+            0x62,
+            0x61,
+            0x72, /* "bar" */
+            0x00,
+            0x00,
+            0x00,
+            0x02 /* sequence number */,
+        ];
 
-        assert_eq!(&trans.borrow().write_buffer_to_vec(), &expected);
+        assert_eq!(o_prot.inner.transport.write_bytes(), expected);
     }
 
-    fn test_objects<'a>() -> (Rc<RefCell<Box<TBufferTransport>>>, TMultiplexedOutputProtocol<'a>) {
-        let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity(40, 40))));
-
-        let inner: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
-        let inner = Rc::new(RefCell::new(inner));
-
-        let o_prot = TBinaryOutputProtocol::new(inner.clone(), true);
-        let o_prot = TMultiplexedOutputProtocol::new("foo", Box::new(o_prot));
-
-        (mem, o_prot)
+    fn test_objects
+        ()
+        -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>
+    {
+        let c = TBufferChannel::with_capacity(40, 40);
+        let (_, w_chan) = c.split().unwrap();
+        let prot = TBinaryOutputProtocol::new(w_chan, true);
+        TMultiplexedOutputProtocol::new("foo", prot)
     }
 }
diff --git a/lib/rs/src/protocol/stored.rs b/lib/rs/src/protocol/stored.rs
index 6826c00..b3f305f 100644
--- a/lib/rs/src/protocol/stored.rs
+++ b/lib/rs/src/protocol/stored.rs
@@ -17,8 +17,8 @@
 
 use std::convert::Into;
 
-use ::ProtocolErrorKind;
-use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TInputProtocol,
+use ProtocolErrorKind;
+use super::{TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier,
             TSetIdentifier, TStructIdentifier};
 
 /// `TInputProtocol` required to use a `TMultiplexedProcessor`.
@@ -40,35 +40,34 @@
 /// Create and use a `TStoredInputProtocol`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
 /// use thrift;
 /// use thrift::protocol::{TInputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
 /// use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TStoredInputProtocol};
 /// use thrift::server::TProcessor;
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::{TIoChannel, TTcpChannel};
 ///
 /// // sample processor
 /// struct ActualProcessor;
 /// impl TProcessor for ActualProcessor {
 ///     fn process(
-///         &mut self,
+///         &self,
 ///         _: &mut TInputProtocol,
 ///         _: &mut TOutputProtocol
 ///     ) -> thrift::Result<()> {
 ///         unimplemented!()
 ///     }
 /// }
-/// let mut processor = ActualProcessor {};
+/// let processor = ActualProcessor {};
 ///
 /// // construct the shared transport
-/// let mut transport = TTcpTransport::new();
-/// transport.open("localhost:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("localhost:9090").unwrap();
+///
+/// let (i_chan, o_chan) = channel.split().unwrap();
 ///
 /// // construct the actual input and output protocols
-/// let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
-/// let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
+/// let mut i_prot = TBinaryInputProtocol::new(i_chan, true);
+/// let mut o_prot = TBinaryOutputProtocol::new(o_chan, true);
 ///
 /// // message identifier received from remote and modified to remove the service name
 /// let new_msg_ident = TMessageIdentifier::new("service_call", TMessageType::Call, 1);
@@ -77,6 +76,7 @@
 /// let mut proxy_i_prot = TStoredInputProtocol::new(&mut i_prot, new_msg_ident);
 /// let res = processor.process(&mut proxy_i_prot, &mut o_prot);
 /// ```
+// FIXME: implement Debug
 pub struct TStoredInputProtocol<'a> {
     inner: &'a mut TInputProtocol,
     message_ident: Option<TMessageIdentifier>,
@@ -88,9 +88,10 @@
     /// `TInputProtocol`. `message_ident` is the modified message identifier -
     /// with service name stripped - that will be passed to
     /// `wrapped.read_message_begin(...)`.
-    pub fn new(wrapped: &mut TInputProtocol,
-               message_ident: TMessageIdentifier)
-               -> TStoredInputProtocol {
+    pub fn new(
+        wrapped: &mut TInputProtocol,
+        message_ident: TMessageIdentifier,
+    ) -> TStoredInputProtocol {
         TStoredInputProtocol {
             inner: wrapped,
             message_ident: message_ident.into(),
@@ -100,10 +101,16 @@
 
 impl<'a> TInputProtocol for TStoredInputProtocol<'a> {
     fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier> {
-        self.message_ident.take().ok_or_else(|| {
-            ::errors::new_protocol_error(ProtocolErrorKind::Unknown,
-                                         "message identifier already read")
-        })
+        self.message_ident
+            .take()
+            .ok_or_else(
+                || {
+                    ::errors::new_protocol_error(
+                        ProtocolErrorKind::Unknown,
+                        "message identifier already read",
+                    )
+                },
+            )
     }
 
     fn read_message_end(&mut self) -> ::Result<()> {
diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs
index ceac18a..21c392c 100644
--- a/lib/rs/src/server/mod.rs
+++ b/lib/rs/src/server/mod.rs
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Types required to implement a Thrift server.
+//! Types used to implement a Thrift server.
 
-use ::protocol::{TInputProtocol, TOutputProtocol};
+use protocol::{TInputProtocol, TOutputProtocol};
 
-mod simple;
 mod multiplexed;
+mod threaded;
 
-pub use self::simple::TSimpleServer;
 pub use self::multiplexed::TMultiplexedProcessor;
+pub use self::threaded::TServer;
 
 /// Handles incoming Thrift messages and dispatches them to the user-defined
 /// handler functions.
@@ -56,14 +56,14 @@
 ///
 /// // `TProcessor` implementation for `SimpleService`
 /// impl TProcessor for SimpleServiceSyncProcessor {
-///     fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+///     fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
 ///         unimplemented!();
 ///     }
 /// }
 ///
 /// // service functions for SimpleService
 /// trait SimpleServiceSyncHandler {
-///     fn service_call(&mut self) -> thrift::Result<()>;
+///     fn service_call(&self) -> thrift::Result<()>;
 /// }
 ///
 /// //
@@ -73,7 +73,7 @@
 /// // define a handler that will be invoked when `service_call` is received
 /// struct SimpleServiceHandlerImpl;
 /// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
-///     fn service_call(&mut self) -> thrift::Result<()> {
+///     fn service_call(&self) -> thrift::Result<()> {
 ///         unimplemented!();
 ///     }
 /// }
@@ -82,7 +82,7 @@
 /// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
 ///
 /// // at this point you can pass the processor to the server
-/// // let server = TSimpleServer::new(..., processor);
+/// // let server = TServer::new(..., processor);
 /// ```
 pub trait TProcessor {
     /// Process a Thrift service call.
@@ -91,5 +91,5 @@
     /// the response to `o`.
     ///
     /// Returns `()` if the handler was executed; `Err` otherwise.
-    fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
+    fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
 }
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
index d2314a1..b1243a8 100644
--- a/lib/rs/src/server/multiplexed.rs
+++ b/lib/rs/src/server/multiplexed.rs
@@ -17,9 +17,10 @@
 
 use std::collections::HashMap;
 use std::convert::Into;
+use std::sync::{Arc, Mutex};
 
-use ::{new_application_error, ApplicationErrorKind};
-use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
+use {ApplicationErrorKind, new_application_error};
+use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
 
 use super::TProcessor;
 
@@ -33,8 +34,9 @@
 ///
 /// A `TMultiplexedProcessor` can only handle messages sent by a
 /// `TMultiplexedOutputProtocol`.
+// FIXME: implement Debug
 pub struct TMultiplexedProcessor {
-    processors: HashMap<String, Box<TProcessor>>,
+    processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>,
 }
 
 impl TMultiplexedProcessor {
@@ -46,46 +48,62 @@
     /// Return `false` if a mapping previously existed (the previous mapping is
     /// *not* overwritten).
     #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
-    pub fn register_processor<S: Into<String>>(&mut self,
-                                               service_name: S,
-                                               processor: Box<TProcessor>)
-                                               -> bool {
+    pub fn register_processor<S: Into<String>>(
+        &mut self,
+        service_name: S,
+        processor: Box<TProcessor>,
+    ) -> bool {
+        let mut processors = self.processors.lock().unwrap();
+
         let name = service_name.into();
-        if self.processors.contains_key(&name) {
+        if processors.contains_key(&name) {
             false
         } else {
-            self.processors.insert(name, processor);
+            processors.insert(name, Arc::new(processor));
             true
         }
     }
 }
 
 impl TProcessor for TMultiplexedProcessor {
-    fn process(&mut self,
-               i_prot: &mut TInputProtocol,
-               o_prot: &mut TOutputProtocol)
-               -> ::Result<()> {
+    fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
         let msg_ident = i_prot.read_message_begin()?;
-        let sep_index = msg_ident.name
+        let sep_index = msg_ident
+            .name
             .find(':')
-            .ok_or_else(|| {
-                new_application_error(ApplicationErrorKind::Unknown,
-                                      "no service separator found in incoming message")
-            })?;
+            .ok_or_else(
+                || {
+                    new_application_error(
+                        ApplicationErrorKind::Unknown,
+                        "no service separator found in incoming message",
+                    )
+                },
+            )?;
 
         let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
 
-        match self.processors.get_mut(svc_name) {
-            Some(ref mut processor) => {
-                let new_msg_ident = TMessageIdentifier::new(svc_call,
-                                                            msg_ident.message_type,
-                                                            msg_ident.sequence_number);
+        let processor: Option<Arc<Box<TProcessor>>> = {
+            let processors = self.processors.lock().unwrap();
+            processors.get(svc_name).cloned()
+        };
+
+        match processor {
+            Some(arc) => {
+                let new_msg_ident = TMessageIdentifier::new(
+                    svc_call,
+                    msg_ident.message_type,
+                    msg_ident.sequence_number,
+                );
                 let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
-                processor.process(&mut proxy_i_prot, o_prot)
+                (*arc).process(&mut proxy_i_prot, o_prot)
             }
             None => {
-                Err(new_application_error(ApplicationErrorKind::Unknown,
-                                          format!("no processor found for service {}", svc_name)))
+                Err(
+                    new_application_error(
+                        ApplicationErrorKind::Unknown,
+                        format!("no processor found for service {}", svc_name),
+                    ),
+                )
             }
         }
     }
diff --git a/lib/rs/src/server/simple.rs b/lib/rs/src/server/simple.rs
deleted file mode 100644
index 89ed977..0000000
--- a/lib/rs/src/server/simple.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::cell::RefCell;
-use std::net::{TcpListener, TcpStream};
-use std::rc::Rc;
-
-use ::{ApplicationError, ApplicationErrorKind};
-use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
-use ::transport::{TTcpTransport, TTransport, TTransportFactory};
-
-use super::TProcessor;
-
-/// Single-threaded blocking Thrift socket server.
-///
-/// A `TSimpleServer` listens on a given address and services accepted
-/// connections *synchronously* and *sequentially* - i.e. in a blocking manner,
-/// one at a time - on the main thread. Each accepted connection has an input
-/// half and an output half, each of which uses a `TTransport` and `TProtocol`
-/// to translate messages to and from byes. Any combination of `TProtocol` and
-/// `TTransport` may be used.
-///
-/// # Examples
-///
-/// Creating and running a `TSimpleServer` using Thrift-compiler-generated
-/// service code.
-///
-/// ```no_run
-/// use thrift;
-/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
-/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
-/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
-/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory};
-/// use thrift::server::{TProcessor, TSimpleServer};
-///
-/// //
-/// // auto-generated
-/// //
-///
-/// // processor for `SimpleService`
-/// struct SimpleServiceSyncProcessor;
-/// impl SimpleServiceSyncProcessor {
-///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
-///         unimplemented!();
-///     }
-/// }
-///
-/// // `TProcessor` implementation for `SimpleService`
-/// impl TProcessor for SimpleServiceSyncProcessor {
-///     fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
-///         unimplemented!();
-///     }
-/// }
-///
-/// // service functions for SimpleService
-/// trait SimpleServiceSyncHandler {
-///     fn service_call(&mut self) -> thrift::Result<()>;
-/// }
-///
-/// //
-/// // user-code follows
-/// //
-///
-/// // define a handler that will be invoked when `service_call` is received
-/// struct SimpleServiceHandlerImpl;
-/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
-///     fn service_call(&mut self) -> thrift::Result<()> {
-///         unimplemented!();
-///     }
-/// }
-///
-/// // instantiate the processor
-/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
-///
-/// // instantiate the server
-/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
-/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
-/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
-/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
-///
-/// let mut server = TSimpleServer::new(
-///     i_tr_fact,
-///     i_pr_fact,
-///     o_tr_fact,
-///     o_pr_fact,
-///     processor
-/// );
-///
-/// // start listening for incoming connections
-/// match server.listen("127.0.0.1:8080") {
-///   Ok(_)  => println!("listen completed"),
-///   Err(e) => println!("listen failed with error {:?}", e),
-/// }
-/// ```
-pub struct TSimpleServer<PR: TProcessor> {
-    i_trans_factory: Box<TTransportFactory>,
-    i_proto_factory: Box<TInputProtocolFactory>,
-    o_trans_factory: Box<TTransportFactory>,
-    o_proto_factory: Box<TOutputProtocolFactory>,
-    processor: PR,
-}
-
-impl<PR: TProcessor> TSimpleServer<PR> {
-    /// Create a `TSimpleServer`.
-    ///
-    /// Each accepted connection has an input and output half, each of which
-    /// requires a `TTransport` and `TProtocol`. `TSimpleServer` uses
-    /// `input_transport_factory` and `input_protocol_factory` to create
-    /// implementations for the input, and `output_transport_factory` and
-    /// `output_protocol_factory` to create implementations for the output.
-    pub fn new(input_transport_factory: Box<TTransportFactory>,
-               input_protocol_factory: Box<TInputProtocolFactory>,
-               output_transport_factory: Box<TTransportFactory>,
-               output_protocol_factory: Box<TOutputProtocolFactory>,
-               processor: PR)
-               -> TSimpleServer<PR> {
-        TSimpleServer {
-            i_trans_factory: input_transport_factory,
-            i_proto_factory: input_protocol_factory,
-            o_trans_factory: output_transport_factory,
-            o_proto_factory: output_protocol_factory,
-            processor: processor,
-        }
-    }
-
-    /// Listen for incoming connections on `listen_address`.
-    ///
-    /// `listen_address` should be in the form `host:port`,
-    /// for example: `127.0.0.1:8080`.
-    ///
-    /// Return `()` if successful.
-    ///
-    /// Return `Err` when the server cannot bind to `listen_address` or there
-    /// is an unrecoverable error.
-    pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
-        let listener = TcpListener::bind(listen_address)?;
-        for stream in listener.incoming() {
-            match stream {
-                Ok(s) => self.handle_incoming_connection(s),
-                Err(e) => warn!("failed to accept remote connection with error {:?}", e),
-            }
-        }
-
-        Err(::Error::Application(ApplicationError {
-            kind: ApplicationErrorKind::Unknown,
-            message: "aborted listen loop".into(),
-        }))
-    }
-
-    fn handle_incoming_connection(&mut self, stream: TcpStream) {
-        // create the shared tcp stream
-        let stream = TTcpTransport::with_stream(stream);
-        let stream: Box<TTransport> = Box::new(stream);
-        let stream = Rc::new(RefCell::new(stream));
-
-        // input protocol and transport
-        let i_tran = self.i_trans_factory.create(stream.clone());
-        let i_tran = Rc::new(RefCell::new(i_tran));
-        let mut i_prot = self.i_proto_factory.create(i_tran);
-
-        // output protocol and transport
-        let o_tran = self.o_trans_factory.create(stream.clone());
-        let o_tran = Rc::new(RefCell::new(o_tran));
-        let mut o_prot = self.o_proto_factory.create(o_tran);
-
-        // process loop
-        loop {
-            let r = self.processor.process(&mut *i_prot, &mut *o_prot);
-            if let Err(e) = r {
-                warn!("processor failed with error: {:?}", e);
-                break; // FIXME: close here
-            }
-        }
-    }
-}
diff --git a/lib/rs/src/server/threaded.rs b/lib/rs/src/server/threaded.rs
new file mode 100644
index 0000000..a486c5a
--- /dev/null
+++ b/lib/rs/src/server/threaded.rs
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::net::{TcpListener, TcpStream};
+use std::sync::Arc;
+use threadpool::ThreadPool;
+
+use {ApplicationError, ApplicationErrorKind};
+use protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
+use transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
+
+use super::TProcessor;
+
+/// Fixed-size thread-pool blocking Thrift server.
+///
+/// A `TServer` listens on a given address and submits accepted connections
+/// to an **unbounded** queue. Connections from this queue are serviced by
+/// the first available worker thread from a **fixed-size** thread pool. Each
+/// accepted connection is handled by that worker thread, and communication
+/// over this thread occurs sequentially and synchronously (i.e. calls block).
+/// Accepted connections have an input half and an output half, each of which
+/// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
+/// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
+/// and `TTransport` may be used.
+///
+/// # Examples
+///
+/// Creating and running a `TServer` using Thrift-compiler-generated
+/// service code.
+///
+/// ```no_run
+/// use thrift;
+/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
+/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
+/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory, TReadTransportFactory, TWriteTransportFactory};
+/// use thrift::server::{TProcessor, TServer};
+///
+/// //
+/// // auto-generated
+/// //
+///
+/// // processor for `SimpleService`
+/// struct SimpleServiceSyncProcessor;
+/// impl SimpleServiceSyncProcessor {
+///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // `TProcessor` implementation for `SimpleService`
+/// impl TProcessor for SimpleServiceSyncProcessor {
+///     fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // service functions for SimpleService
+/// trait SimpleServiceSyncHandler {
+///     fn service_call(&self) -> thrift::Result<()>;
+/// }
+///
+/// //
+/// // user-code follows
+/// //
+///
+/// // define a handler that will be invoked when `service_call` is received
+/// struct SimpleServiceHandlerImpl;
+/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
+///     fn service_call(&self) -> thrift::Result<()> {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // instantiate the processor
+/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
+///
+/// // instantiate the server
+/// let i_tr_fact: Box<TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
+/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
+/// let o_tr_fact: Box<TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
+/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
+///
+/// let mut server = TServer::new(
+///     i_tr_fact,
+///     i_pr_fact,
+///     o_tr_fact,
+///     o_pr_fact,
+///     processor,
+///     10
+/// );
+///
+/// // start listening for incoming connections
+/// match server.listen("127.0.0.1:8080") {
+///   Ok(_)  => println!("listen completed"),
+///   Err(e) => println!("listen failed with error {:?}", e),
+/// }
+/// ```
+#[derive(Debug)]
+pub struct TServer<PRC, RTF, IPF, WTF, OPF>
+where
+    PRC: TProcessor + Send + Sync + 'static,
+    RTF: TReadTransportFactory + 'static,
+    IPF: TInputProtocolFactory + 'static,
+    WTF: TWriteTransportFactory + 'static,
+    OPF: TOutputProtocolFactory + 'static,
+{
+    r_trans_factory: RTF,
+    i_proto_factory: IPF,
+    w_trans_factory: WTF,
+    o_proto_factory: OPF,
+    processor: Arc<PRC>,
+    worker_pool: ThreadPool,
+}
+
+impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
+    where PRC: TProcessor + Send + Sync + 'static,
+          RTF: TReadTransportFactory + 'static,
+          IPF: TInputProtocolFactory + 'static,
+          WTF: TWriteTransportFactory + 'static,
+          OPF: TOutputProtocolFactory + 'static {
+    /// Create a `TServer`.
+    ///
+    /// Each accepted connection has an input and output half, each of which
+    /// requires a `TTransport` and `TProtocol`. `TServer` uses
+    /// `read_transport_factory` and `input_protocol_factory` to create
+    /// implementations for the input, and `write_transport_factory` and
+    /// `output_protocol_factory` to create implementations for the output.
+    pub fn new(
+        read_transport_factory: RTF,
+        input_protocol_factory: IPF,
+        write_transport_factory: WTF,
+        output_protocol_factory: OPF,
+        processor: PRC,
+        num_workers: usize,
+    ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
+        TServer {
+            r_trans_factory: read_transport_factory,
+            i_proto_factory: input_protocol_factory,
+            w_trans_factory: write_transport_factory,
+            o_proto_factory: output_protocol_factory,
+            processor: Arc::new(processor),
+            worker_pool: ThreadPool::new_with_name(
+                "Thrift service processor".to_owned(),
+                num_workers,
+            ),
+        }
+    }
+
+    /// Listen for incoming connections on `listen_address`.
+    ///
+    /// `listen_address` should be in the form `host:port`,
+    /// for example: `127.0.0.1:8080`.
+    ///
+    /// Return `()` if successful.
+    ///
+    /// Return `Err` when the server cannot bind to `listen_address` or there
+    /// is an unrecoverable error.
+    pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
+        let listener = TcpListener::bind(listen_address)?;
+        for stream in listener.incoming() {
+            match stream {
+                Ok(s) => {
+                    let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
+                    let processor = self.processor.clone();
+                    self.worker_pool
+                        .execute(move || handle_incoming_connection(processor, i_prot, o_prot),);
+                }
+                Err(e) => {
+                    warn!("failed to accept remote connection with error {:?}", e);
+                }
+            }
+        }
+
+        Err(
+            ::Error::Application(
+                ApplicationError {
+                    kind: ApplicationErrorKind::Unknown,
+                    message: "aborted listen loop".into(),
+                },
+            ),
+        )
+    }
+
+
+    fn new_protocols_for_connection(
+        &mut self,
+        stream: TcpStream,
+    ) -> ::Result<(Box<TInputProtocol + Send>, Box<TOutputProtocol + Send>)> {
+        // create the shared tcp stream
+        let channel = TTcpChannel::with_stream(stream);
+
+        // split it into two - one to be owned by the
+        // input tran/proto and the other by the output
+        let (r_chan, w_chan) = channel.split()?;
+
+        // input protocol and transport
+        let r_tran = self.r_trans_factory.create(Box::new(r_chan));
+        let i_prot = self.i_proto_factory.create(r_tran);
+
+        // output protocol and transport
+        let w_tran = self.w_trans_factory.create(Box::new(w_chan));
+        let o_prot = self.o_proto_factory.create(w_tran);
+
+        Ok((i_prot, o_prot))
+    }
+}
+
+fn handle_incoming_connection<PRC>(
+    processor: Arc<PRC>,
+    i_prot: Box<TInputProtocol>,
+    o_prot: Box<TOutputProtocol>,
+) where
+    PRC: TProcessor,
+{
+    let mut i_prot = i_prot;
+    let mut o_prot = o_prot;
+    loop {
+        let r = processor.process(&mut *i_prot, &mut *o_prot);
+        if let Err(e) = r {
+            warn!("processor completed with error: {:?}", e);
+            break;
+        }
+    }
+}
diff --git a/lib/rs/src/transport/buffered.rs b/lib/rs/src/transport/buffered.rs
index 3f240d8..b588ec1 100644
--- a/lib/rs/src/transport/buffered.rs
+++ b/lib/rs/src/transport/buffered.rs
@@ -15,104 +15,94 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cell::RefCell;
 use std::cmp;
 use std::io;
 use std::io::{Read, Write};
-use std::rc::Rc;
 
-use super::{TTransport, TTransportFactory};
+use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
 
 /// Default capacity of the read buffer in bytes.
-const DEFAULT_RBUFFER_CAPACITY: usize = 4096;
+const READ_CAPACITY: usize = 4096;
 
 /// Default capacity of the write buffer in bytes..
-const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+const WRITE_CAPACITY: usize = 4096;
 
-/// Transport that communicates with endpoints using a byte stream.
+/// Transport that reads messages via an internal buffer.
 ///
-/// A `TBufferedTransport` maintains a fixed-size internal write buffer. All
-/// writes are made to this buffer and are sent to the wrapped transport only
-/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
-/// count of the buffered bytes is written, followed by the bytes themselves.
-///
-/// A `TBufferedTransport` also maintains a fixed-size internal read buffer.
-/// On a call to `TTransport::read(...)` one full message - both fixed-length
-/// header and bytes - is read from the wrapped transport and buffered.
+/// A `TBufferedReadTransport` maintains a fixed-size internal read buffer.
+/// On a call to `TBufferedReadTransport::read(...)` one full message - both
+/// fixed-length header and bytes - is read from the wrapped channel and buffered.
 /// Subsequent read calls are serviced from the internal buffer until it is
 /// exhausted, at which point the next full message is read from the wrapped
-/// transport.
+/// channel.
 ///
 /// # Examples
 ///
-/// Create and use a `TBufferedTransport`.
+/// Create and use a `TBufferedReadTransport`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use std::io::{Read, Write};
-/// use thrift::transport::{TBufferedTransport, TTcpTransport, TTransport};
+/// use std::io::Read;
+/// use thrift::transport::{TBufferedReadTransport, TTcpChannel};
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
-/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
-/// let mut t = TBufferedTransport::new(t);
+/// let mut t = TBufferedReadTransport::new(c);
 ///
-/// // read
 /// t.read(&mut vec![0u8; 1]).unwrap();
-///
-/// // write
-/// t.write(&[0x00]).unwrap();
-/// t.flush().unwrap();
 /// ```
-pub struct TBufferedTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    rcap: usize,
-    wbuf: Vec<u8>,
-    inner: Rc<RefCell<Box<TTransport>>>,
+#[derive(Debug)]
+pub struct TBufferedReadTransport<C>
+where
+    C: Read,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+    chan: C,
 }
 
-impl TBufferedTransport {
+impl<C> TBufferedReadTransport<C>
+where
+    C: Read,
+{
     /// Create a `TBufferedTransport` with default-sized internal read and
-    /// write buffers that wraps an `inner` `TTransport`.
-    pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TBufferedTransport {
-        TBufferedTransport::with_capacity(DEFAULT_RBUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+    /// write buffers that wraps the given `TIoChannel`.
+    pub fn new(channel: C) -> TBufferedReadTransport<C> {
+        TBufferedReadTransport::with_capacity(READ_CAPACITY, channel)
     }
 
     /// Create a `TBufferedTransport` with an internal read buffer of size
-    /// `read_buffer_capacity` and an internal write buffer of size
-    /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize,
-                         inner: Rc<RefCell<Box<TTransport>>>)
-                         -> TBufferedTransport {
-        TBufferedTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            rpos: 0,
-            rcap: 0,
-            wbuf: Vec::with_capacity(write_buffer_capacity),
-            inner: inner,
+    /// `read_capacity` and an internal write buffer of size
+    /// `write_capacity` that wraps the given `TIoChannel`.
+    pub fn with_capacity(read_capacity: usize, channel: C) -> TBufferedReadTransport<C> {
+        TBufferedReadTransport {
+            buf: vec![0; read_capacity].into_boxed_slice(),
+            pos: 0,
+            cap: 0,
+            chan: channel,
         }
     }
 
     fn get_bytes(&mut self) -> io::Result<&[u8]> {
-        if self.rcap - self.rpos == 0 {
-            self.rpos = 0;
-            self.rcap = self.inner.borrow_mut().read(&mut self.rbuf)?;
+        if self.cap - self.pos == 0 {
+            self.pos = 0;
+            self.cap = self.chan.read(&mut self.buf)?;
         }
 
-        Ok(&self.rbuf[self.rpos..self.rcap])
+        Ok(&self.buf[self.pos..self.cap])
     }
 
     fn consume(&mut self, consumed: usize) {
         // TODO: was a bug here += <-- test somehow
-        self.rpos = cmp::min(self.rcap, self.rpos + consumed);
+        self.pos = cmp::min(self.cap, self.pos + consumed);
     }
 }
 
-impl Read for TBufferedTransport {
+impl<C> Read for TBufferedReadTransport<C>
+where
+    C: Read,
+{
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
         let mut bytes_read = 0;
 
@@ -137,65 +127,127 @@
     }
 }
 
-impl Write for TBufferedTransport {
+/// Factory for creating instances of `TBufferedReadTransport`.
+#[derive(Default)]
+pub struct TBufferedReadTransportFactory;
+
+impl TBufferedReadTransportFactory {
+    pub fn new() -> TBufferedReadTransportFactory {
+        TBufferedReadTransportFactory {}
+    }
+}
+
+impl TReadTransportFactory for TBufferedReadTransportFactory {
+    /// Create a `TBufferedReadTransport`.
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+        Box::new(TBufferedReadTransport::new(channel))
+    }
+}
+
+/// Transport that writes messages via an internal buffer.
+///
+/// A `TBufferedWriteTransport` maintains a fixed-size internal write buffer.
+/// All writes are made to this buffer and are sent to the wrapped channel only
+/// when `TBufferedWriteTransport::flush()` is called. On a flush a fixed-length
+/// header with a count of the buffered bytes is written, followed by the bytes
+/// themselves.
+///
+/// # Examples
+///
+/// Create and use a `TBufferedWriteTransport`.
+///
+/// ```no_run
+/// use std::io::Write;
+/// use thrift::transport::{TBufferedWriteTransport, TTcpChannel};
+///
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
+///
+/// let mut t = TBufferedWriteTransport::new(c);
+///
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+#[derive(Debug)]
+pub struct TBufferedWriteTransport<C>
+where
+    C: Write,
+{
+    buf: Vec<u8>,
+    channel: C,
+}
+
+impl<C> TBufferedWriteTransport<C>
+where
+    C: Write,
+{
+    /// Create a `TBufferedTransport` with default-sized internal read and
+    /// write buffers that wraps the given `TIoChannel`.
+    pub fn new(channel: C) -> TBufferedWriteTransport<C> {
+        TBufferedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
+    }
+
+    /// Create a `TBufferedTransport` with an internal read buffer of size
+    /// `read_capacity` and an internal write buffer of size
+    /// `write_capacity` that wraps the given `TIoChannel`.
+    pub fn with_capacity(write_capacity: usize, channel: C) -> TBufferedWriteTransport<C> {
+        TBufferedWriteTransport {
+            buf: Vec::with_capacity(write_capacity),
+            channel: channel,
+        }
+    }
+}
+
+impl<C> Write for TBufferedWriteTransport<C>
+where
+    C: Write,
+{
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let avail_bytes = cmp::min(buf.len(), self.wbuf.capacity() - self.wbuf.len());
-        self.wbuf.extend_from_slice(&buf[..avail_bytes]);
-        assert!(self.wbuf.len() <= self.wbuf.capacity(),
-                "copy overflowed buffer");
+        let avail_bytes = cmp::min(buf.len(), self.buf.capacity() - self.buf.len());
+        self.buf.extend_from_slice(&buf[..avail_bytes]);
+        assert!(
+            self.buf.len() <= self.buf.capacity(),
+            "copy overflowed buffer"
+        );
         Ok(avail_bytes)
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        self.inner.borrow_mut().write_all(&self.wbuf)?;
-        self.inner.borrow_mut().flush()?;
-        self.wbuf.clear();
+        self.channel.write_all(&self.buf)?;
+        self.channel.flush()?;
+        self.buf.clear();
         Ok(())
     }
 }
 
-/// Factory for creating instances of `TBufferedTransport`
+/// Factory for creating instances of `TBufferedWriteTransport`.
 #[derive(Default)]
-pub struct TBufferedTransportFactory;
+pub struct TBufferedWriteTransportFactory;
 
-impl TBufferedTransportFactory {
-    /// Create a `TBufferedTransportFactory`.
-    pub fn new() -> TBufferedTransportFactory {
-        TBufferedTransportFactory {}
+impl TBufferedWriteTransportFactory {
+    pub fn new() -> TBufferedWriteTransportFactory {
+        TBufferedWriteTransportFactory {}
     }
 }
 
-impl TTransportFactory for TBufferedTransportFactory {
-    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
-        Box::new(TBufferedTransport::new(inner)) as Box<TTransport>
+impl TWriteTransportFactory for TBufferedWriteTransportFactory {
+    /// Create a `TBufferedWriteTransport`.
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+        Box::new(TBufferedWriteTransport::new(channel))
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::cell::RefCell;
     use std::io::{Read, Write};
-    use std::rc::Rc;
 
     use super::*;
-    use ::transport::{TPassThruTransport, TTransport};
-    use ::transport::mem::TBufferTransport;
-
-    macro_rules! new_transports {
-        ($wbc:expr, $rbc:expr) => (
-            {
-                let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity($wbc, $rbc))));
-                let thru: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
-                let thru = Rc::new(RefCell::new(thru));
-                (mem, thru)
-            }
-        );
-    }
+    use transport::TBufferChannel;
 
     #[test]
     fn must_return_zero_if_read_buffer_is_empty() {
-        let (_, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(10, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(10, mem);
 
         let mut b = vec![0; 10];
         let read_result = t.read(&mut b);
@@ -205,8 +257,8 @@
 
     #[test]
     fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() {
-        let (_, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(10, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(10, mem);
 
         let read_result = t.read(&mut []);
 
@@ -215,10 +267,10 @@
 
     #[test]
     fn must_return_zero_if_nothing_more_can_be_read() {
-        let (mem, thru) = new_transports!(4, 0);
-        let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+        let mem = TBufferChannel::with_capacity(4, 0);
+        let mut t = TBufferedReadTransport::with_capacity(4, mem);
 
-        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+        t.chan.set_readable_bytes(&[0, 1, 2, 3]);
 
         // read buffer is exactly the same size as bytes available
         let mut buf = vec![0u8; 4];
@@ -239,10 +291,10 @@
 
     #[test]
     fn must_fill_user_buffer_with_only_as_many_bytes_as_available() {
-        let (mem, thru) = new_transports!(4, 0);
-        let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+        let mem = TBufferChannel::with_capacity(4, 0);
+        let mut t = TBufferedReadTransport::with_capacity(4, mem);
 
-        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+        t.chan.set_readable_bytes(&[0, 1, 2, 3]);
 
         // read buffer is much larger than the bytes available
         let mut buf = vec![0u8; 8];
@@ -268,15 +320,16 @@
 
         // we have a much smaller buffer than the
         // underlying transport has bytes available
-        let (mem, thru) = new_transports!(10, 0);
-        let mut t = TBufferedTransport::with_capacity(2, 0, thru);
+        let mem = TBufferChannel::with_capacity(10, 0);
+        let mut t = TBufferedReadTransport::with_capacity(2, mem);
 
         // fill the underlying transport's byte buffer
         let mut readable_bytes = [0u8; 10];
         for i in 0..10 {
             readable_bytes[i] = i as u8;
         }
-        mem.borrow_mut().set_readable_bytes(&readable_bytes);
+
+        t.chan.set_readable_bytes(&readable_bytes);
 
         // we ask to read into a buffer that's much larger
         // than the one the buffered transport has; as a result
@@ -312,8 +365,8 @@
 
     #[test]
     fn must_return_zero_if_nothing_can_be_written() {
-        let (_, thru) = new_transports!(0, 0);
-        let mut t = TBufferedTransport::with_capacity(0, 0, thru);
+        let mem = TBufferChannel::with_capacity(0, 0);
+        let mut t = TBufferedWriteTransport::with_capacity(0, mem);
 
         let b = vec![0; 10];
         let r = t.write(&b);
@@ -323,19 +376,20 @@
 
     #[test]
     fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
-        let (mem, thru) = new_transports!(0, 10);
-        let mut t = TBufferedTransport::with_capacity(0, 10, thru);
+        let mem = TBufferChannel::with_capacity(0, 10);
+        let mut t = TBufferedWriteTransport::with_capacity(10, mem);
 
         let r = t.write(&[]);
+        let expected: [u8; 0] = [];
 
         assert_eq!(r.unwrap(), 0);
-        assert_eq!(mem.borrow_mut().write_buffer_as_ref(), &[]);
+        assert_eq_transport_written_bytes!(t, expected);
     }
 
     #[test]
     fn must_return_zero_if_write_buffer_full() {
-        let (_, thru) = new_transports!(0, 0);
-        let mut t = TBufferedTransport::with_capacity(0, 4, thru);
+        let mem = TBufferChannel::with_capacity(0, 0);
+        let mut t = TBufferedWriteTransport::with_capacity(4, mem);
 
         let b = [0x00, 0x01, 0x02, 0x03];
 
@@ -350,26 +404,22 @@
 
     #[test]
     fn must_only_write_to_inner_transport_on_flush() {
-        let (mem, thru) = new_transports!(10, 10);
-        let mut t = TBufferedTransport::new(thru);
+        let mem = TBufferChannel::with_capacity(10, 10);
+        let mut t = TBufferedWriteTransport::new(mem);
 
         let b: [u8; 5] = [0, 1, 2, 3, 4];
         assert_eq!(t.write(&b).unwrap(), 5);
-        assert_eq!(mem.borrow_mut().write_buffer_as_ref().len(), 0);
+        assert_eq_transport_num_written_bytes!(t, 0);
 
         assert!(t.flush().is_ok());
 
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
     }
 
     #[test]
     fn must_write_successfully_after_flush() {
-        let (mem, thru) = new_transports!(0, 5);
-        let mut t = TBufferedTransport::with_capacity(0, 5, thru);
+        let mem = TBufferChannel::with_capacity(0, 5);
+        let mut t = TBufferedWriteTransport::with_capacity(5, mem);
 
         // write and flush
         let b: [u8; 5] = [0, 1, 2, 3, 4];
@@ -377,24 +427,16 @@
         assert!(t.flush().is_ok());
 
         // check the flushed bytes
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
 
         // reset our underlying transport
-        mem.borrow_mut().empty_write_buffer();
+        t.channel.empty_write_buffer();
 
         // write and flush again
         assert_eq!(t.write(&b).unwrap(), 5);
         assert!(t.flush().is_ok());
 
         // check the flushed bytes
-        {
-            let inner = mem.borrow_mut();
-            let underlying_buffer = inner.write_buffer_as_ref();
-            assert_eq!(b, underlying_buffer);
-        }
+        assert_eq_transport_written_bytes!(t, b);
     }
 }
diff --git a/lib/rs/src/transport/framed.rs b/lib/rs/src/transport/framed.rs
index 75c12f4..d78d2f7 100644
--- a/lib/rs/src/transport/framed.rs
+++ b/lib/rs/src/transport/framed.rs
@@ -16,165 +16,242 @@
 // under the License.
 
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
-use std::cell::RefCell;
 use std::cmp;
 use std::io;
 use std::io::{ErrorKind, Read, Write};
-use std::rc::Rc;
 
-use super::{TTransport, TTransportFactory};
+use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
 
 /// Default capacity of the read buffer in bytes.
-const WRITE_BUFFER_CAPACITY: usize = 4096;
+const READ_CAPACITY: usize = 4096;
 
-/// Default capacity of the write buffer in bytes..
-const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+/// Default capacity of the write buffer in bytes.
+const WRITE_CAPACITY: usize = 4096;
 
-/// Transport that communicates with endpoints using framed messages.
+/// Transport that reads framed messages.
 ///
-/// A `TFramedTransport` maintains a fixed-size internal write buffer. All
-/// writes are made to this buffer and are sent to the wrapped transport only
-/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
-/// count of the buffered bytes is written, followed by the bytes themselves.
-///
-/// A `TFramedTransport` also maintains a fixed-size internal read buffer.
-/// On a call to `TTransport::read(...)` one full message - both fixed-length
-/// header and bytes - is read from the wrapped transport and buffered.
-/// Subsequent read calls are serviced from the internal buffer until it is
-/// exhausted, at which point the next full message is read from the wrapped
-/// transport.
+/// A `TFramedReadTransport` maintains a fixed-size internal read buffer.
+/// On a call to `TFramedReadTransport::read(...)` one full message - both
+/// fixed-length header and bytes - is read from the wrapped channel and
+/// buffered. Subsequent read calls are serviced from the internal buffer
+/// until it is exhausted, at which point the next full message is read
+/// from the wrapped channel.
 ///
 /// # Examples
 ///
-/// Create and use a `TFramedTransport`.
+/// Create and use a `TFramedReadTransport`.
 ///
 /// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use std::io::{Read, Write};
-/// use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+/// use std::io::Read;
+/// use thrift::transport::{TFramedReadTransport, TTcpChannel};
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
-/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
-/// let mut t = TFramedTransport::new(t);
+/// let mut t = TFramedReadTransport::new(c);
 ///
-/// // read
 /// t.read(&mut vec![0u8; 1]).unwrap();
-///
-/// // write
-/// t.write(&[0x00]).unwrap();
-/// t.flush().unwrap();
 /// ```
-pub struct TFramedTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    rcap: usize,
-    wbuf: Box<[u8]>,
-    wpos: usize,
-    inner: Rc<RefCell<Box<TTransport>>>,
+#[derive(Debug)]
+pub struct TFramedReadTransport<C>
+where
+    C: Read,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+    chan: C,
 }
 
-impl TFramedTransport {
+impl<C> TFramedReadTransport<C>
+where
+    C: Read,
+{
     /// Create a `TFramedTransport` with default-sized internal read and
-    /// write buffers that wraps an `inner` `TTransport`.
-    pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TFramedTransport {
-        TFramedTransport::with_capacity(WRITE_BUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+    /// write buffers that wraps the given `TIoChannel`.
+    pub fn new(channel: C) -> TFramedReadTransport<C> {
+        TFramedReadTransport::with_capacity(READ_CAPACITY, channel)
     }
 
     /// Create a `TFramedTransport` with an internal read buffer of size
-    /// `read_buffer_capacity` and an internal write buffer of size
-    /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize,
-                         inner: Rc<RefCell<Box<TTransport>>>)
-                         -> TFramedTransport {
-        TFramedTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            rpos: 0,
-            rcap: 0,
-            wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
-            wpos: 0,
-            inner: inner,
+    /// `read_capacity` and an internal write buffer of size
+    /// `write_capacity` that wraps the given `TIoChannel`.
+    pub fn with_capacity(read_capacity: usize, channel: C) -> TFramedReadTransport<C> {
+        TFramedReadTransport {
+            buf: vec![0; read_capacity].into_boxed_slice(),
+            pos: 0,
+            cap: 0,
+            chan: channel,
         }
     }
 }
 
-impl Read for TFramedTransport {
+impl<C> Read for TFramedReadTransport<C>
+where
+    C: Read,
+{
     fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
-        if self.rcap - self.rpos == 0 {
-            let message_size = self.inner.borrow_mut().read_i32::<BigEndian>()? as usize;
-            if message_size > self.rbuf.len() {
-                return Err(io::Error::new(ErrorKind::Other,
-                                          format!("bytes to be read ({}) exceeds buffer \
+        if self.cap - self.pos == 0 {
+            let message_size = self.chan.read_i32::<BigEndian>()? as usize;
+            if message_size > self.buf.len() {
+                return Err(
+                    io::Error::new(
+                        ErrorKind::Other,
+                        format!(
+                            "bytes to be read ({}) exceeds buffer \
                                                    capacity ({})",
-                                                  message_size,
-                                                  self.rbuf.len())));
+                            message_size,
+                            self.buf.len()
+                        ),
+                    ),
+                );
             }
-            self.inner.borrow_mut().read_exact(&mut self.rbuf[..message_size])?;
-            self.rpos = 0;
-            self.rcap = message_size as usize;
+            self.chan.read_exact(&mut self.buf[..message_size])?;
+            self.pos = 0;
+            self.cap = message_size as usize;
         }
 
-        let nread = cmp::min(b.len(), self.rcap - self.rpos);
-        b[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
-        self.rpos += nread;
+        let nread = cmp::min(b.len(), self.cap - self.pos);
+        b[..nread].clone_from_slice(&self.buf[self.pos..self.pos + nread]);
+        self.pos += nread;
 
         Ok(nread)
     }
 }
 
-impl Write for TFramedTransport {
+/// Factory for creating instances of `TFramedReadTransport`.
+#[derive(Default)]
+pub struct TFramedReadTransportFactory;
+
+impl TFramedReadTransportFactory {
+    pub fn new() -> TFramedReadTransportFactory {
+        TFramedReadTransportFactory {}
+    }
+}
+
+impl TReadTransportFactory for TFramedReadTransportFactory {
+    /// Create a `TFramedReadTransport`.
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+        Box::new(TFramedReadTransport::new(channel))
+    }
+}
+
+/// Transport that writes framed messages.
+///
+/// A `TFramedWriteTransport` maintains a fixed-size internal write buffer. All
+/// writes are made to this buffer and are sent to the wrapped channel only
+/// when `TFramedWriteTransport::flush()` is called. On a flush a fixed-length
+/// header with a count of the buffered bytes is written, followed by the bytes
+/// themselves.
+///
+/// # Examples
+///
+/// Create and use a `TFramedWriteTransport`.
+///
+/// ```no_run
+/// use std::io::Write;
+/// use thrift::transport::{TFramedWriteTransport, TTcpChannel};
+///
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
+///
+/// let mut t = TFramedWriteTransport::new(c);
+///
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+#[derive(Debug)]
+pub struct TFramedWriteTransport<C>
+where
+    C: Write,
+{
+    buf: Box<[u8]>,
+    pos: usize,
+    channel: C,
+}
+
+impl<C> TFramedWriteTransport<C>
+where
+    C: Write,
+{
+    /// Create a `TFramedTransport` with default-sized internal read and
+    /// write buffers that wraps the given `TIoChannel`.
+    pub fn new(channel: C) -> TFramedWriteTransport<C> {
+        TFramedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
+    }
+
+    /// Create a `TFramedTransport` with an internal read buffer of size
+    /// `read_capacity` and an internal write buffer of size
+    /// `write_capacity` that wraps the given `TIoChannel`.
+    pub fn with_capacity(write_capacity: usize, channel: C) -> TFramedWriteTransport<C> {
+        TFramedWriteTransport {
+            buf: vec![0; write_capacity].into_boxed_slice(),
+            pos: 0,
+            channel: channel,
+        }
+    }
+}
+
+impl<C> Write for TFramedWriteTransport<C>
+where
+    C: Write,
+{
     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
-        if b.len() > (self.wbuf.len() - self.wpos) {
-            return Err(io::Error::new(ErrorKind::Other,
-                                      format!("bytes to be written ({}) exceeds buffer \
+        if b.len() > (self.buf.len() - self.pos) {
+            return Err(
+                io::Error::new(
+                    ErrorKind::Other,
+                    format!(
+                        "bytes to be written ({}) exceeds buffer \
                                                capacity ({})",
-                                              b.len(),
-                                              self.wbuf.len() - self.wpos)));
+                        b.len(),
+                        self.buf.len() - self.pos
+                    ),
+                ),
+            );
         }
 
         let nwrite = b.len(); // always less than available write buffer capacity
-        self.wbuf[self.wpos..(self.wpos + nwrite)].clone_from_slice(b);
-        self.wpos += nwrite;
+        self.buf[self.pos..(self.pos + nwrite)].clone_from_slice(b);
+        self.pos += nwrite;
         Ok(nwrite)
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        let message_size = self.wpos;
+        let message_size = self.pos;
 
         if let 0 = message_size {
             return Ok(());
         } else {
-            self.inner.borrow_mut().write_i32::<BigEndian>(message_size as i32)?;
+            self.channel
+                .write_i32::<BigEndian>(message_size as i32)?;
         }
 
         let mut byte_index = 0;
-        while byte_index < self.wpos {
-            let nwrite = self.inner.borrow_mut().write(&self.wbuf[byte_index..self.wpos])?;
-            byte_index = cmp::min(byte_index + nwrite, self.wpos);
+        while byte_index < self.pos {
+            let nwrite = self.channel.write(&self.buf[byte_index..self.pos])?;
+            byte_index = cmp::min(byte_index + nwrite, self.pos);
         }
 
-        self.wpos = 0;
-        self.inner.borrow_mut().flush()
+        self.pos = 0;
+        self.channel.flush()
     }
 }
 
-/// Factory for creating instances of `TFramedTransport`.
+/// Factory for creating instances of `TFramedWriteTransport`.
 #[derive(Default)]
-pub struct TFramedTransportFactory;
+pub struct TFramedWriteTransportFactory;
 
-impl TFramedTransportFactory {
-    // Create a `TFramedTransportFactory`.
-    pub fn new() -> TFramedTransportFactory {
-        TFramedTransportFactory {}
+impl TFramedWriteTransportFactory {
+    pub fn new() -> TFramedWriteTransportFactory {
+        TFramedWriteTransportFactory {}
     }
 }
 
-impl TTransportFactory for TFramedTransportFactory {
-    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
-        Box::new(TFramedTransport::new(inner)) as Box<TTransport>
+impl TWriteTransportFactory for TFramedWriteTransportFactory {
+    /// Create a `TFramedWriteTransport`.
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+        Box::new(TFramedWriteTransport::new(channel))
     }
 }
 
@@ -183,5 +260,5 @@
     //    use std::io::{Read, Write};
     //
     //    use super::*;
-    //    use ::transport::mem::TBufferTransport;
+    //    use ::transport::mem::TBufferChannel;
 }
diff --git a/lib/rs/src/transport/mem.rs b/lib/rs/src/transport/mem.rs
index 97ec503..86ac6bb 100644
--- a/lib/rs/src/transport/mem.rs
+++ b/lib/rs/src/transport/mem.rs
@@ -17,9 +17,11 @@
 
 use std::cmp;
 use std::io;
+use std::sync::{Arc, Mutex};
 
-/// Simple transport that contains both a fixed-length internal read buffer and
-/// a fixed-length internal write buffer.
+use super::{ReadHalf, TIoChannel, WriteHalf};
+
+/// In-memory read and write channel with fixed-size read and write buffers.
 ///
 /// On a `write` bytes are written to the internal write buffer. Writes are no
 /// longer accepted once this buffer is full. Callers must `empty_write_buffer()`
@@ -29,37 +31,61 @@
 /// `set_readable_bytes(...)`. Callers can then read until the buffer is
 /// depleted. No further reads are accepted until the internal read buffer is
 /// replenished again.
-pub struct TBufferTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    ridx: usize,
-    rcap: usize,
-    wbuf: Box<[u8]>,
-    wpos: usize,
-    wcap: usize,
+#[derive(Debug)]
+pub struct TBufferChannel {
+    read: Arc<Mutex<ReadData>>,
+    write: Arc<Mutex<WriteData>>,
 }
 
-impl TBufferTransport {
-    /// Constructs a new, empty `TBufferTransport` with the given
+#[derive(Debug)]
+struct ReadData {
+    buf: Box<[u8]>,
+    pos: usize,
+    idx: usize,
+    cap: usize,
+}
+
+#[derive(Debug)]
+struct WriteData {
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+}
+
+impl TBufferChannel {
+    /// Constructs a new, empty `TBufferChannel` with the given
     /// read buffer capacity and write buffer capacity.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize)
-                         -> TBufferTransport {
-        TBufferTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            ridx: 0,
-            rpos: 0,
-            rcap: read_buffer_capacity,
-            wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
-            wpos: 0,
-            wcap: write_buffer_capacity,
+    pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
+        TBufferChannel {
+            read: Arc::new(
+                Mutex::new(
+                    ReadData {
+                        buf: vec![0; read_capacity].into_boxed_slice(),
+                        idx: 0,
+                        pos: 0,
+                        cap: read_capacity,
+                    },
+                ),
+            ),
+            write: Arc::new(
+                Mutex::new(
+                    WriteData {
+                        buf: vec![0; write_capacity].into_boxed_slice(),
+                        pos: 0,
+                        cap: write_capacity,
+                    },
+                ),
+            ),
         }
     }
 
-    /// Return a slice containing the bytes held by the internal read buffer.
-    /// Returns an empty slice if no readable bytes are present.
-    pub fn read_buffer(&self) -> &[u8] {
-        &self.rbuf[..self.ridx]
+    /// Return a copy of the bytes held by the internal read buffer.
+    /// Returns an empty vector if no readable bytes are present.
+    pub fn read_bytes(&self) -> Vec<u8> {
+        let rdata = self.read.as_ref().lock().unwrap();
+        let mut buf = vec![0u8; rdata.idx];
+        buf.copy_from_slice(&rdata.buf[..rdata.idx]);
+        buf
     }
 
     // FIXME: do I really need this API call?
@@ -68,8 +94,9 @@
     ///
     /// Subsequent calls to `read` will return nothing.
     pub fn empty_read_buffer(&mut self) {
-        self.rpos = 0;
-        self.ridx = 0;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        rdata.pos = 0;
+        rdata.idx = 0;
     }
 
     /// Copy bytes from the source buffer `buf` into the internal read buffer,
@@ -77,37 +104,36 @@
     /// which is `min(buf.len(), internal_read_buf.len())`.
     pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
         self.empty_read_buffer();
-        let max_bytes = cmp::min(self.rcap, buf.len());
-        self.rbuf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
-        self.ridx = max_bytes;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        let max_bytes = cmp::min(rdata.cap, buf.len());
+        rdata.buf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
+        rdata.idx = max_bytes;
         max_bytes
     }
 
-    /// Return a slice containing the bytes held by the internal write buffer.
-    /// Returns an empty slice if no bytes were written.
-    pub fn write_buffer_as_ref(&self) -> &[u8] {
-        &self.wbuf[..self.wpos]
-    }
-
-    /// Return a vector with a copy of the bytes held by the internal write buffer.
+    /// Return a copy of the bytes held by the internal write buffer.
     /// Returns an empty vector if no bytes were written.
-    pub fn write_buffer_to_vec(&self) -> Vec<u8> {
-        let mut buf = vec![0u8; self.wpos];
-        buf.copy_from_slice(&self.wbuf[..self.wpos]);
+    pub fn write_bytes(&self) -> Vec<u8> {
+        let wdata = self.write.as_ref().lock().unwrap();
+        let mut buf = vec![0u8; wdata.pos];
+        buf.copy_from_slice(&wdata.buf[..wdata.pos]);
         buf
     }
 
     /// Resets the internal write buffer, making it seem like no bytes were
-    /// written. Calling `write_buffer` after this returns an empty slice.
+    /// written. Calling `write_buffer` after this returns an empty vector.
     pub fn empty_write_buffer(&mut self) {
-        self.wpos = 0;
+        let mut wdata = self.write.as_ref().lock().unwrap();
+        wdata.pos = 0;
     }
 
     /// Overwrites the contents of the read buffer with the contents of the
     /// write buffer. The write buffer is emptied after this operation.
     pub fn copy_write_buffer_to_read_buffer(&mut self) {
+        // FIXME: redo this entire method
         let buf = {
-            let b = self.write_buffer_as_ref();
+            let wdata = self.write.as_ref().lock().unwrap();
+            let b = &wdata.buf[..wdata.pos];
             let mut b_ret = vec![0; b.len()];
             b_ret.copy_from_slice(b);
             b_ret
@@ -120,20 +146,45 @@
     }
 }
 
-impl io::Read for TBufferTransport {
+impl TIoChannel for TBufferChannel {
+    fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
+    where
+        Self: Sized,
+    {
+        Ok(
+            (ReadHalf {
+                 handle: TBufferChannel {
+                     read: self.read.clone(),
+                     write: self.write.clone(),
+                 },
+             },
+             WriteHalf {
+                 handle: TBufferChannel {
+                     read: self.read.clone(),
+                     write: self.write.clone(),
+                 },
+             }),
+        )
+    }
+}
+
+impl io::Read for TBufferChannel {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let nread = cmp::min(buf.len(), self.ridx - self.rpos);
-        buf[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
-        self.rpos += nread;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
+        buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
+        rdata.pos += nread;
         Ok(nread)
     }
 }
 
-impl io::Write for TBufferTransport {
+impl io::Write for TBufferChannel {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let nwrite = cmp::min(buf.len(), self.wcap - self.wpos);
-        self.wbuf[self.wpos..self.wpos + nwrite].clone_from_slice(&buf[..nwrite]);
-        self.wpos += nwrite;
+        let mut wdata = self.write.as_ref().lock().unwrap();
+        let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
+        let (start, end) = (wdata.pos, wdata.pos + nwrite);
+        wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
+        wdata.pos += nwrite;
         Ok(nwrite)
     }
 
@@ -146,68 +197,68 @@
 mod tests {
     use std::io::{Read, Write};
 
-    use super::TBufferTransport;
+    use super::TBufferChannel;
 
     #[test]
     fn must_empty_write_buffer() {
-        let mut t = TBufferTransport::with_capacity(0, 1);
+        let mut t = TBufferChannel::with_capacity(0, 1);
 
         let bytes_to_write: [u8; 1] = [0x01];
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 1);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
 
         t.empty_write_buffer();
-        assert_eq!(t.write_buffer_as_ref().len(), 0);
+        assert_eq!(t.write_bytes().len(), 0);
     }
 
     #[test]
     fn must_accept_writes_after_buffer_emptied() {
-        let mut t = TBufferTransport::with_capacity(0, 2);
+        let mut t = TBufferChannel::with_capacity(0, 2);
 
         let bytes_to_write: [u8; 2] = [0x01, 0x02];
 
         // first write (all bytes written)
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 2);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
 
         // try write again (nothing should be written)
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 0);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write); // still the same as before
+        assert_eq!(&t.write_bytes(), &bytes_to_write); // still the same as before
 
         // now reset the buffer
         t.empty_write_buffer();
-        assert_eq!(t.write_buffer_as_ref().len(), 0);
+        assert_eq!(t.write_bytes().len(), 0);
 
         // now try write again - the write should succeed
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 2);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
     }
 
     #[test]
     fn must_accept_multiple_writes_until_buffer_is_full() {
-        let mut t = TBufferTransport::with_capacity(0, 10);
+        let mut t = TBufferChannel::with_capacity(0, 10);
 
         // first write (all bytes written)
         let bytes_to_write_0: [u8; 2] = [0x01, 0x41];
         let write_0_result = t.write(&bytes_to_write_0);
         assert_eq!(write_0_result.unwrap(), 2);
-        assert_eq!(t.write_buffer_as_ref(), &bytes_to_write_0);
+        assert_eq!(t.write_bytes(), &bytes_to_write_0);
 
         // second write (all bytes written, starting at index 2)
         let bytes_to_write_1: [u8; 7] = [0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
         let write_1_result = t.write(&bytes_to_write_1);
         assert_eq!(write_1_result.unwrap(), 7);
-        assert_eq!(&t.write_buffer_as_ref()[2..], &bytes_to_write_1);
+        assert_eq!(&t.write_bytes()[2..], &bytes_to_write_1);
 
         // third write (only 1 byte written - that's all we have space for)
         let bytes_to_write_2: [u8; 3] = [0xBF, 0xDA, 0x98];
         let write_2_result = t.write(&bytes_to_write_2);
         assert_eq!(write_2_result.unwrap(), 1);
-        assert_eq!(&t.write_buffer_as_ref()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?!
+        assert_eq!(&t.write_bytes()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?!
 
         // fourth write (no writes are accepted)
         let bytes_to_write_3: [u8; 3] = [0xBF, 0xAA, 0xFD];
@@ -219,50 +270,50 @@
         expected.extend_from_slice(&bytes_to_write_0);
         expected.extend_from_slice(&bytes_to_write_1);
         expected.extend_from_slice(&bytes_to_write_2[0..1]);
-        assert_eq!(t.write_buffer_as_ref(), &expected[..]);
+        assert_eq!(t.write_bytes(), &expected[..]);
     }
 
     #[test]
     fn must_empty_read_buffer() {
-        let mut t = TBufferTransport::with_capacity(1, 0);
+        let mut t = TBufferChannel::with_capacity(1, 0);
 
         let bytes_to_read: [u8; 1] = [0x01];
         let result = t.set_readable_bytes(&bytes_to_read);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read);
+        assert_eq!(t.read_bytes(), &bytes_to_read);
 
         t.empty_read_buffer();
-        assert_eq!(t.read_buffer().len(), 0);
+        assert_eq!(t.read_bytes().len(), 0);
     }
 
     #[test]
     fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
-        let mut t = TBufferTransport::with_capacity(1, 0);
+        let mut t = TBufferChannel::with_capacity(1, 0);
 
         let bytes_to_read_0: [u8; 1] = [0x01];
         let result = t.set_readable_bytes(&bytes_to_read_0);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read_0);
+        assert_eq!(t.read_bytes(), &bytes_to_read_0);
 
         t.empty_read_buffer();
-        assert_eq!(t.read_buffer().len(), 0);
+        assert_eq!(t.read_bytes().len(), 0);
 
         let bytes_to_read_1: [u8; 1] = [0x02];
         let result = t.set_readable_bytes(&bytes_to_read_1);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read_1);
+        assert_eq!(t.read_bytes(), &bytes_to_read_1);
     }
 
     #[test]
     fn must_accept_multiple_reads_until_all_bytes_read() {
-        let mut t = TBufferTransport::with_capacity(10, 0);
+        let mut t = TBufferChannel::with_capacity(10, 0);
 
         let readable_bytes: [u8; 10] = [0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];
 
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes);
         assert_eq!(result, 10);
-        assert_eq!(&t.read_buffer(), &readable_bytes);
+        assert_eq!(t.read_bytes(), &readable_bytes);
 
         // first read
         let mut read_buf_0 = vec![0; 5];
@@ -300,21 +351,21 @@
 
     #[test]
     fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
-        let mut t = TBufferTransport::with_capacity(3, 0);
+        let mut t = TBufferChannel::with_capacity(3, 0);
 
         let readable_bytes_0: [u8; 3] = [0x02, 0xAB, 0x33];
 
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes_0);
         assert_eq!(result, 3);
-        assert_eq!(&t.read_buffer(), &readable_bytes_0);
+        assert_eq!(t.read_bytes(), &readable_bytes_0);
 
         let mut read_buf = vec![0; 4];
 
         // drain the read buffer
         let read_result = t.read(&mut read_buf);
         assert_eq!(read_result.unwrap(), 3);
-        assert_eq!(t.read_buffer(), &read_buf[0..3]);
+        assert_eq!(t.read_bytes(), &read_buf[0..3]);
 
         // check that a subsequent read fails
         let read_result = t.read(&mut read_buf);
@@ -332,11 +383,11 @@
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes_1);
         assert_eq!(result, 2);
-        assert_eq!(&t.read_buffer(), &readable_bytes_1);
+        assert_eq!(t.read_bytes(), &readable_bytes_1);
 
         // read again
         let read_result = t.read(&mut read_buf);
         assert_eq!(read_result.unwrap(), 2);
-        assert_eq!(t.read_buffer(), &read_buf[0..2]);
+        assert_eq!(t.read_bytes(), &read_buf[0..2]);
     }
 }
diff --git a/lib/rs/src/transport/mod.rs b/lib/rs/src/transport/mod.rs
index 1c39f50..9392786 100644
--- a/lib/rs/src/transport/mod.rs
+++ b/lib/rs/src/transport/mod.rs
@@ -15,37 +15,266 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Types required to send and receive bytes over an I/O channel.
+//! Types used to send and receive bytes over an I/O channel.
 //!
-//! The core type is the `TTransport` trait, through which a `TProtocol` can
-//! send and receive primitives over the wire. While `TProtocol` instances deal
-//! with primitive types, `TTransport` instances understand only bytes.
+//! The core types are the `TReadTransport`, `TWriteTransport` and the
+//! `TIoChannel` traits, through which `TInputProtocol` or
+//! `TOutputProtocol` can receive and send primitives over the wire. While
+//! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives
+//! the types in this module understand only bytes.
 
-use std::cell::RefCell;
 use std::io;
-use std::rc::Rc;
+use std::io::{Read, Write};
+use std::ops::{Deref, DerefMut};
+
+#[cfg(test)]
+macro_rules! assert_eq_transport_num_written_bytes {
+    ($transport:ident, $num_written_bytes:expr) => {
+        {
+            assert_eq!($transport.channel.write_bytes().len(), $num_written_bytes);
+        }
+    };
+}
+
+
+#[cfg(test)]
+macro_rules! assert_eq_transport_written_bytes {
+    ($transport:ident, $expected_bytes:ident) => {
+        {
+            assert_eq!($transport.channel.write_bytes(), &$expected_bytes);
+        }
+    };
+}
 
 mod buffered;
 mod framed;
-mod passthru;
 mod socket;
+mod mem;
 
-pub mod mem;
+pub use self::buffered::{TBufferedReadTransport, TBufferedReadTransportFactory,
+                         TBufferedWriteTransport, TBufferedWriteTransportFactory};
+pub use self::framed::{TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
+                       TFramedWriteTransportFactory};
+pub use self::mem::TBufferChannel;
+pub use self::socket::TTcpChannel;
 
-pub use self::mem::TBufferTransport;
-pub use self::buffered::{TBufferedTransport, TBufferedTransportFactory};
-pub use self::framed::{TFramedTransport, TFramedTransportFactory};
-pub use self::passthru::TPassThruTransport;
-pub use self::socket::TTcpTransport;
+/// Identifies a transport used by a `TInputProtocol` to receive bytes.
+pub trait TReadTransport: Read {}
 
-/// Identifies an I/O channel that can be used to send and receive bytes.
-pub trait TTransport: io::Read + io::Write {}
-impl<I: io::Read + io::Write> TTransport for I {}
+/// Helper type used by a server to create `TReadTransport` instances for
+/// accepted client connections.
+pub trait TReadTransportFactory {
+    /// Create a `TTransport` that wraps a channel over which bytes are to be read.
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send>;
+}
 
-/// Helper type used by servers to create `TTransport` instances for accepted
-/// client connections.
-pub trait TTransportFactory {
-    /// Create a `TTransport` that wraps an `inner` transport, thus creating
-    /// a transport stack.
-    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport>;
+/// Identifies a transport used by `TOutputProtocol` to send bytes.
+pub trait TWriteTransport: Write {}
+
+/// Helper type used by a server to create `TWriteTransport` instances for
+/// accepted client connections.
+pub trait TWriteTransportFactory {
+    /// Create a `TTransport` that wraps a channel over which bytes are to be sent.
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send>;
+}
+
+impl<T> TReadTransport for T
+where
+    T: Read,
+{
+}
+
+impl<T> TWriteTransport for T
+where
+    T: Write,
+{
+}
+
+// FIXME: implement the Debug trait for boxed transports
+
+impl<T> TReadTransportFactory for Box<T>
+where
+    T: TReadTransportFactory + ?Sized,
+{
+    fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+        (**self).create(channel)
+    }
+}
+
+impl<T> TWriteTransportFactory for Box<T>
+where
+    T: TWriteTransportFactory + ?Sized,
+{
+    fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+        (**self).create(channel)
+    }
+}
+
+/// Identifies a splittable bidirectional I/O channel used to send and receive bytes.
+pub trait TIoChannel: Read + Write {
+    /// Split the channel into a readable half and a writable half, where the
+    /// readable half implements `io::Read` and the writable half implements
+    /// `io::Write`. Returns `None` if the channel was not initialized, or if it
+    /// cannot be split safely.
+    ///
+    /// Returned halves may share the underlying OS channel or buffer resources.
+    /// Implementations **should ensure** that these two halves can be safely
+    /// used independently by concurrent threads.
+    fn split(self) -> ::Result<(::transport::ReadHalf<Self>, ::transport::WriteHalf<Self>)>
+    where
+        Self: Sized;
+}
+
+/// The readable half of an object returned from `TIoChannel::split`.
+#[derive(Debug)]
+pub struct ReadHalf<C>
+where
+    C: Read,
+{
+    handle: C,
+}
+
+/// The writable half of an object returned from `TIoChannel::split`.
+#[derive(Debug)]
+pub struct WriteHalf<C>
+where
+    C: Write,
+{
+    handle: C,
+}
+
+impl<C> Read for ReadHalf<C>
+where
+    C: Read,
+{
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.handle.read(buf)
+    }
+}
+
+impl<C> Write for WriteHalf<C>
+where
+    C: Write,
+{
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.handle.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.handle.flush()
+    }
+}
+
+impl<C> Deref for ReadHalf<C>
+where
+    C: Read,
+{
+    type Target = C;
+
+    fn deref(&self) -> &Self::Target {
+        &self.handle
+    }
+}
+
+impl<C> DerefMut for ReadHalf<C>
+where
+    C: Read,
+{
+    fn deref_mut(&mut self) -> &mut C {
+        &mut self.handle
+    }
+}
+
+impl<C> Deref for WriteHalf<C>
+where
+    C: Write,
+{
+    type Target = C;
+
+    fn deref(&self) -> &Self::Target {
+        &self.handle
+    }
+}
+
+impl<C> DerefMut for WriteHalf<C>
+where
+    C: Write,
+{
+    fn deref_mut(&mut self) -> &mut C {
+        &mut self.handle
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::io::Cursor;
+
+    use super::*;
+
+    #[test]
+    fn must_create_usable_read_channel_from_concrete_read_type() {
+        let r = Cursor::new([0, 1, 2]);
+        let _ = TBufferedReadTransport::new(r);
+    }
+
+    #[test]
+    fn must_create_usable_read_channel_from_boxed_read() {
+        let r: Box<Read> = Box::new(Cursor::new([0, 1, 2]));
+        let _ = TBufferedReadTransport::new(r);
+    }
+
+    #[test]
+    fn must_create_usable_write_channel_from_concrete_write_type() {
+        let w = vec![0u8; 10];
+        let _ = TBufferedWriteTransport::new(w);
+    }
+
+    #[test]
+    fn must_create_usable_write_channel_from_boxed_write() {
+        let w: Box<Write> = Box::new(vec![0u8; 10]);
+        let _ = TBufferedWriteTransport::new(w);
+    }
+
+    #[test]
+    fn must_create_usable_read_transport_from_concrete_read_transport() {
+        let r = Cursor::new([0, 1, 2]);
+        let mut t = TBufferedReadTransport::new(r);
+        takes_read_transport(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_read_transport_from_boxed_read() {
+        let r = Cursor::new([0, 1, 2]);
+        let mut t: Box<TReadTransport> = Box::new(TBufferedReadTransport::new(r));
+        takes_read_transport(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_write_transport_from_concrete_write_transport() {
+        let w = vec![0u8; 10];
+        let mut t = TBufferedWriteTransport::new(w);
+        takes_write_transport(&mut t)
+    }
+
+    #[test]
+    fn must_create_usable_write_transport_from_boxed_write() {
+        let w = vec![0u8; 10];
+        let mut t: Box<TWriteTransport> = Box::new(TBufferedWriteTransport::new(w));
+        takes_write_transport(&mut t)
+    }
+
+    fn takes_read_transport<R>(t: &mut R)
+    where
+        R: TReadTransport,
+    {
+        t.bytes();
+    }
+
+    fn takes_write_transport<W>(t: &mut W)
+    where
+        W: TWriteTransport,
+    {
+        t.flush().unwrap();
+    }
 }
diff --git a/lib/rs/src/transport/passthru.rs b/lib/rs/src/transport/passthru.rs
deleted file mode 100644
index 60dc3a6..0000000
--- a/lib/rs/src/transport/passthru.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::cell::RefCell;
-use std::rc::Rc;
-use std::io;
-use std::io::{Read, Write};
-
-use super::TTransport;
-
-/// Proxy that wraps an inner `TTransport` and delegates all calls to it.
-///
-/// Unlike other `TTransport` wrappers, `TPassThruTransport` is generic with
-/// regards to the wrapped transport. This allows callers to use methods
-/// specific to the type being wrapped instead of being constrained to methods
-/// on the `TTransport` trait.
-///
-/// # Examples
-///
-/// Create and use a `TPassThruTransport`.
-///
-/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use thrift::transport::{TPassThruTransport, TTcpTransport};
-///
-/// let t = TTcpTransport::new();
-/// let t = TPassThruTransport::new(Rc::new(RefCell::new(Box::new(t))));
-///
-/// // since the type parameter is maintained, we are able
-/// // to use functions specific to `TTcpTransport`
-/// t.inner.borrow_mut().open("localhost:9090").unwrap();
-/// ```
-pub struct TPassThruTransport<I: TTransport> {
-    pub inner: Rc<RefCell<Box<I>>>,
-}
-
-impl<I: TTransport> TPassThruTransport<I> {
-    /// Create a `TPassThruTransport` that wraps an `inner` TTransport.
-    pub fn new(inner: Rc<RefCell<Box<I>>>) -> TPassThruTransport<I> {
-        TPassThruTransport { inner: inner }
-    }
-}
-
-impl<I: TTransport> Read for TPassThruTransport<I> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.inner.borrow_mut().read(buf)
-    }
-}
-
-impl<I: TTransport> Write for TPassThruTransport<I> {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        self.inner.borrow_mut().write(buf)
-    }
-
-    fn flush(&mut self) -> io::Result<()> {
-        self.inner.borrow_mut().flush()
-    }
-}
diff --git a/lib/rs/src/transport/socket.rs b/lib/rs/src/transport/socket.rs
index 9f2b8ba..16b59ef 100644
--- a/lib/rs/src/transport/socket.rs
+++ b/lib/rs/src/transport/socket.rs
@@ -21,69 +21,74 @@
 use std::net::{Shutdown, TcpStream};
 use std::ops::Drop;
 
-use ::{TransportError, TransportErrorKind};
+use {TransportErrorKind, new_transport_error};
+use super::{ReadHalf, TIoChannel, WriteHalf};
 
-/// Communicate with a Thrift service over a TCP socket.
+/// Bidirectional TCP/IP channel.
 ///
 /// # Examples
 ///
-/// Create a `TTcpTransport`.
+/// Create a `TTcpChannel`.
 ///
 /// ```no_run
 /// use std::io::{Read, Write};
-/// use thrift::transport::TTcpTransport;
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
 /// let mut buf = vec![0u8; 4];
-/// t.read(&mut buf).unwrap();
-/// t.write(&vec![0, 1, 2]).unwrap();
+/// c.read(&mut buf).unwrap();
+/// c.write(&vec![0, 1, 2]).unwrap();
 /// ```
 ///
-/// Create a `TTcpTransport` by wrapping an existing `TcpStream`.
+/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
 ///
 /// ```no_run
 /// use std::io::{Read, Write};
 /// use std::net::TcpStream;
-/// use thrift::transport::TTcpTransport;
+/// use thrift::transport::TTcpChannel;
 ///
 /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
-/// let mut t = TTcpTransport::with_stream(stream);
 ///
-/// // no need to call t.open() since we've already connected above
+/// // no need to call c.open() since we've already connected above
+/// let mut c = TTcpChannel::with_stream(stream);
 ///
 /// let mut buf = vec![0u8; 4];
-/// t.read(&mut buf).unwrap();
-/// t.write(&vec![0, 1, 2]).unwrap();
+/// c.read(&mut buf).unwrap();
+/// c.write(&vec![0, 1, 2]).unwrap();
 /// ```
-#[derive(Default)]
-pub struct TTcpTransport {
+#[derive(Debug, Default)]
+pub struct TTcpChannel {
     stream: Option<TcpStream>,
 }
 
-impl TTcpTransport {
-    /// Create an uninitialized `TTcpTransport`.
+impl TTcpChannel {
+    /// Create an uninitialized `TTcpChannel`.
     ///
-    /// The returned instance must be opened using `TTcpTransport::open(...)`
+    /// The returned instance must be opened using `TTcpChannel::open(...)`
     /// before it can be used.
-    pub fn new() -> TTcpTransport {
-        TTcpTransport { stream: None }
+    pub fn new() -> TTcpChannel {
+        TTcpChannel { stream: None }
     }
 
-    /// Create a `TTcpTransport` that wraps an existing `TcpStream`.
+    /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
     ///
     /// The passed-in stream is assumed to have been opened before being wrapped
-    /// by the created `TTcpTransport` instance.
-    pub fn with_stream(stream: TcpStream) -> TTcpTransport {
-        TTcpTransport { stream: Some(stream) }
+    /// by the created `TTcpChannel` instance.
+    pub fn with_stream(stream: TcpStream) -> TTcpChannel {
+        TTcpChannel { stream: Some(stream) }
     }
 
     /// Connect to `remote_address`, which should have the form `host:port`.
     pub fn open(&mut self, remote_address: &str) -> ::Result<()> {
         if self.stream.is_some() {
-            Err(::Error::Transport(TransportError::new(TransportErrorKind::AlreadyOpen,
-                                                       "transport previously opened")))
+            Err(
+                new_transport_error(
+                    TransportErrorKind::AlreadyOpen,
+                    "tcp connection previously opened",
+                ),
+            )
         } else {
             match TcpStream::connect(&remote_address) {
                 Ok(s) => {
@@ -95,33 +100,62 @@
         }
     }
 
-    /// Shutdown this transport.
+    /// Shut down this channel.
     ///
     /// Both send and receive halves are closed, and this instance can no
     /// longer be used to communicate with another endpoint.
     pub fn close(&mut self) -> ::Result<()> {
-        self.if_set(|s| s.shutdown(Shutdown::Both)).map_err(From::from)
+        self.if_set(|s| s.shutdown(Shutdown::Both))
+            .map_err(From::from)
     }
 
     fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
-        where F: FnMut(&mut TcpStream) -> io::Result<T>
+    where
+        F: FnMut(&mut TcpStream) -> io::Result<T>,
     {
 
         if let Some(ref mut s) = self.stream {
             stream_operation(s)
         } else {
-            Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"))
+            Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),)
         }
     }
 }
 
-impl Read for TTcpTransport {
+impl TIoChannel for TTcpChannel {
+    fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
+    where
+        Self: Sized,
+    {
+        let mut s = self;
+
+        s.stream
+            .as_mut()
+            .and_then(|s| s.try_clone().ok())
+            .map(
+                |cloned| {
+                    (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } },
+                     WriteHalf { handle: TTcpChannel { stream: Some(cloned) } })
+                },
+            )
+            .ok_or_else(
+                || {
+                    new_transport_error(
+                        TransportErrorKind::Unknown,
+                        "cannot clone underlying tcp stream",
+                    )
+                },
+            )
+    }
+}
+
+impl Read for TTcpChannel {
     fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
         self.if_set(|s| s.read(b))
     }
 }
 
-impl Write for TTcpTransport {
+impl Write for TTcpChannel {
     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
         self.if_set(|s| s.write(b))
     }
@@ -131,11 +165,11 @@
     }
 }
 
-// Do I have to implement the Drop trait? TcpStream closes the socket on drop.
-impl Drop for TTcpTransport {
+// FIXME: Do I have to implement the Drop trait? TcpStream closes the socket on drop.
+impl Drop for TTcpChannel {
     fn drop(&mut self) {
         if let Err(e) = self.close() {
-            warn!("error while closing socket transport: {:?}", e)
+            warn!("error while closing socket: {:?}", e)
         }
     }
 }
diff --git a/lib/rs/test/src/bin/kitchen_sink_client.rs b/lib/rs/test/src/bin/kitchen_sink_client.rs
index 27171be..9738298 100644
--- a/lib/rs/test/src/bin/kitchen_sink_client.rs
+++ b/lib/rs/test/src/bin/kitchen_sink_client.rs
@@ -21,13 +21,11 @@
 extern crate kitchen_sink;
 extern crate thrift;
 
-use std::cell::RefCell;
-use std::rc::Rc;
-
 use kitchen_sink::base_two::{TNapkinServiceSyncClient, TRamenServiceSyncClient};
 use kitchen_sink::midlayer::{MealServiceSyncClient, TMealServiceSyncClient};
 use kitchen_sink::ultimate::{FullMealServiceSyncClient, TFullMealServiceSyncClient};
-use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+use thrift::transport::{ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel,
+                        TTcpChannel, WriteHalf};
 use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol,
                        TCompactOutputProtocol, TInputProtocol, TOutputProtocol};
 
@@ -50,24 +48,25 @@
         (@arg port: --port +takes_value "Port on which the Thrift test server is listening")
         (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
         (@arg service: --service +takes_value "Service type to contact (\"part\", \"full\")")
-    ).get_matches();
+    )
+            .get_matches();
 
     let host = matches.value_of("host").unwrap_or("127.0.0.1");
     let port = value_t!(matches, "port", u16).unwrap_or(9090);
     let protocol = matches.value_of("protocol").unwrap_or("compact");
     let service = matches.value_of("service").unwrap_or("part");
 
-    let t = open_tcp_transport(host, port)?;
-    let t = Rc::new(RefCell::new(Box::new(TFramedTransport::new(t)) as Box<TTransport>));
+    let (i_chan, o_chan) = tcp_channel(host, port)?;
+    let (i_tran, o_tran) = (TFramedReadTransport::new(i_chan), TFramedWriteTransport::new(o_chan));
 
     let (i_prot, o_prot): (Box<TInputProtocol>, Box<TOutputProtocol>) = match protocol {
         "binary" => {
-            (Box::new(TBinaryInputProtocol::new(t.clone(), true)),
-             Box::new(TBinaryOutputProtocol::new(t.clone(), true)))
+            (Box::new(TBinaryInputProtocol::new(i_tran, true)),
+             Box::new(TBinaryOutputProtocol::new(o_tran, true)))
         }
         "compact" => {
-            (Box::new(TCompactInputProtocol::new(t.clone())),
-             Box::new(TCompactOutputProtocol::new(t.clone())))
+            (Box::new(TCompactInputProtocol::new(i_tran)),
+             Box::new(TCompactOutputProtocol::new(o_tran)))
         }
         unmatched => return Err(format!("unsupported protocol {}", unmatched).into()),
     };
@@ -75,28 +74,31 @@
     run_client(service, i_prot, o_prot)
 }
 
-fn run_client(service: &str,
-              i_prot: Box<TInputProtocol>,
-              o_prot: Box<TOutputProtocol>)
-              -> thrift::Result<()> {
+fn run_client(
+    service: &str,
+    i_prot: Box<TInputProtocol>,
+    o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
     match service {
         "full" => run_full_meal_service(i_prot, o_prot),
         "part" => run_meal_service(i_prot, o_prot),
-        _ => Err(thrift::Error::from(format!("unknown service type {}", service))),
+        _ => Err(thrift::Error::from(format!("unknown service type {}", service)),),
     }
 }
 
-fn open_tcp_transport(host: &str, port: u16) -> thrift::Result<Rc<RefCell<Box<TTransport>>>> {
-    let mut t = TTcpTransport::new();
-    match t.open(&format!("{}:{}", host, port)) {
-        Ok(()) => Ok(Rc::new(RefCell::new(Box::new(t) as Box<TTransport>))),
-        Err(e) => Err(e),
-    }
+fn tcp_channel(
+    host: &str,
+    port: u16,
+) -> thrift::Result<(ReadHalf<TTcpChannel>, WriteHalf<TTcpChannel>)> {
+    let mut c = TTcpChannel::new();
+    c.open(&format!("{}:{}", host, port))?;
+    c.split()
 }
 
-fn run_meal_service(i_prot: Box<TInputProtocol>,
-                    o_prot: Box<TOutputProtocol>)
-                    -> thrift::Result<()> {
+fn run_meal_service(
+    i_prot: Box<TInputProtocol>,
+    o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
     let mut client = MealServiceSyncClient::new(i_prot, o_prot);
 
     // client.full_meal(); // <-- IMPORTANT: if you uncomment this, compilation *should* fail
@@ -110,9 +112,10 @@
     Ok(())
 }
 
-fn run_full_meal_service(i_prot: Box<TInputProtocol>,
-                         o_prot: Box<TOutputProtocol>)
-                         -> thrift::Result<()> {
+fn run_full_meal_service(
+    i_prot: Box<TInputProtocol>,
+    o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
     let mut client = FullMealServiceSyncClient::new(i_prot, o_prot);
 
     execute_call("full", "ramen", || client.ramen(100))?;
@@ -124,17 +127,20 @@
 }
 
 fn execute_call<F, R>(service_type: &str, call_name: &str, mut f: F) -> thrift::Result<()>
-    where F: FnMut() -> thrift::Result<R>
+where
+    F: FnMut() -> thrift::Result<R>,
 {
     let res = f();
 
     match res {
         Ok(_) => println!("{}: completed {} call", service_type, call_name),
         Err(ref e) => {
-            println!("{}: failed {} call with error {:?}",
-                     service_type,
-                     call_name,
-                     e)
+            println!(
+                "{}: failed {} call with error {:?}",
+                service_type,
+                call_name,
+                e
+            )
         }
     }
 
diff --git a/lib/rs/test/src/bin/kitchen_sink_server.rs b/lib/rs/test/src/bin/kitchen_sink_server.rs
index 4ce4fa3..19112cd 100644
--- a/lib/rs/test/src/bin/kitchen_sink_server.rs
+++ b/lib/rs/test/src/bin/kitchen_sink_server.rs
@@ -22,7 +22,7 @@
 extern crate thrift;
 
 use kitchen_sink::base_one::Noodle;
-use kitchen_sink::base_two::{Napkin, Ramen, NapkinServiceSyncHandler, RamenServiceSyncHandler};
+use kitchen_sink::base_two::{Napkin, NapkinServiceSyncHandler, Ramen, RamenServiceSyncHandler};
 use kitchen_sink::midlayer::{Dessert, Meal, MealServiceSyncHandler, MealServiceSyncProcessor};
 use kitchen_sink::ultimate::{Drink, FullMeal, FullMealAndDrinks,
                              FullMealAndDrinksServiceSyncProcessor, FullMealServiceSyncHandler};
@@ -30,8 +30,9 @@
 use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory,
                        TCompactInputProtocolFactory, TCompactOutputProtocolFactory,
                        TInputProtocolFactory, TOutputProtocolFactory};
-use thrift::transport::{TFramedTransportFactory, TTransportFactory};
-use thrift::server::TSimpleServer;
+use thrift::transport::{TFramedReadTransportFactory, TFramedWriteTransportFactory,
+                        TReadTransportFactory, TWriteTransportFactory};
+use thrift::server::TServer;
 
 fn main() {
     match run() {
@@ -52,7 +53,8 @@
         (@arg port: --port +takes_value "port on which the test server listens")
         (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
         (@arg service: --service +takes_value "Service type to contact (\"part\", \"full\")")
-    ).get_matches();
+    )
+            .get_matches();
 
     let port = value_t!(matches, "port", u16).unwrap_or(9090);
     let protocol = matches.value_of("protocol").unwrap_or("compact");
@@ -61,9 +63,8 @@
 
     println!("binding to {}", listen_address);
 
-    let (i_transport_factory, o_transport_factory): (Box<TTransportFactory>,
-                                                     Box<TTransportFactory>) =
-        (Box::new(TFramedTransportFactory {}), Box::new(TFramedTransportFactory {}));
+    let r_transport_factory = TFramedReadTransportFactory::new();
+    let w_transport_factory = TFramedWriteTransportFactory::new();
 
     let (i_protocol_factory, o_protocol_factory): (Box<TInputProtocolFactory>,
                                                    Box<TOutputProtocolFactory>) =
@@ -93,51 +94,75 @@
     // Since what I'm doing is uncommon I'm just going to duplicate the code
     match &*service {
         "part" => {
-            run_meal_server(&listen_address,
-                            i_transport_factory,
-                            i_protocol_factory,
-                            o_transport_factory,
-                            o_protocol_factory)
+            run_meal_server(
+                &listen_address,
+                r_transport_factory,
+                i_protocol_factory,
+                w_transport_factory,
+                o_protocol_factory,
+            )
         }
         "full" => {
-            run_full_meal_server(&listen_address,
-                                 i_transport_factory,
-                                 i_protocol_factory,
-                                 o_transport_factory,
-                                 o_protocol_factory)
+            run_full_meal_server(
+                &listen_address,
+                r_transport_factory,
+                i_protocol_factory,
+                w_transport_factory,
+                o_protocol_factory,
+            )
         }
         unknown => Err(format!("unsupported service type {}", unknown).into()),
     }
 }
 
-fn run_meal_server(listen_address: &str,
-                   i_transport_factory: Box<TTransportFactory>,
-                   i_protocol_factory: Box<TInputProtocolFactory>,
-                   o_transport_factory: Box<TTransportFactory>,
-                   o_protocol_factory: Box<TOutputProtocolFactory>)
-                   -> thrift::Result<()> {
+fn run_meal_server<RTF, IPF, WTF, OPF>(
+    listen_address: &str,
+    r_transport_factory: RTF,
+    i_protocol_factory: IPF,
+    w_transport_factory: WTF,
+    o_protocol_factory: OPF,
+) -> thrift::Result<()>
+where
+    RTF: TReadTransportFactory + 'static,
+    IPF: TInputProtocolFactory + 'static,
+    WTF: TWriteTransportFactory + 'static,
+    OPF: TOutputProtocolFactory + 'static,
+{
     let processor = MealServiceSyncProcessor::new(PartHandler {});
-    let mut server = TSimpleServer::new(i_transport_factory,
-                                        i_protocol_factory,
-                                        o_transport_factory,
-                                        o_protocol_factory,
-                                        processor);
+    let mut server = TServer::new(
+        r_transport_factory,
+        i_protocol_factory,
+        w_transport_factory,
+        o_protocol_factory,
+        processor,
+        1,
+    );
 
     server.listen(listen_address)
 }
 
-fn run_full_meal_server(listen_address: &str,
-                        i_transport_factory: Box<TTransportFactory>,
-                        i_protocol_factory: Box<TInputProtocolFactory>,
-                        o_transport_factory: Box<TTransportFactory>,
-                        o_protocol_factory: Box<TOutputProtocolFactory>)
-                        -> thrift::Result<()> {
+fn run_full_meal_server<RTF, IPF, WTF, OPF>(
+    listen_address: &str,
+    r_transport_factory: RTF,
+    i_protocol_factory: IPF,
+    w_transport_factory: WTF,
+    o_protocol_factory: OPF,
+) -> thrift::Result<()>
+where
+    RTF: TReadTransportFactory + 'static,
+    IPF: TInputProtocolFactory + 'static,
+    WTF: TWriteTransportFactory + 'static,
+    OPF: TOutputProtocolFactory + 'static,
+{
     let processor = FullMealAndDrinksServiceSyncProcessor::new(FullHandler {});
-    let mut server = TSimpleServer::new(i_transport_factory,
-                                        i_protocol_factory,
-                                        o_transport_factory,
-                                        o_protocol_factory,
-                                        processor);
+    let mut server = TServer::new(
+        r_transport_factory,
+        i_protocol_factory,
+        w_transport_factory,
+        o_protocol_factory,
+        processor,
+        1,
+    );
 
     server.listen(listen_address)
 }
@@ -145,21 +170,21 @@
 struct PartHandler;
 
 impl MealServiceSyncHandler for PartHandler {
-    fn handle_meal(&mut self) -> thrift::Result<Meal> {
+    fn handle_meal(&self) -> thrift::Result<Meal> {
         println!("part: handling meal call");
         Ok(meal())
     }
 }
 
 impl RamenServiceSyncHandler for PartHandler {
-    fn handle_ramen(&mut self, _: i32) -> thrift::Result<Ramen> {
+    fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> {
         println!("part: handling ramen call");
         Ok(ramen())
     }
 }
 
 impl NapkinServiceSyncHandler for PartHandler {
-    fn handle_napkin(&mut self) -> thrift::Result<Napkin> {
+    fn handle_napkin(&self) -> thrift::Result<Napkin> {
         println!("part: handling napkin call");
         Ok(napkin())
     }
@@ -171,34 +196,34 @@
 struct FullHandler;
 
 impl FullMealAndDrinksServiceSyncHandler for FullHandler {
-    fn handle_full_meal_and_drinks(&mut self) -> thrift::Result<FullMealAndDrinks> {
+    fn handle_full_meal_and_drinks(&self) -> thrift::Result<FullMealAndDrinks> {
         Ok(FullMealAndDrinks::new(full_meal(), Drink::WHISKEY))
     }
 }
 
 impl FullMealServiceSyncHandler for FullHandler {
-    fn handle_full_meal(&mut self) -> thrift::Result<FullMeal> {
+    fn handle_full_meal(&self) -> thrift::Result<FullMeal> {
         println!("full: handling full meal call");
         Ok(full_meal())
     }
 }
 
 impl MealServiceSyncHandler for FullHandler {
-    fn handle_meal(&mut self) -> thrift::Result<Meal> {
+    fn handle_meal(&self) -> thrift::Result<Meal> {
         println!("full: handling meal call");
         Ok(meal())
     }
 }
 
 impl RamenServiceSyncHandler for FullHandler {
-    fn handle_ramen(&mut self, _: i32) -> thrift::Result<Ramen> {
+    fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> {
         println!("full: handling ramen call");
         Ok(ramen())
     }
 }
 
 impl NapkinServiceSyncHandler for FullHandler {
-    fn handle_napkin(&mut self) -> thrift::Result<Napkin> {
+    fn handle_napkin(&self) -> thrift::Result<Napkin> {
         println!("full: handling napkin call");
         Ok(napkin())
     }
diff --git a/lib/rs/test/src/lib.rs b/lib/rs/test/src/lib.rs
index 8a7ccd0..53f4873 100644
--- a/lib/rs/test/src/lib.rs
+++ b/lib/rs/test/src/lib.rs
@@ -48,6 +48,9 @@
 
     #[test]
     fn must_be_able_to_use_defaults() {
-        let _ = midlayer::Meal { noodle: Some(base_one::Noodle::default()), ..Default::default() };
+        let _ = midlayer::Meal {
+            noodle: Some(base_one::Noodle::default()),
+            ..Default::default()
+        };
     }
 }