THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/test/rs/src/bin/test_client.rs b/test/rs/src/bin/test_client.rs
index a2ea832..aad78a0 100644
--- a/test/rs/src/bin/test_client.rs
+++ b/test/rs/src/bin/test_client.rs
@@ -22,14 +22,14 @@
 extern crate thrift_test; // huh. I have to do this to use my lib
 
 use ordered_float::OrderedFloat;
-use std::cell::RefCell;
 use std::collections::{BTreeMap, BTreeSet};
 use std::fmt::Debug;
-use std::rc::Rc;
 
 use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol,
                        TCompactOutputProtocol, TInputProtocol, TOutputProtocol};
-use thrift::transport::{TBufferedTransport, TFramedTransport, TTcpTransport, TTransport};
+use thrift::transport::{ReadHalf, TBufferedReadTransport, TBufferedWriteTransport,
+                        TFramedReadTransport, TFramedWriteTransport, TIoChannel, TReadTransport,
+                        TTcpChannel, TWriteTransport, WriteHalf};
 use thrift_test::*;
 
 fn main() {
@@ -58,7 +58,8 @@
         (@arg transport: --transport +takes_value "Thrift transport implementation to use (\"buffered\", \"framed\")")
         (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
         (@arg testloops: -n --testloops +takes_value "Number of times to run tests")
-    ).get_matches();
+    )
+            .get_matches();
 
     let host = matches.value_of("host").unwrap_or("127.0.0.1");
     let port = value_t!(matches, "port", u16).unwrap_or(9090);
@@ -66,32 +67,39 @@
     let transport = matches.value_of("transport").unwrap_or("buffered");
     let protocol = matches.value_of("protocol").unwrap_or("binary");
 
-    let t = open_tcp_transport(host, port)?;
+    let (i_chan, o_chan) = tcp_channel(host, port)?;
 
-    let t: Box<TTransport> = match transport {
-        "buffered" => Box::new(TBufferedTransport::new(t)),
-        "framed" => Box::new(TFramedTransport::new(t)),
+    let (i_tran, o_tran) = match transport {
+        "buffered" => {
+            (Box::new(TBufferedReadTransport::new(i_chan)) as Box<TReadTransport>,
+             Box::new(TBufferedWriteTransport::new(o_chan)) as Box<TWriteTransport>)
+        }
+        "framed" => {
+            (Box::new(TFramedReadTransport::new(i_chan)) as Box<TReadTransport>,
+             Box::new(TFramedWriteTransport::new(o_chan)) as Box<TWriteTransport>)
+        }
         unmatched => return Err(format!("unsupported transport {}", unmatched).into()),
     };
-    let t = Rc::new(RefCell::new(t));
 
     let (i_prot, o_prot): (Box<TInputProtocol>, Box<TOutputProtocol>) = match protocol {
         "binary" => {
-            (Box::new(TBinaryInputProtocol::new(t.clone(), true)),
-             Box::new(TBinaryOutputProtocol::new(t.clone(), true)))
+            (Box::new(TBinaryInputProtocol::new(i_tran, true)),
+             Box::new(TBinaryOutputProtocol::new(o_tran, true)))
         }
         "compact" => {
-            (Box::new(TCompactInputProtocol::new(t.clone())),
-             Box::new(TCompactOutputProtocol::new(t.clone())))
+            (Box::new(TCompactInputProtocol::new(i_tran)),
+             Box::new(TCompactOutputProtocol::new(o_tran)))
         }
         unmatched => return Err(format!("unsupported protocol {}", unmatched).into()),
     };
 
-    println!("connecting to {}:{} with {}+{} stack",
-             host,
-             port,
-             protocol,
-             transport);
+    println!(
+        "connecting to {}:{} with {}+{} stack",
+        host,
+        port,
+        protocol,
+        transport
+    );
 
     let mut client = ThriftTestSyncClient::new(i_prot, o_prot);
 
@@ -102,16 +110,19 @@
     Ok(())
 }
 
