THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs
index ceac18a..21c392c 100644
--- a/lib/rs/src/server/mod.rs
+++ b/lib/rs/src/server/mod.rs
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Types required to implement a Thrift server.
+//! Types used to implement a Thrift server.
 
-use ::protocol::{TInputProtocol, TOutputProtocol};
+use protocol::{TInputProtocol, TOutputProtocol};
 
-mod simple;
 mod multiplexed;
+mod threaded;
 
-pub use self::simple::TSimpleServer;
 pub use self::multiplexed::TMultiplexedProcessor;
+pub use self::threaded::TServer;
 
 /// Handles incoming Thrift messages and dispatches them to the user-defined
 /// handler functions.
@@ -56,14 +56,14 @@
 ///
 /// // `TProcessor` implementation for `SimpleService`
 /// impl TProcessor for SimpleServiceSyncProcessor {
-///     fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+///     fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
 ///         unimplemented!();
 ///     }
 /// }
 ///
 /// // service functions for SimpleService
 /// trait SimpleServiceSyncHandler {
-///     fn service_call(&mut self) -> thrift::Result<()>;
+///     fn service_call(&self) -> thrift::Result<()>;
 /// }
 ///
 /// //
@@ -73,7 +73,7 @@
 /// // define a handler that will be invoked when `service_call` is received
 /// struct SimpleServiceHandlerImpl;
 /// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
-///     fn service_call(&mut self) -> thrift::Result<()> {
+///     fn service_call(&self) -> thrift::Result<()> {
 ///         unimplemented!();
 ///     }
 /// }
@@ -82,7 +82,7 @@
 /// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
 ///
 /// // at this point you can pass the processor to the server
-/// // let server = TSimpleServer::new(..., processor);
+/// // let server = TServer::new(..., processor);
 /// ```
 pub trait TProcessor {
     /// Process a Thrift service call.
@@ -91,5 +91,5 @@
     /// the response to `o`.
     ///
     /// Returns `()` if the handler was executed; `Err` otherwise.
-    fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
+    fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
 }
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
index d2314a1..b1243a8 100644
--- a/lib/rs/src/server/multiplexed.rs
+++ b/lib/rs/src/server/multiplexed.rs
@@ -17,9 +17,10 @@
 
 use std::collections::HashMap;
 use std::convert::Into;
+use std::sync::{Arc, Mutex};
 
-use ::{new_application_error, ApplicationErrorKind};
-use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
+use {ApplicationErrorKind, new_application_error};
+use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
 
 use super::TProcessor;
 
@@ -33,8 +34,9 @@
 ///
 /// A `TMultiplexedProcessor` can only handle messages sent by a
 /// `TMultiplexedOutputProtocol`.
