THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/tutorial/rs/README.md b/tutorial/rs/README.md
index 4d0d7c8..384e9f8 100644
--- a/tutorial/rs/README.md
+++ b/tutorial/rs/README.md
@@ -35,13 +35,12 @@
 extern crate try_from;
 
 // generated Rust module
-mod tutorial;
+use tutorial;
 
-use std::cell::RefCell;
-use std::rc::Rc;
-use thrift::protocol::{TInputProtocol, TOutputProtocol};
 use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
-use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+use thrift::protocol::{TInputProtocol, TOutputProtocol};
+use thrift::transport::{TFramedReadTransport, TFramedWriteTransport};
+use thrift::transport::{TIoChannel, TTcpChannel};
 use tutorial::{CalculatorSyncClient, TCalculatorSyncClient};
 use tutorial::{Operation, Work};
 
@@ -61,28 +60,16 @@
     //
 
     println!("connect to server on 127.0.0.1:9090");
-    let mut t = TTcpTransport::new();
-    let t = match t.open("127.0.0.1:9090") {
-        Ok(()) => t,
-        Err(e) => {
-            return Err(
-                format!("failed to connect with {:?}", e).into()
-            );
-        }
-    };
+    let mut c = TTcpTransport::new();
+    c.open("127.0.0.1:9090")?;
 
