THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool-based TServer

This closes #1255
diff --git a/lib/rs/test/src/bin/kitchen_sink_client.rs b/lib/rs/test/src/bin/kitchen_sink_client.rs
index 27171be..9738298 100644
--- a/lib/rs/test/src/bin/kitchen_sink_client.rs
+++ b/lib/rs/test/src/bin/kitchen_sink_client.rs
@@ -21,13 +21,11 @@
 extern crate kitchen_sink;
 extern crate thrift;
 
-use std::cell::RefCell;
-use std::rc::Rc;
-
 use kitchen_sink::base_two::{TNapkinServiceSyncClient, TRamenServiceSyncClient};
 use kitchen_sink::midlayer::{MealServiceSyncClient, TMealServiceSyncClient};
 use kitchen_sink::ultimate::{FullMealServiceSyncClient, TFullMealServiceSyncClient};
-use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+use thrift::transport::{ReadHalf, TFramedReadTransport, TFramedWriteTransport, TIoChannel,
+                        TTcpChannel, WriteHalf};
 use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol,
                        TCompactOutputProtocol, TInputProtocol, TOutputProtocol};
 
@@ -50,24 +48,25 @@
         (@arg port: --port +takes_value "Port on which the Thrift test server is listening")
         (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
         (@arg service: --service +takes_value "Service type to contact (\"part\", \"full\")")
-    ).get_matches();
+    )
+            .get_matches();
 
     let host = matches.value_of("host").unwrap_or("127.0.0.1");
     let port = value_t!(matches, "port", u16).unwrap_or(9090);
     let protocol = matches.value_of("protocol").unwrap_or("compact");
     let service = matches.value_of("service").unwrap_or("part");
 
-    let t = open_tcp_transport(host, port)?;
-    let t = Rc::new(RefCell::new(Box::new(TFramedTransport::new(t)) as Box<TTransport>));
+    let (i_chan, o_chan) = tcp_channel(host, port)?;
+    let (i_tran, o_tran) = (TFramedReadTransport::new(i_chan), TFramedWriteTransport::new(o_chan));
 
     let (i_prot, o_prot): (Box<TInputProtocol>, Box<TOutputProtocol>) = match protocol {
         "binary" => {
-            (Box::new(TBinaryInputProtocol::new(t.clone(), true)),
-             Box::new(TBinaryOutputProtocol::new(t.clone(), true)))
+            (Box::new(TBinaryInputProtocol::new(i_tran, true)),
+             Box::new(TBinaryOutputProtocol::new(o_tran, true)))
         }
         "compact" => {
-            (Box::new(TCompactInputProtocol::new(t.clone())),
-             Box::new(TCompactOutputProtocol::new(t.clone())))
+            (Box::new(TCompactInputProtocol::new(i_tran)),
+             Box::new(TCompactOutputProtocol::new(o_tran)))
         }
         unmatched => return Err(format!("unsupported protocol {}", unmatched).into()),
     };
@@ -75,28 +74,31 @@
     run_client(service, i_prot, o_prot)
 }
 