+// FIXME: implement Debug
 pub struct TMultiplexedProcessor {
-    processors: HashMap<String, Box<TProcessor>>,
+    processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>,
 }
 
 impl TMultiplexedProcessor {
@@ -46,46 +48,62 @@
     /// Return `false` if a mapping previously existed (the previous mapping is
     /// *not* overwritten).
     #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
-    pub fn register_processor<S: Into<String>>(&mut self,
-                                               service_name: S,
-                                               processor: Box<TProcessor>)
-                                               -> bool {
+    pub fn register_processor<S: Into<String>>(
+        &mut self,
+        service_name: S,
+        processor: Box<TProcessor>,
+    ) -> bool {
+        let mut processors = self.processors.lock().unwrap();
+
         let name = service_name.into();
-        if self.processors.contains_key(&name) {
+        if processors.contains_key(&name) {
             false
         } else {
-            self.processors.insert(name, processor);
+            processors.insert(name, Arc::new(processor));
             true
         }
     }
 }
 
 impl TProcessor for TMultiplexedProcessor {
-    fn process(&mut self,
-               i_prot: &mut TInputProtocol,
-               o_prot: &mut TOutputProtocol)
-               -> ::Result<()> {
+    fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
         let msg_ident = i_prot.read_message_begin()?;
-        let sep_index = msg_ident.name
+        let sep_index = msg_ident
+            .name
             .find(':')
-            .ok_or_else(|| {
-                new_application_error(ApplicationErrorKind::Unknown,
-                                      "no service separator found in incoming message")
-            })?;
+            .ok_or_else(
+                || {
+                    new_application_error(
+                        ApplicationErrorKind::Unknown,
+                        "no service separator found in incoming message",
+                    )
+                },
+            )?;
 
         let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
 
-        match self.processors.get_mut(svc_name) {
-            Some(ref mut processor) => {
-                let new_msg_ident = TMessageIdentifier::new(svc_call,
-                                                            msg_ident.message_type,
-                                                            msg_ident.sequence_number);
+        let processor: Option<Arc<Box<TProcessor>>> = {
+            let processors = self.processors.lock().unwrap();
+            processors.get(svc_name).cloned()
+        };
+
+        match processor {
+            Some(arc) => {
+                let new_msg_ident = TMessageIdentifier::new(
+                    svc_call,
+                    msg_ident.message_type,
+                    msg_ident.sequence_number,
+                );
                 let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
-                processor.process(&mut proxy_i_prot, o_prot)
+                (*arc).process(&mut proxy_i_prot, o_prot)
             }
             None => {
-                Err(new_application_error(ApplicationErrorKind::Unknown,
-                                          format!("no processor found for service {}", svc_name)))
+                Err(
+                    new_application_error(
+                        ApplicationErrorKind::Unknown,
+                        format!("no processor found for service {}", svc_name),
+                    ),
+                )
             }
         }
     }
diff --git a/lib/rs/src/server/simple.rs b/lib/rs/src/server/simple.rs
deleted file mode 100644
index 89ed977..0000000
--- a/lib/rs/src/server/simple.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::cell::RefCell;
-use std::net::{TcpListener, TcpStream};
-use std::rc::Rc;
-
-use ::{ApplicationError, ApplicationErrorKind};
-use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
-use ::transport::{TTcpTransport, TTransport, TTransportFactory};
-
-use super::TProcessor;
-
-/// Single-threaded blocking Thrift socket server.
-///
-/// A `TSimpleServer` listens on a given address and services accepted
-/// connections *synchronously* and *sequentially* - i.e. in a blocking manner,
-/// one at a time - on the main thread. Each accepted connection has an input
-/// half and an output half, each of which uses a `TTransport` and `TProtocol`
-/// to translate messages to and from byes. Any combination of `TProtocol` and
-/// `TTransport` may be used.
-///
-/// # Examples
-///
-/// Creating and running a `TSimpleServer` using Thrift-compiler-generated
-/// service code.
-///
-/// ```no_run
-/// use thrift;
-/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
-/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
-/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
-/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory};
-/// use thrift::server::{TProcessor, TSimpleServer};
-///
-/// //
-/// // auto-generated
-/// //
-///
-/// // processor for `SimpleService`
-/// struct SimpleServiceSyncProcessor;
-/// impl SimpleServiceSyncProcessor {
-///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
-///         unimplemented!();
-///     }
-/// }
-///
-/// // `TProcessor` implementation for `SimpleService`
-/// impl TProcessor for SimpleServiceSyncProcessor {
-///     fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
-///         unimplemented!();
-///     }
-/// }
-///
-/// // service functions for SimpleService
-/// trait SimpleServiceSyncHandler {
-///     fn service_call(&mut self) -> thrift::Result<()>;
-/// }
-///
-/// //
-/// // user-code follows
-/// //
-///
-/// // define a handler that will be invoked when `service_call` is received
-/// struct SimpleServiceHandlerImpl;
-/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
-///     fn service_call(&mut self) -> thrift::Result<()> {
-///         unimplemented!();
-///     }
-/// }
-///
-/// // instantiate the processor
-/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
-///
-/// // instantiate the server
-/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
-/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
-/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
-/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
-///
-/// let mut server = TSimpleServer::new(
-///     i_tr_fact,
-///     i_pr_fact,
-///     o_tr_fact,
-///     o_pr_fact,
-///     processor
-/// );
-///
-/// // start listening for incoming connections
-/// match server.listen("127.0.0.1:8080") {
-///   Ok(_)  => println!("listen completed"),
-///   Err(e) => println!("listen failed with error {:?}", e),
-/// }
-/// ```
-pub struct TSimpleServer<PR: TProcessor> {
-    i_trans_factory: Box<TTransportFactory>,
-    i_proto_factory: Box<TInputProtocolFactory>,
-    o_trans_factory: Box<TTransportFactory>,
-    o_proto_factory: Box<TOutputProtocolFactory>,
-    processor: PR,
-}
-
-impl<PR: TProcessor> TSimpleServer<PR> {
-    /// Create a `TSimpleServer`.
-    ///
-    /// Each accepted connection has an input and output half, each of which
-    /// requires a `TTransport` and `TProtocol`. `TSimpleServer` uses
-    /// `input_transport_factory` and `input_protocol_factory` to create
-    /// implementations for the input, and `output_transport_factory` and
-    /// `output_protocol_factory` to create implementations for the output.
-    pub fn new(input_transport_factory: Box<TTransportFactory>,
-               input_protocol_factory: Box<TInputProtocolFactory>,
-               output_transport_factory: Box<TTransportFactory>,
-               output_protocol_factory: Box<TOutputProtocolFactory>,
-               processor: PR)
-               -> TSimpleServer<PR> {
-        TSimpleServer {
-            i_trans_factory: input_transport_factory,
-            i_proto_factory: input_protocol_factory,
-            o_trans_factory: output_transport_factory,
-            o_proto_factory: output_protocol_factory,
-            processor: processor,
-        }
-    }
-
-    /// Listen for incoming connections on `listen_address`.
-    ///
-    /// `listen_address` should be in the form `host:port`,
-    /// for example: `127.0.0.1:8080`.
-    ///
-    /// Return `()` if successful.
-    ///
-    /// Return `Err` when the server cannot bind to `listen_address` or there
-    /// is an unrecoverable error.
-    pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
-        let listener = TcpListener::bind(listen_address)?;
-        for stream in listener.incoming() {
-            match stream {
-                Ok(s) => self.handle_incoming_connection(s),
-                Err(e) => warn!("failed to accept remote connection with error {:?}", e),
-            }
-        }
-
-        Err(::Error::Application(ApplicationError {
-            kind: ApplicationErrorKind::Unknown,
-            message: "aborted listen loop".into(),
-        }))
-    }
-
-    fn handle_incoming_connection(&mut self, stream: TcpStream) {
-        // create the shared tcp stream
-        let stream = TTcpTransport::with_stream(stream);
-        let stream: Box<TTransport> = Box::new(stream);
-        let stream = Rc::new(RefCell::new(stream));
-
-        // input protocol and transport
-        let i_tran = self.i_trans_factory.create(stream.clone());
-        let i_tran = Rc::new(RefCell::new(i_tran));
-        let mut i_prot = self.i_proto_factory.create(i_tran);
-
-        // output protocol and transport
-        let o_tran = self.o_trans_factory.create(stream.clone());
-        let o_tran = Rc::new(RefCell::new(o_tran));
-        let mut o_prot = self.o_proto_factory.create(o_tran);
-
-        // process loop
-        loop {
-            let r = self.processor.process(&mut *i_prot, &mut *o_prot);
-            if let Err(e) = r {
-                warn!("processor failed with error: {:?}", e);
-                break; // FIXME: close here
-            }
-        }
-    }
-}
diff --git a/lib/rs/src/server/threaded.rs b/lib/rs/src/server/threaded.rs
new file mode 100644
index 0000000..a486c5a
--- /dev/null
+++ b/lib/rs/src/server/threaded.rs
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::net::{TcpListener, TcpStream};
+use std::sync::Arc;
+use threadpool::ThreadPool;
+
+use {ApplicationError, ApplicationErrorKind};
+use protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
+use transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
+
+use super::TProcessor;
+
+/// Fixed-size thread-pool blocking Thrift server.
+///
+/// A `TServer` listens on a given address and submits accepted connections
+/// to an **unbounded** queue. Connections from this queue are serviced by
+/// the first available worker thread from a **fixed-size** thread pool. Each
+/// accepted connection is handled by that worker thread, and communication
+/// over this connection occurs sequentially and synchronously (i.e. calls block).
+/// Accepted connections have an input half and an output half, each of which
+/// uses a `TReadTransport`/`TWriteTransport` and a `TInputProtocol`/`TOutputProtocol`
+/// to translate messages to and from bytes. Any combination of `TInputProtocol`,
+/// `TOutputProtocol`, `TReadTransport` and `TWriteTransport` may be used.
+///
+/// # Examples
+///
+/// Creating and running a `TServer` using Thrift-compiler-generated
+/// service code.
+///
+/// ```no_run
+/// use thrift;
+/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
+/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
+/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory, TReadTransportFactory, TWriteTransportFactory};
+/// use thrift::server::{TProcessor, TServer};
+///
+/// //
+/// // auto-generated
+/// //
+///
+/// // processor for `SimpleService`
+/// struct SimpleServiceSyncProcessor;
+/// impl SimpleServiceSyncProcessor {
+///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // `TProcessor` implementation for `SimpleService`
+/// impl TProcessor for SimpleServiceSyncProcessor {
+///     fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // service functions for SimpleService
+/// trait SimpleServiceSyncHandler {
+///     fn service_call(&self) -> thrift::Result<()>;
+/// }
+///
+/// //
+/// // user-code follows
+/// //
+///
+/// // define a handler that will be invoked when `service_call` is received
+/// struct SimpleServiceHandlerImpl;
+/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
+///     fn service_call(&self) -> thrift::Result<()> {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // instantiate the processor
+/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
+///
+/// // instantiate the server
+/// let i_tr_fact: Box<TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
+/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
+/// let o_tr_fact: Box<TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
+/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
+///
+/// let mut server = TServer::new(
+///     i_tr_fact,
+///     i_pr_fact,
+///     o_tr_fact,
+///     o_pr_fact,
+///     processor,
+///     10
+/// );
+///
+/// // start listening for incoming connections
+/// match server.listen("127.0.0.1:8080") {
+///   Ok(_)  => println!("listen completed"),
+///   Err(e) => println!("listen failed with error {:?}", e),
+/// }
+/// ```
+#[derive(Debug)]
+pub struct TServer<PRC, RTF, IPF, WTF, OPF>
+where
+    PRC: TProcessor + Send + Sync + 'static,
+    RTF: TReadTransportFactory + 'static,
+    IPF: TInputProtocolFactory + 'static,
+    WTF: TWriteTransportFactory + 'static,
+    OPF: TOutputProtocolFactory + 'static,
+{
+    r_trans_factory: RTF,
+    i_proto_factory: IPF,
+    w_trans_factory: WTF,
+    o_proto_factory: OPF,
+    processor: Arc<PRC>,
+    worker_pool: ThreadPool,
+}
+
+impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
+    where PRC: TProcessor + Send + Sync + 'static,
+          RTF: TReadTransportFactory + 'static,
+          IPF: TInputProtocolFactory + 'static,
+          WTF: TWriteTransportFactory + 'static,
+          OPF: TOutputProtocolFactory + 'static {
+    /// Create a `TServer`.
+    ///
+    /// Each accepted connection has an input half and an output half. `TServer`
+    /// uses `read_transport_factory` and `input_protocol_factory` to create the
+    /// `TReadTransport` and `TInputProtocol` for the input half, and
+    /// `write_transport_factory` and `output_protocol_factory` to create the
+    /// `TWriteTransport` and `TOutputProtocol` for the output half.
+    pub fn new(
+        read_transport_factory: RTF,
+        input_protocol_factory: IPF,
+        write_transport_factory: WTF,
+        output_protocol_factory: OPF,
+        processor: PRC,
+        num_workers: usize,
+    ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
+        TServer {
+            r_trans_factory: read_transport_factory,
+            i_proto_factory: input_protocol_factory,
+            w_trans_factory: write_transport_factory,
+            o_proto_factory: output_protocol_factory,
+            processor: Arc::new(processor),
+            worker_pool: ThreadPool::new_with_name(
+                "Thrift service processor".to_owned(),
+                num_workers,
+            ),
+        }
+    }
+
+    /// Listen for incoming connections on `listen_address`.
+    ///
+    /// `listen_address` should be in the form `host:port`,
+    /// for example: `127.0.0.1:8080`.
+    ///
+    /// This call blocks in the accept loop; no code path here returns `Ok(())`.
+    ///
+    /// Return `Err` when the server cannot bind to `listen_address`, when an accepted
+    /// connection's channel cannot be split, or when the accept loop ends.
+    pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
+        let listener = TcpListener::bind(listen_address)?;
+        for stream in listener.incoming() {
+            match stream {
+                Ok(s) => {
+                    let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
+                    let processor = self.processor.clone();
+                    self.worker_pool
+                        .execute(move || handle_incoming_connection(processor, i_prot, o_prot),);
+                }
+                Err(e) => {
+                    warn!("failed to accept remote connection with error {:?}", e);
+                }
+            }
+        }
+
+        Err(
+            ::Error::Application(
+                ApplicationError {
+                    kind: ApplicationErrorKind::Unknown,
+                    message: "aborted listen loop".into(),
+                },
+            ),
+        )
+    }
+
+
+    fn new_protocols_for_connection(
+        &mut self,
+        stream: TcpStream,
+    ) -> ::Result<(Box<TInputProtocol + Send>, Box<TOutputProtocol + Send>)> {
+        // wrap the accepted tcp stream in a channel
+        let channel = TTcpChannel::with_stream(stream);
+
+        // split it into two - one to be owned by the
+        // input tran/proto and the other by the output
+        let (r_chan, w_chan) = channel.split()?;
+
+        // input protocol and transport
+        let r_tran = self.r_trans_factory.create(Box::new(r_chan));
+        let i_prot = self.i_proto_factory.create(r_tran);
+
+        // output protocol and transport
+        let w_tran = self.w_trans_factory.create(Box::new(w_chan));
+        let o_prot = self.o_proto_factory.create(w_tran);
+
+        Ok((i_prot, o_prot))
+    }
+}
+
+fn handle_incoming_connection<PRC>(
+    processor: Arc<PRC>,
+    i_prot: Box<TInputProtocol>,
+    o_prot: Box<TOutputProtocol>,
+) where
+    PRC: TProcessor,
+{
+    let mut i_prot = i_prot;
+    let mut o_prot = o_prot;
+    loop {
+        let r = processor.process(&mut *i_prot, &mut *o_prot);
+        if let Err(e) = r {
+            warn!("processor completed with error: {:?}", e);
+            break;
+        }
+    }
+}