THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/lib/rs/src/transport/socket.rs b/lib/rs/src/transport/socket.rs
index 9f2b8ba..16b59ef 100644
--- a/lib/rs/src/transport/socket.rs
+++ b/lib/rs/src/transport/socket.rs
@@ -21,69 +21,74 @@
 use std::net::{Shutdown, TcpStream};
 use std::ops::Drop;
 
-use ::{TransportError, TransportErrorKind};
+use {TransportErrorKind, new_transport_error};
+use super::{ReadHalf, TIoChannel, WriteHalf};
 
-/// Communicate with a Thrift service over a TCP socket.
+/// Bidirectional TCP/IP channel.
 ///
 /// # Examples
 ///
-/// Create a `TTcpTransport`.
+/// Create a `TTcpChannel`.
 ///
 /// ```no_run
 /// use std::io::{Read, Write};
-/// use thrift::transport::TTcpTransport;
+/// use thrift::transport::TTcpChannel;
 ///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
 ///
 /// let mut buf = vec![0u8; 4];
-/// t.read(&mut buf).unwrap();
-/// t.write(&vec![0, 1, 2]).unwrap();
+/// c.read(&mut buf).unwrap();
+/// c.write(&vec![0, 1, 2]).unwrap();
 /// ```
 ///
-/// Create a `TTcpTransport` by wrapping an existing `TcpStream`.
+/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
 ///
 /// ```no_run
 /// use std::io::{Read, Write};
 /// use std::net::TcpStream;
-/// use thrift::transport::TTcpTransport;
+/// use thrift::transport::TTcpChannel;
 ///
 /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
-/// let mut t = TTcpTransport::with_stream(stream);
 ///
-/// // no need to call t.open() since we've already connected above
+/// // no need to call c.open() since we've already connected above
+/// let mut c = TTcpChannel::with_stream(stream);
 ///
 /// let mut buf = vec![0u8; 4];
-/// t.read(&mut buf).unwrap();
-/// t.write(&vec![0, 1, 2]).unwrap();
+/// c.read(&mut buf).unwrap();
+/// c.write(&vec![0, 1, 2]).unwrap();
 /// ```
-#[derive(Default)]
-pub struct TTcpTransport {
+#[derive(Debug, Default)]
+pub struct TTcpChannel {
     stream: Option<TcpStream>,
 }
 
-impl TTcpTransport {
-    /// Create an uninitialized `TTcpTransport`.
+impl TTcpChannel {
+    /// Create an uninitialized `TTcpChannel`.
     ///
-    /// The returned instance must be opened using `TTcpTransport::open(...)`
+    /// The returned instance must be opened using `TTcpChannel::open(...)`
     /// before it can be used.
-    pub fn new() -> TTcpTransport {
-        TTcpTransport { stream: None }
+    pub fn new() -> TTcpChannel {
+        TTcpChannel { stream: None }
     }
 
-    /// Create a `TTcpTransport` that wraps an existing `TcpStream`.
+    /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
     ///
     /// The passed-in stream is assumed to have been opened before being wrapped
-    /// by the created `TTcpTransport` instance.
-    pub fn with_stream(stream: TcpStream) -> TTcpTransport {
-        TTcpTransport { stream: Some(stream) }
+    /// by the created `TTcpChannel` instance.
+    pub fn with_stream(stream: TcpStream) -> TTcpChannel {
+        TTcpChannel { stream: Some(stream) }
     }
 
     /// Connect to `remote_address`, which should have the form `host:port`.
     pub fn open(&mut self, remote_address: &str) -> ::Result<()> {
         if self.stream.is_some() {
-            Err(::Error::Transport(TransportError::new(TransportErrorKind::AlreadyOpen,
-                                                       "transport previously opened")))
+            Err(
+                new_transport_error(
+                    TransportErrorKind::AlreadyOpen,
+                    "tcp connection previously opened",
+                ),
+            )
         } else {
             match TcpStream::connect(&remote_address) {
                 Ok(s) => {
@@ -95,33 +100,62 @@
         }
     }
 
-    /// Shutdown this transport.
+    /// Shut down this channel.
     ///
     /// Both send and receive halves are closed, and this instance can no
     /// longer be used to communicate with another endpoint.
     pub fn close(&mut self) -> ::Result<()> {
-        self.if_set(|s| s.shutdown(Shutdown::Both)).map_err(From::from)
+        self.if_set(|s| s.shutdown(Shutdown::Both))
+            .map_err(From::from)
     }
 
     fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
-        where F: FnMut(&mut TcpStream) -> io::Result<T>
+    where
+        F: FnMut(&mut TcpStream) -> io::Result<T>,
     {
 
         if let Some(ref mut s) = self.stream {
             stream_operation(s)
         } else {
-            Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"))
+            Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),)
         }
     }
 }
 
-impl Read for TTcpTransport {
+impl TIoChannel for TTcpChannel {
+    fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
+    where
+        Self: Sized,
+    {
+        let mut s = self;
+
+        s.stream
+            .as_mut()
+            .and_then(|s| s.try_clone().ok())
+            .map(
+                |cloned| {
+                    (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } },
+                     WriteHalf { handle: TTcpChannel { stream: Some(cloned) } })
+                },
+            )
+            .ok_or_else(
+                || {
+                    new_transport_error(
+                        TransportErrorKind::Unknown,
+                        "cannot clone underlying tcp stream",
+                    )
+                },
+            )
+    }
+}
+
+impl Read for TTcpChannel {
     fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
         self.if_set(|s| s.read(b))
     }
 }
 
-impl Write for TTcpTransport {
+impl Write for TTcpChannel {
     fn write(&mut self, b: &[u8]) -> io::Result<usize> {
         self.if_set(|s| s.write(b))
     }
@@ -131,11 +165,11 @@
     }
 }
 
-// Do I have to implement the Drop trait? TcpStream closes the socket on drop.
-impl Drop for TTcpTransport {
+// FIXME: Is this Drop impl necessary? TcpStream already closes the socket when it is dropped.
+impl Drop for TTcpChannel {
     fn drop(&mut self) {
         if let Err(e) = self.close() {
-            warn!("error while closing socket transport: {:?}", e)
+            warn!("error while closing socket: {:?}", e)
         }
     }
 }