-// FIXME: expose "open" through the client interface so I don't have to early open the transport
-fn open_tcp_transport(host: &str, port: u16) -> thrift::Result<Rc<RefCell<Box<TTransport>>>> {
-    let mut t = TTcpTransport::new();
-    match t.open(&format!("{}:{}", host, port)) {
-        Ok(()) => Ok(Rc::new(RefCell::new(Box::new(t) as Box<TTransport>))),
-        Err(e) => Err(e),
-    }
+// FIXME: expose "open" through the client interface so I don't have to open
+// the channel early
+fn tcp_channel(
+    host: &str,
+    port: u16,
+) -> thrift::Result<(ReadHalf<TTcpChannel>, WriteHalf<TTcpChannel>)> {
+    let mut c = TTcpChannel::new();
+    c.open(&format!("{}:{}", host, port))?;
+    c.split()
 }
 
-fn make_thrift_calls(client: &mut ThriftTestSyncClient) -> Result<(), thrift::Error> {
+fn make_thrift_calls(client: &mut ThriftTestSyncClient<Box<TInputProtocol>, Box<TOutputProtocol>>,)
+    -> Result<(), thrift::Error> {
     println!("testVoid");
     client.test_void()?;
 
@@ -131,12 +142,15 @@
     verify_expected_result(client.test_i32(1159348374), 1159348374)?;
 
     println!("testi64");
-    // try!(verify_expected_result(client.test_i64(-8651829879438294565), -8651829879438294565));
+    // try!(verify_expected_result(client.test_i64(-8651829879438294565),
+    // -8651829879438294565));
     verify_expected_result(client.test_i64(i64::min_value()), i64::min_value())?;
 
     println!("testDouble");
-    verify_expected_result(client.test_double(OrderedFloat::from(42.42)),
-                           OrderedFloat::from(42.42))?;
+    verify_expected_result(
+        client.test_double(OrderedFloat::from(42.42)),
+        OrderedFloat::from(42.42),
+    )?;
 
     println!("testTypedef");
     {
@@ -175,10 +189,14 @@
     }
 
     // Xtruct again, with optional values
-    // FIXME: apparently the erlang thrift server does not like opt-in-req-out parameters that are undefined. Joy.
+    // FIXME: apparently the erlang thrift server does not like opt-in-req-out
+    // parameters that are undefined. Joy.
     // {
-    // let x_snd = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: None, i32_thing: None, i64_thing: Some(12938492818) };
-    // let x_cmp = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: Some(0), i32_thing: Some(0), i64_thing: Some(12938492818) }; // the C++ server is responding correctly
+    // let x_snd = Xtruct { string_thing: Some("foo".to_owned()), byte_thing: None,
+    // i32_thing: None, i64_thing: Some(12938492818) };
+    // let x_cmp = Xtruct { string_thing: Some("foo".to_owned()), byte_thing:
+    // Some(0), i32_thing: Some(0), i64_thing: Some(12938492818) }; // the C++
+    // server is responding correctly
     // try!(verify_expected_result(client.test_struct(x_snd), x_cmp));
     // }
     //
@@ -188,22 +206,26 @@
     {
         let x_snd = Xtruct2 {
             byte_thing: Some(32),
-            struct_thing: Some(Xtruct {
-                string_thing: Some("foo".to_owned()),
-                byte_thing: Some(1),
-                i32_thing: Some(324382098),
-                i64_thing: Some(12938492818),
-            }),
+            struct_thing: Some(
+                Xtruct {
+                    string_thing: Some("foo".to_owned()),
+                    byte_thing: Some(1),
+                    i32_thing: Some(324382098),
+                    i64_thing: Some(12938492818),
+                },
+            ),
             i32_thing: Some(293481098),
         };
         let x_cmp = Xtruct2 {
             byte_thing: Some(32),
-            struct_thing: Some(Xtruct {
-                string_thing: Some("foo".to_owned()),
-                byte_thing: Some(1),
-                i32_thing: Some(324382098),
-                i64_thing: Some(12938492818),
-            }),
+            struct_thing: Some(
+                Xtruct {
+                    string_thing: Some("foo".to_owned()),
+                    byte_thing: Some(1),
+                    i32_thing: Some(324382098),
+                    i64_thing: Some(12938492818),
+                },
+            ),
             i32_thing: Some(293481098),
         };
         verify_expected_result(client.test_nest(x_snd), x_cmp)?;
@@ -270,7 +292,8 @@
     }
 
     // nested map
-    // expect : {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2 => 2, 3 => 3, 4 => 4, }, }
+    // expect : {-4 => {-4 => -4, -3 => -3, -2 => -2, -1 => -1, }, 4 => {1 => 1, 2
+    // => 2, 3 => 3, 4 => 4, }, }
     println!("testMapMap");
     {
         let mut m_cmp_nested_0: BTreeMap<i32, i32> = BTreeMap::new();
@@ -302,13 +325,10 @@
             i64_thing: Some(-19234123981),
         };
 
-        verify_expected_result(client.test_multi(1,
-                                                 -123948,
-                                                 -19234123981,
-                                                 m_snd,
-                                                 Numberz::EIGHT,
-                                                 81),
-                               s_cmp)?;
+        verify_expected_result(
+            client.test_multi(1, -123948, -19234123981, m_snd, Numberz::EIGHT, 81),
+            s_cmp,
+        )?;
     }
 
     // Insanity
@@ -324,24 +344,30 @@
         arg_map_usermap.insert(Numberz::EIGHT, 19);
 
         let mut arg_vec_xtructs: Vec<Xtruct> = Vec::new();
-        arg_vec_xtructs.push(Xtruct {
-            string_thing: Some("foo".to_owned()),
-            byte_thing: Some(8),
-            i32_thing: Some(29),
-            i64_thing: Some(92384),
-        });
-        arg_vec_xtructs.push(Xtruct {
-            string_thing: Some("bar".to_owned()),
-            byte_thing: Some(28),
-            i32_thing: Some(2),
-            i64_thing: Some(-1281),
-        });
-        arg_vec_xtructs.push(Xtruct {
-            string_thing: Some("baz".to_owned()),
-            byte_thing: Some(0),
-            i32_thing: Some(3948539),
-            i64_thing: Some(-12938492),
-        });
+        arg_vec_xtructs.push(
+            Xtruct {
+                string_thing: Some("foo".to_owned()),
+                byte_thing: Some(8),
+                i32_thing: Some(29),
+                i64_thing: Some(92384),
+            },
+        );
+        arg_vec_xtructs.push(
+            Xtruct {
+                string_thing: Some("bar".to_owned()),
+                byte_thing: Some(28),
+                i32_thing: Some(2),
+                i64_thing: Some(-1281),
+            },
+        );
+        arg_vec_xtructs.push(
+            Xtruct {
+                string_thing: Some("baz".to_owned()),
+                byte_thing: Some(0),
+                i32_thing: Some(3948539),
+                i64_thing: Some(-12938492),
+            },
+        );
 
         let mut s_cmp_nested_1: BTreeMap<Numberz, Insanity> = BTreeMap::new();
         let insanity = Insanity {
@@ -372,7 +398,7 @@
             Err(thrift::Error::User(ref e)) => {
                 match e.downcast_ref::<Xception>() {
                     Some(x) => Ok(x),
-                    None => Err(thrift::Error::User("did not get expected Xception struct".into())),
+                    None => Err(thrift::Error::User("did not get expected Xception struct".into()),),
                 }
             }
             _ => Err(thrift::Error::User("did not get exception".into())),
@@ -414,7 +440,7 @@
             Err(thrift::Error::User(ref e)) => {
                 match e.downcast_ref::<Xception>() {
                     Some(x) => Ok(x),
-                    None => Err(thrift::Error::User("did not get expected Xception struct".into())),
+                    None => Err(thrift::Error::User("did not get expected Xception struct".into()),),
                 }
             }
             _ => Err(thrift::Error::User("did not get exception".into())),
@@ -435,7 +461,7 @@
             Err(thrift::Error::User(ref e)) => {
                 match e.downcast_ref::<Xception2>() {
                     Some(x) => Ok(x),
-                    None => Err(thrift::Error::User("did not get expected Xception struct".into())),
+                    None => Err(thrift::Error::User("did not get expected Xception struct".into()),),
                 }
             }
             _ => Err(thrift::Error::User("did not get exception".into())),
@@ -443,12 +469,17 @@
 
         let x_cmp = Xception2 {
             error_code: Some(2002),
-            struct_thing: Some(Xtruct {
-                string_thing: Some("This is an Xception2".to_owned()),
-                byte_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */
-                i32_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */
-                i64_thing: Some(0), /* since this is an OPT_IN_REQ_OUT field the sender sets a default */
-            }),
+            struct_thing: Some(
+                Xtruct {
+                    string_thing: Some("This is an Xception2".to_owned()),
+                    // since this is an OPT_IN_REQ_OUT field the sender sets a default
+                    byte_thing: Some(0),
+                    // since this is an OPT_IN_REQ_OUT field the sender sets a default
+                    i32_thing: Some(0),
+                    // since this is an OPT_IN_REQ_OUT field the sender sets a default
+                    i64_thing: Some(0),
+                },
+            ),
         };
 
         verify_expected_result(Ok(x), &x_cmp)?;
@@ -458,17 +489,18 @@
     {
         let r = client.test_multi_exception("haha".to_owned(), "RETURNED".to_owned());
         let x = match r {
-            Err(e) => {
-                Err(thrift::Error::User(format!("received an unexpected exception {:?}", e).into()))
-            }
+            Err(e) => Err(thrift::Error::User(format!("received an unexpected exception {:?}", e).into(),),),
             _ => r,
         }?;
 
         let x_cmp = Xtruct {
             string_thing: Some("RETURNED".to_owned()),
-            byte_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default
-            i32_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default
-            i64_thing: Some(0), // since this is an OPT_IN_REQ_OUT field the sender sets a default
+            // since this is an OPT_IN_REQ_OUT field the sender sets a default
+            byte_thing: Some(0),
+            // since this is an OPT_IN_REQ_OUT field the sender sets a default
+            i32_thing: Some(0),
+            // since this is an OPT_IN_REQ_OUT field the sender sets a default
+            i64_thing: Some(0),
         };
 
         verify_expected_result(Ok(x), x_cmp)?;
@@ -479,20 +511,22 @@
         client.test_oneway(1)?;
     }
 
-    // final test to verify that the connection is still writable after the one-way call
+    // final test to verify that the connection is still writable after the one-way
+    // call
     client.test_void()
 }
 
-fn verify_expected_result<T: Debug + PartialEq + Sized>(actual: Result<T, thrift::Error>,
-                                                        expected: T)
-                                                        -> Result<(), thrift::Error> {
+#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
+fn verify_expected_result<T: Debug + PartialEq + Sized>(
+    actual: Result<T, thrift::Error>,
+    expected: T,
+) -> Result<(), thrift::Error> {
     match actual {
         Ok(v) => {
             if v == expected {
                 Ok(())
             } else {
-                Err(thrift::Error::User(format!("expected {:?} but got {:?}", &expected, &v)
-                    .into()))
+                Err(thrift::Error::User(format!("expected {:?} but got {:?}", &expected, &v).into()),)
             }
         }
         Err(e) => Err(e),