THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
index d2314a1..b1243a8 100644
--- a/lib/rs/src/server/multiplexed.rs
+++ b/lib/rs/src/server/multiplexed.rs
@@ -17,9 +17,10 @@
 
 use std::collections::HashMap;
 use std::convert::Into;
+use std::sync::{Arc, Mutex};
 
-use ::{new_application_error, ApplicationErrorKind};
-use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
+use {ApplicationErrorKind, new_application_error};
+use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
 
 use super::TProcessor;
 
@@ -33,8 +34,9 @@
 ///
 /// A `TMultiplexedProcessor` can only handle messages sent by a
 /// `TMultiplexedOutputProtocol`.
+// FIXME: implement Debug
 pub struct TMultiplexedProcessor {
-    processors: HashMap<String, Box<TProcessor>>,
+    processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>,
 }
 
 impl TMultiplexedProcessor {
@@ -46,46 +48,62 @@
     /// Return `false` if a mapping previously existed (the previous mapping is
     /// *not* overwritten).
     #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
-    pub fn register_processor<S: Into<String>>(&mut self,
-                                               service_name: S,
-                                               processor: Box<TProcessor>)
-                                               -> bool {
+    pub fn register_processor<S: Into<String>>(
+        &mut self,
+        service_name: S,
+        processor: Box<TProcessor>,
+    ) -> bool {
+        let mut processors = self.processors.lock().unwrap();
+
         let name = service_name.into();
-        if self.processors.contains_key(&name) {
+        if processors.contains_key(&name) {
             false
         } else {
-            self.processors.insert(name, processor);
+            processors.insert(name, Arc::new(processor));
             true
         }
     }
 }
 
 impl TProcessor for TMultiplexedProcessor {
-    fn process(&mut self,
-               i_prot: &mut TInputProtocol,
-               o_prot: &mut TOutputProtocol)
-               -> ::Result<()> {
+    fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
         let msg_ident = i_prot.read_message_begin()?;
-        let sep_index = msg_ident.name
+        let sep_index = msg_ident
+            .name
             .find(':')
-            .ok_or_else(|| {
-                new_application_error(ApplicationErrorKind::Unknown,
-                                      "no service separator found in incoming message")
-            })?;
+            .ok_or_else(
+                || {
+                    new_application_error(
+                        ApplicationErrorKind::Unknown,
+                        "no service separator found in incoming message",
+                    )
+                },
+            )?;
 
         let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
 
-        match self.processors.get_mut(svc_name) {
-            Some(ref mut processor) => {
-                let new_msg_ident = TMessageIdentifier::new(svc_call,
-                                                            msg_ident.message_type,
-                                                            msg_ident.sequence_number);
+        let processor: Option<Arc<Box<TProcessor>>> = {
+            let processors = self.processors.lock().unwrap();
+            processors.get(svc_name).cloned()
+        };
+
+        match processor {
+            Some(arc) => {
+                let new_msg_ident = TMessageIdentifier::new(
+                    svc_call,
+                    msg_ident.message_type,
+                    msg_ident.sequence_number,
+                );
                 let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
-                processor.process(&mut proxy_i_prot, o_prot)
+                (*arc).process(&mut proxy_i_prot, o_prot)
             }
             None => {
-                Err(new_application_error(ApplicationErrorKind::Unknown,
-                                          format!("no processor found for service {}", svc_name)))
+                Err(
+                    new_application_error(
+                        ApplicationErrorKind::Unknown,
+                        format!("no processor found for service {}", svc_name),
+                    ),
+                )
             }
         }
     }