-    let t = Rc::new(RefCell::new(
-        Box::new(t) as Box<TTransport>
-    ));
-    let t = Rc::new(RefCell::new(
-        Box::new(TFramedTransport::new(t)) as Box<TTransport>
-    ));
-
-    let i_prot: Box<TInputProtocol> = Box::new(
-        TCompactInputProtocol::new(t.clone())
+    let (i_chan, o_chan) = c.split()?;
+    
+    let i_prot = TCompactInputProtocol::new(
+        TFramedReadTransport::new(i_chan)
     );
-    let o_prot: Box<TOutputProtocol> = Box::new(
-        TCompactOutputProtocol::new(t.clone())
+    let o_prot = TCompactOutputProtocol::new(
+        TFramedWriteTransport::new(o_chan)
     );
 
     let client = CalculatorSyncClient::new(i_prot, o_prot);
@@ -177,10 +164,10 @@
 ```thrift
 typedef i64 UserId
 
-typedef map<string, Bonk> MapType
+typedef map<string, UserId> MapType
 ```
 ```rust
-pub type UserId = 164;
+pub type UserId = i64;
 
 pub type MapType = BTreeMap<String, Bonk>;
 ```
@@ -327,4 +314,4 @@
 ## Known Issues
 
 * Struct constants are not supported
-* Map, list and set constants require a const holder struct
\ No newline at end of file
+* Map, list and set constants require a const holder struct
diff --git a/tutorial/rs/src/bin/tutorial_client.rs b/tutorial/rs/src/bin/tutorial_client.rs
index 2b0d4f9..24ab4be 100644
--- a/tutorial/rs/src/bin/tutorial_client.rs
+++ b/tutorial/rs/src/bin/tutorial_client.rs
@@ -21,15 +21,12 @@
 extern crate thrift;
 extern crate thrift_tutorial;
 
-use std::cell::RefCell;
-use std::rc::Rc;
-
-use thrift::protocol::{TInputProtocol, TOutputProtocol};
 use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
-use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+use thrift::transport::{ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel,
+                        TTcpChannel, WriteHalf};
 
 use thrift_tutorial::shared::TSharedServiceSyncClient;
-use thrift_tutorial::tutorial::{CalculatorSyncClient, TCalculatorSyncClient, Operation, Work};
+use thrift_tutorial::tutorial::{CalculatorSyncClient, Operation, TCalculatorSyncClient, Work};
 
 fn main() {
     match run() {
@@ -73,7 +70,8 @@
     let logid = 32;
 
     // let's do...a multiply!
-    let res = client.calculate(logid, Work::new(7, 8, Operation::MULTIPLY, None))?;
+    let res = client
+        .calculate(logid, Work::new(7, 8, Operation::MULTIPLY, None))?;
     println!("multiplied 7 and 8 and got {}", res);
 
     // let's get the log for it
@@ -102,34 +100,31 @@
     Ok(())
 }
 
-fn new_client(host: &str, port: u16) -> thrift::Result<CalculatorSyncClient> {
-    let mut t = TTcpTransport::new();
+type ClientInputProtocol = TCompactInputProtocol<TFramedReadTransport<ReadHalf<TTcpChannel>>>;
+type ClientOutputProtocol = TCompactOutputProtocol<TFramedWriteTransport<WriteHalf<TTcpChannel>>>;
+
+fn new_client
+    (
+    host: &str,
+    port: u16,
+) -> thrift::Result<CalculatorSyncClient<ClientInputProtocol, ClientOutputProtocol>> {
+    let mut c = TTcpChannel::new();
 
     // open the underlying TCP stream
     println!("connecting to tutorial server on {}:{}", host, port);
-    let t = match t.open(&format!("{}:{}", host, port)) {
-        Ok(()) => t,
-        Err(e) => {
-            return Err(format!("failed to open tcp stream to {}:{} error:{:?}",
-                               host,
-                               port,
-                               e)
-                .into());
-        }
-    };
+    c.open(&format!("{}:{}", host, port))?;
 
-    // refcounted because it's shared by both input and output transports
-    let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
+    // clone the TCP channel into two halves, one which
+    // we'll use for reading, the other for writing
+    let (i_chan, o_chan) = c.split()?;
 
-    // wrap a raw socket (slow) with a buffered transport of some kind
-    let t = Box::new(TFramedTransport::new(t)) as Box<TTransport>;
-
-    // refcounted again because it's shared by both input and output protocols
-    let t = Rc::new(RefCell::new(t));
+    // wrap the raw sockets (slow) with a buffered transport of some kind
+    let i_tran = TFramedReadTransport::new(i_chan);
+    let o_tran = TFramedWriteTransport::new(o_chan);
 
     // now create the protocol implementations
-    let i_prot = Box::new(TCompactInputProtocol::new(t.clone())) as Box<TInputProtocol>;
-    let o_prot = Box::new(TCompactOutputProtocol::new(t.clone())) as Box<TOutputProtocol>;
+    let i_prot = TCompactInputProtocol::new(i_tran);
+    let o_prot = TCompactOutputProtocol::new(o_tran);
 
     // we're done!
     Ok(CalculatorSyncClient::new(i_prot, o_prot))
diff --git a/tutorial/rs/src/bin/tutorial_server.rs b/tutorial/rs/src/bin/tutorial_server.rs
index 9cc1866..8db8eed 100644
--- a/tutorial/rs/src/bin/tutorial_server.rs
+++ b/tutorial/rs/src/bin/tutorial_server.rs
@@ -24,12 +24,12 @@
 use std::collections::HashMap;
 use std::convert::{From, Into};
 use std::default::Default;
+use std::sync::Mutex;
 
-use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
 use thrift::protocol::{TCompactInputProtocolFactory, TCompactOutputProtocolFactory};
-use thrift::server::TSimpleServer;
+use thrift::server::TServer;
 
-use thrift::transport::{TFramedTransportFactory, TTransportFactory};
+use thrift::transport::{TFramedReadTransportFactory, TFramedWriteTransportFactory};
 use thrift_tutorial::shared::{SharedServiceSyncHandler, SharedStruct};
 use thrift_tutorial::tutorial::{CalculatorSyncHandler, CalculatorSyncProcessor};
 use thrift_tutorial::tutorial::{InvalidOperation, Operation, Work};
@@ -58,33 +58,36 @@
 
     println!("binding to {}", listen_address);
 
-    let i_tran_fact: Box<TTransportFactory> = Box::new(TFramedTransportFactory::new());
-    let i_prot_fact: Box<TInputProtocolFactory> = Box::new(TCompactInputProtocolFactory::new());
+    let i_tran_fact = TFramedReadTransportFactory::new();
+    let i_prot_fact = TCompactInputProtocolFactory::new();
 
-    let o_tran_fact: Box<TTransportFactory> = Box::new(TFramedTransportFactory::new());
-    let o_prot_fact: Box<TOutputProtocolFactory> = Box::new(TCompactOutputProtocolFactory::new());
+    let o_tran_fact = TFramedWriteTransportFactory::new();
+    let o_prot_fact = TCompactOutputProtocolFactory::new();
 
     // demux incoming messages
     let processor = CalculatorSyncProcessor::new(CalculatorServer { ..Default::default() });
 
     // create the server and start listening
-    let mut server = TSimpleServer::new(i_tran_fact,
-                                        i_prot_fact,
-                                        o_tran_fact,
-                                        o_prot_fact,
-                                        processor);
+    let mut server = TServer::new(
+        i_tran_fact,
+        i_prot_fact,
+        o_tran_fact,
+        o_prot_fact,
+        processor,
+        10,
+    );
 
     server.listen(&listen_address)
 }
 
 /// Handles incoming Calculator service calls.
 struct CalculatorServer {
-    log: HashMap<i32, SharedStruct>,
+    log: Mutex<HashMap<i32, SharedStruct>>,
 }
 
 impl Default for CalculatorServer {
     fn default() -> CalculatorServer {
-        CalculatorServer { log: HashMap::new() }
+        CalculatorServer { log: Mutex::new(HashMap::new()) }
     }
 }
 
@@ -94,9 +97,9 @@
 
 // SharedService handler
 impl SharedServiceSyncHandler for CalculatorServer {
-    fn handle_get_struct(&mut self, key: i32) -> thrift::Result<SharedStruct> {
-        self.log
-            .get(&key)
+    fn handle_get_struct(&self, key: i32) -> thrift::Result<SharedStruct> {
+        let log = self.log.lock().unwrap();
+        log.get(&key)
             .cloned()
             .ok_or_else(|| format!("could not find log for key {}", key).into())
     }
@@ -104,25 +107,27 @@
 
 // Calculator handler
 impl CalculatorSyncHandler for CalculatorServer {
-    fn handle_ping(&mut self) -> thrift::Result<()> {
+    fn handle_ping(&self) -> thrift::Result<()> {
         println!("pong!");
         Ok(())
     }
 
-    fn handle_add(&mut self, num1: i32, num2: i32) -> thrift::Result<i32> {
+    fn handle_add(&self, num1: i32, num2: i32) -> thrift::Result<i32> {
         println!("handling add: n1:{} n2:{}", num1, num2);
         Ok(num1 + num2)
     }
 
-    fn handle_calculate(&mut self, logid: i32, w: Work) -> thrift::Result<i32> {
+    fn handle_calculate(&self, logid: i32, w: Work) -> thrift::Result<i32> {
         println!("handling calculate: l:{}, w:{:?}", logid, w);
 
         let res = if let Some(ref op) = w.op {
             if w.num1.is_none() || w.num2.is_none() {
-                Err(InvalidOperation {
-                    what_op: Some(*op as i32),
-                    why: Some("no operands specified".to_owned()),
-                })
+                Err(
+                    InvalidOperation {
+                        what_op: Some(*op as i32),
+                        why: Some("no operands specified".to_owned()),
+                    },
+                )
             } else {
                 // so that I don't have to call unwrap() multiple times below
                 let num1 = w.num1.as_ref().expect("operands checked");
@@ -134,10 +139,12 @@
                     Operation::MULTIPLY => Ok(num1 * num2),
                     Operation::DIVIDE => {
                         if *num2 == 0 {
-                            Err(InvalidOperation {
-                                what_op: Some(*op as i32),
-                                why: Some("divide by 0".to_owned()),
-                            })
+                            Err(
+                                InvalidOperation {
+                                    what_op: Some(*op as i32),
+                                    why: Some("divide by 0".to_owned()),
+                                },
+                            )
                         } else {
                             Ok(num1 / num2)
                         }
@@ -145,12 +152,13 @@
                 }
             }
         } else {
-            Err(InvalidOperation::new(None, "no operation specified".to_owned()))
+            Err(InvalidOperation::new(None, "no operation specified".to_owned()),)
         };
 
         // if the operation was successful log it
         if let Ok(ref v) = res {
-            self.log.insert(logid, SharedStruct::new(logid, format!("{}", v)));
+            let mut log = self.log.lock().unwrap();
+            log.insert(logid, SharedStruct::new(logid, format!("{}", v)));
         }
 
         // the try! macro automatically maps errors
@@ -161,7 +169,7 @@
         res.map_err(From::from)
     }
 
-    fn handle_zip(&mut self) -> thrift::Result<()> {
+    fn handle_zip(&self) -> thrift::Result<()> {
         println!("handling zip");
         Ok(())
     }