-fn run_client(service: &str,
-              i_prot: Box<TInputProtocol>,
-              o_prot: Box<TOutputProtocol>)
-              -> thrift::Result<()> {
+fn run_client(
+    service: &str,
+    i_prot: Box<TInputProtocol>,
+    o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
     match service {
         "full" => run_full_meal_service(i_prot, o_prot),
         "part" => run_meal_service(i_prot, o_prot),
-        _ => Err(thrift::Error::from(format!("unknown service type {}", service))),
+        _ => Err(thrift::Error::from(format!("unknown service type {}", service)),),
     }
 }
 
-fn open_tcp_transport(host: &str, port: u16) -> thrift::Result<Rc<RefCell<Box<TTransport>>>> {
-    let mut t = TTcpTransport::new();
-    match t.open(&format!("{}:{}", host, port)) {
-        Ok(()) => Ok(Rc::new(RefCell::new(Box::new(t) as Box<TTransport>))),
-        Err(e) => Err(e),
-    }
+fn tcp_channel(
+    host: &str,
+    port: u16,
+) -> thrift::Result<(ReadHalf<TTcpChannel>, WriteHalf<TTcpChannel>)> {
+    let mut c = TTcpChannel::new();
+    c.open(&format!("{}:{}", host, port))?;
+    c.split()
 }
 
-fn run_meal_service(i_prot: Box<TInputProtocol>,
-                    o_prot: Box<TOutputProtocol>)
-                    -> thrift::Result<()> {
+fn run_meal_service(
+    i_prot: Box<TInputProtocol>,
+    o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
     let mut client = MealServiceSyncClient::new(i_prot, o_prot);
 
     // client.full_meal(); // <-- IMPORTANT: if you uncomment this, compilation *should* fail
@@ -110,9 +112,10 @@
     Ok(())
 }
 
-fn run_full_meal_service(i_prot: Box<TInputProtocol>,
-                         o_prot: Box<TOutputProtocol>)
-                         -> thrift::Result<()> {
+fn run_full_meal_service(
+    i_prot: Box<TInputProtocol>,
+    o_prot: Box<TOutputProtocol>,
+) -> thrift::Result<()> {
     let mut client = FullMealServiceSyncClient::new(i_prot, o_prot);
 
     execute_call("full", "ramen", || client.ramen(100))?;
@@ -124,17 +127,20 @@
 }
 
 fn execute_call<F, R>(service_type: &str, call_name: &str, mut f: F) -> thrift::Result<()>
-    where F: FnMut() -> thrift::Result<R>
+where
+    F: FnMut() -> thrift::Result<R>,
 {
     let res = f();
 
     match res {
         Ok(_) => println!("{}: completed {} call", service_type, call_name),
         Err(ref e) => {
-            println!("{}: failed {} call with error {:?}",
-                     service_type,
-                     call_name,
-                     e)
+            println!(
+                "{}: failed {} call with error {:?}",
+                service_type,
+                call_name,
+                e
+            )
         }
     }
 
diff --git a/lib/rs/test/src/bin/kitchen_sink_server.rs b/lib/rs/test/src/bin/kitchen_sink_server.rs
index 4ce4fa3..19112cd 100644
--- a/lib/rs/test/src/bin/kitchen_sink_server.rs
+++ b/lib/rs/test/src/bin/kitchen_sink_server.rs
@@ -22,7 +22,7 @@
 extern crate thrift;
 
 use kitchen_sink::base_one::Noodle;
-use kitchen_sink::base_two::{Napkin, Ramen, NapkinServiceSyncHandler, RamenServiceSyncHandler};
+use kitchen_sink::base_two::{Napkin, NapkinServiceSyncHandler, Ramen, RamenServiceSyncHandler};
 use kitchen_sink::midlayer::{Dessert, Meal, MealServiceSyncHandler, MealServiceSyncProcessor};
 use kitchen_sink::ultimate::{Drink, FullMeal, FullMealAndDrinks,
                              FullMealAndDrinksServiceSyncProcessor, FullMealServiceSyncHandler};
@@ -30,8 +30,9 @@
 use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory,
                        TCompactInputProtocolFactory, TCompactOutputProtocolFactory,
                        TInputProtocolFactory, TOutputProtocolFactory};
-use thrift::transport::{TFramedTransportFactory, TTransportFactory};
-use thrift::server::TSimpleServer;
+use thrift::transport::{TFramedReadTransportFactory, TFramedWriteTransportFactory,
+                        TReadTransportFactory, TWriteTransportFactory};
+use thrift::server::TServer;
 
 fn main() {
     match run() {
@@ -52,7 +53,8 @@
         (@arg port: --port +takes_value "port on which the test server listens")
         (@arg protocol: --protocol +takes_value "Thrift protocol implementation to use (\"binary\", \"compact\")")
         (@arg service: --service +takes_value "Service type to contact (\"part\", \"full\")")
-    ).get_matches();
+    )
+            .get_matches();
 
     let port = value_t!(matches, "port", u16).unwrap_or(9090);
     let protocol = matches.value_of("protocol").unwrap_or("compact");
@@ -61,9 +63,8 @@
 
     println!("binding to {}", listen_address);
 
-    let (i_transport_factory, o_transport_factory): (Box<TTransportFactory>,
-                                                     Box<TTransportFactory>) =
-        (Box::new(TFramedTransportFactory {}), Box::new(TFramedTransportFactory {}));
+    let r_transport_factory = TFramedReadTransportFactory::new();
+    let w_transport_factory = TFramedWriteTransportFactory::new();
 
     let (i_protocol_factory, o_protocol_factory): (Box<TInputProtocolFactory>,
                                                    Box<TOutputProtocolFactory>) =
@@ -93,51 +94,75 @@
     // Since what I'm doing is uncommon I'm just going to duplicate the code
     match &*service {
         "part" => {
-            run_meal_server(&listen_address,
-                            i_transport_factory,
-                            i_protocol_factory,
-                            o_transport_factory,
-                            o_protocol_factory)
+            run_meal_server(
+                &listen_address,
+                r_transport_factory,
+                i_protocol_factory,
+                w_transport_factory,
+                o_protocol_factory,
+            )
         }
         "full" => {
-            run_full_meal_server(&listen_address,
-                                 i_transport_factory,
-                                 i_protocol_factory,
-                                 o_transport_factory,
-                                 o_protocol_factory)
+            run_full_meal_server(
+                &listen_address,
+                r_transport_factory,
+                i_protocol_factory,
+                w_transport_factory,
+                o_protocol_factory,
+            )
         }
         unknown => Err(format!("unsupported service type {}", unknown).into()),
     }
 }
 
-fn run_meal_server(listen_address: &str,
-                   i_transport_factory: Box<TTransportFactory>,
-                   i_protocol_factory: Box<TInputProtocolFactory>,
-                   o_transport_factory: Box<TTransportFactory>,
-                   o_protocol_factory: Box<TOutputProtocolFactory>)
-                   -> thrift::Result<()> {
+fn run_meal_server<RTF, IPF, WTF, OPF>(
+    listen_address: &str,
+    r_transport_factory: RTF,
+    i_protocol_factory: IPF,
+    w_transport_factory: WTF,
+    o_protocol_factory: OPF,
+) -> thrift::Result<()>
+where
+    RTF: TReadTransportFactory + 'static,
+    IPF: TInputProtocolFactory + 'static,
+    WTF: TWriteTransportFactory + 'static,
+    OPF: TOutputProtocolFactory + 'static,
+{
     let processor = MealServiceSyncProcessor::new(PartHandler {});
-    let mut server = TSimpleServer::new(i_transport_factory,
-                                        i_protocol_factory,
-                                        o_transport_factory,
-                                        o_protocol_factory,
-                                        processor);
+    let mut server = TServer::new(
+        r_transport_factory,
+        i_protocol_factory,
+        w_transport_factory,
+        o_protocol_factory,
+        processor,
+        1,
+    );
 
     server.listen(listen_address)
 }
 
-fn run_full_meal_server(listen_address: &str,
-                        i_transport_factory: Box<TTransportFactory>,
-                        i_protocol_factory: Box<TInputProtocolFactory>,
-                        o_transport_factory: Box<TTransportFactory>,
-                        o_protocol_factory: Box<TOutputProtocolFactory>)
-                        -> thrift::Result<()> {
+fn run_full_meal_server<RTF, IPF, WTF, OPF>(
+    listen_address: &str,
+    r_transport_factory: RTF,
+    i_protocol_factory: IPF,
+    w_transport_factory: WTF,
+    o_protocol_factory: OPF,
+) -> thrift::Result<()>
+where
+    RTF: TReadTransportFactory + 'static,
+    IPF: TInputProtocolFactory + 'static,
+    WTF: TWriteTransportFactory + 'static,
+    OPF: TOutputProtocolFactory + 'static,
+{
     let processor = FullMealAndDrinksServiceSyncProcessor::new(FullHandler {});
-    let mut server = TSimpleServer::new(i_transport_factory,
-                                        i_protocol_factory,
-                                        o_transport_factory,
-                                        o_protocol_factory,
-                                        processor);
+    let mut server = TServer::new(
+        r_transport_factory,
+        i_protocol_factory,
+        w_transport_factory,
+        o_protocol_factory,
+        processor,
+        1,
+    );
 
     server.listen(listen_address)
 }
@@ -145,21 +170,21 @@
 struct PartHandler;
 
 impl MealServiceSyncHandler for PartHandler {
-    fn handle_meal(&mut self) -> thrift::Result<Meal> {
+    fn handle_meal(&self) -> thrift::Result<Meal> {
         println!("part: handling meal call");
         Ok(meal())
     }
 }
 
 impl RamenServiceSyncHandler for PartHandler {
-    fn handle_ramen(&mut self, _: i32) -> thrift::Result<Ramen> {
+    fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> {
         println!("part: handling ramen call");
         Ok(ramen())
     }
 }
 
 impl NapkinServiceSyncHandler for PartHandler {
-    fn handle_napkin(&mut self) -> thrift::Result<Napkin> {
+    fn handle_napkin(&self) -> thrift::Result<Napkin> {
         println!("part: handling napkin call");
         Ok(napkin())
     }
@@ -171,34 +196,34 @@
 struct FullHandler;
 
 impl FullMealAndDrinksServiceSyncHandler for FullHandler {
-    fn handle_full_meal_and_drinks(&mut self) -> thrift::Result<FullMealAndDrinks> {
+    fn handle_full_meal_and_drinks(&self) -> thrift::Result<FullMealAndDrinks> {
         Ok(FullMealAndDrinks::new(full_meal(), Drink::WHISKEY))
     }
 }
 
 impl FullMealServiceSyncHandler for FullHandler {
-    fn handle_full_meal(&mut self) -> thrift::Result<FullMeal> {
+    fn handle_full_meal(&self) -> thrift::Result<FullMeal> {
         println!("full: handling full meal call");
         Ok(full_meal())
     }
 }
 
 impl MealServiceSyncHandler for FullHandler {
-    fn handle_meal(&mut self) -> thrift::Result<Meal> {
+    fn handle_meal(&self) -> thrift::Result<Meal> {
         println!("full: handling meal call");
         Ok(meal())
     }
 }
 
 impl RamenServiceSyncHandler for FullHandler {
-    fn handle_ramen(&mut self, _: i32) -> thrift::Result<Ramen> {
+    fn handle_ramen(&self, _: i32) -> thrift::Result<Ramen> {
         println!("full: handling ramen call");
         Ok(ramen())
     }
 }
 
 impl NapkinServiceSyncHandler for FullHandler {
-    fn handle_napkin(&mut self) -> thrift::Result<Napkin> {
+    fn handle_napkin(&self) -> thrift::Result<Napkin> {
         println!("full: handling napkin call");
         Ok(napkin())
     }
diff --git a/lib/rs/test/src/lib.rs b/lib/rs/test/src/lib.rs
index 8a7ccd0..53f4873 100644
--- a/lib/rs/test/src/lib.rs
+++ b/lib/rs/test/src/lib.rs
@@ -48,6 +48,9 @@
 
     #[test]
     fn must_be_able_to_use_defaults() {
-        let _ = midlayer::Meal { noodle: Some(base_one::Noodle::default()), ..Default::default() };
+        let _ = midlayer::Meal {
+            noodle: Some(base_one::Noodle::default()),
+            ..Default::default()
+        };
     }
 }