THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs
index ceac18a..21c392c 100644
--- a/lib/rs/src/server/mod.rs
+++ b/lib/rs/src/server/mod.rs
@@ -15,15 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-//! Types required to implement a Thrift server.
+//! Types used to implement a Thrift server.
-use ::protocol::{TInputProtocol, TOutputProtocol};
+use protocol::{TInputProtocol, TOutputProtocol};
-mod simple;
mod multiplexed;
+mod threaded;
-pub use self::simple::TSimpleServer;
pub use self::multiplexed::TMultiplexedProcessor;
+pub use self::threaded::TServer;
/// Handles incoming Thrift messages and dispatches them to the user-defined
/// handler functions.
@@ -56,14 +56,14 @@
///
/// // `TProcessor` implementation for `SimpleService`
/// impl TProcessor for SimpleServiceSyncProcessor {
-/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+/// fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
/// unimplemented!();
/// }
/// }
///
/// // service functions for SimpleService
/// trait SimpleServiceSyncHandler {
-/// fn service_call(&mut self) -> thrift::Result<()>;
+/// fn service_call(&self) -> thrift::Result<()>;
/// }
///
/// //
@@ -73,7 +73,7 @@
/// // define a handler that will be invoked when `service_call` is received
/// struct SimpleServiceHandlerImpl;
/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
-/// fn service_call(&mut self) -> thrift::Result<()> {
+/// fn service_call(&self) -> thrift::Result<()> {
/// unimplemented!();
/// }
/// }
@@ -82,7 +82,7 @@
/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
///
/// // at this point you can pass the processor to the server
-/// // let server = TSimpleServer::new(..., processor);
+/// // let server = TServer::new(..., processor, num_workers);
/// ```
pub trait TProcessor {
/// Process a Thrift service call.
@@ -91,5 +91,5 @@
/// the response to `o`.
///
/// Returns `()` if the handler was executed; `Err` otherwise.
- fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
+ fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
}
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
index d2314a1..b1243a8 100644
--- a/lib/rs/src/server/multiplexed.rs
+++ b/lib/rs/src/server/multiplexed.rs
@@ -17,9 +17,10 @@
use std::collections::HashMap;
use std::convert::Into;
+use std::sync::{Arc, Mutex};
-use ::{new_application_error, ApplicationErrorKind};
-use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
+use {ApplicationErrorKind, new_application_error};
+use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
use super::TProcessor;
@@ -33,8 +34,9 @@
///
/// A `TMultiplexedProcessor` can only handle messages sent by a
/// `TMultiplexedOutputProtocol`.
+// FIXME: implement Debug by hand (cannot be derived: `TProcessor` trait objects are not `Debug`)
pub struct TMultiplexedProcessor {
- processors: HashMap<String, Box<TProcessor>>,
+ processors: Mutex<HashMap<String, Arc<Box<TProcessor>>>>,
}
impl TMultiplexedProcessor {
@@ -46,46 +48,62 @@
/// Return `false` if a mapping previously existed (the previous mapping is
/// *not* overwritten).
#[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
- pub fn register_processor<S: Into<String>>(&mut self,
- service_name: S,
- processor: Box<TProcessor>)
- -> bool {
+ pub fn register_processor<S: Into<String>>(
+ &mut self,
+ service_name: S,
+ processor: Box<TProcessor>,
+ ) -> bool {
+ let mut processors = self.processors.lock().unwrap();
+
let name = service_name.into();
- if self.processors.contains_key(&name) {
+ if processors.contains_key(&name) {
false
} else {
- self.processors.insert(name, processor);
+ processors.insert(name, Arc::new(processor));
true
}
}
}
impl TProcessor for TMultiplexedProcessor {
- fn process(&mut self,
- i_prot: &mut TInputProtocol,
- o_prot: &mut TOutputProtocol)
- -> ::Result<()> {
+ fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> {
let msg_ident = i_prot.read_message_begin()?;
- let sep_index = msg_ident.name
+ let sep_index = msg_ident
+ .name
.find(':')
- .ok_or_else(|| {
- new_application_error(ApplicationErrorKind::Unknown,
- "no service separator found in incoming message")
- })?;
+ .ok_or_else(
+ || {
+ new_application_error(
+ ApplicationErrorKind::Unknown,
+ "no service separator found in incoming message",
+ )
+ },
+ )?;
let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
- match self.processors.get_mut(svc_name) {
- Some(ref mut processor) => {
- let new_msg_ident = TMessageIdentifier::new(svc_call,
- msg_ident.message_type,
- msg_ident.sequence_number);
+ let processor: Option<Arc<Box<TProcessor>>> = {
+ let processors = self.processors.lock().unwrap();
+ processors.get(svc_name).cloned()
+ };
+
+ match processor {
+ Some(arc) => {
+ let new_msg_ident = TMessageIdentifier::new(
+ svc_call,
+ msg_ident.message_type,
+ msg_ident.sequence_number,
+ );
let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
- processor.process(&mut proxy_i_prot, o_prot)
+ (*arc).process(&mut proxy_i_prot, o_prot)
}
None => {
- Err(new_application_error(ApplicationErrorKind::Unknown,
- format!("no processor found for service {}", svc_name)))
+ Err(
+ new_application_error(
+ ApplicationErrorKind::Unknown,
+ format!("no processor found for service {}", svc_name),
+ ),
+ )
}
}
}
diff --git a/lib/rs/src/server/simple.rs b/lib/rs/src/server/simple.rs
deleted file mode 100644
index 89ed977..0000000
--- a/lib/rs/src/server/simple.rs
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::cell::RefCell;
-use std::net::{TcpListener, TcpStream};
-use std::rc::Rc;
-
-use ::{ApplicationError, ApplicationErrorKind};
-use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
-use ::transport::{TTcpTransport, TTransport, TTransportFactory};
-
-use super::TProcessor;
-
-/// Single-threaded blocking Thrift socket server.
-///
-/// A `TSimpleServer` listens on a given address and services accepted
-/// connections *synchronously* and *sequentially* - i.e. in a blocking manner,
-/// one at a time - on the main thread. Each accepted connection has an input
-/// half and an output half, each of which uses a `TTransport` and `TProtocol`
-/// to translate messages to and from byes. Any combination of `TProtocol` and
-/// `TTransport` may be used.
-///
-/// # Examples
-///
-/// Creating and running a `TSimpleServer` using Thrift-compiler-generated
-/// service code.
-///
-/// ```no_run
-/// use thrift;
-/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
-/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
-/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
-/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory};
-/// use thrift::server::{TProcessor, TSimpleServer};
-///
-/// //
-/// // auto-generated
-/// //
-///
-/// // processor for `SimpleService`
-/// struct SimpleServiceSyncProcessor;
-/// impl SimpleServiceSyncProcessor {
-/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
-/// unimplemented!();
-/// }
-/// }
-///
-/// // `TProcessor` implementation for `SimpleService`
-/// impl TProcessor for SimpleServiceSyncProcessor {
-/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
-/// unimplemented!();
-/// }
-/// }
-///
-/// // service functions for SimpleService
-/// trait SimpleServiceSyncHandler {
-/// fn service_call(&mut self) -> thrift::Result<()>;
-/// }
-///
-/// //
-/// // user-code follows
-/// //
-///
-/// // define a handler that will be invoked when `service_call` is received
-/// struct SimpleServiceHandlerImpl;
-/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
-/// fn service_call(&mut self) -> thrift::Result<()> {
-/// unimplemented!();
-/// }
-/// }
-///
-/// // instantiate the processor
-/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
-///
-/// // instantiate the server
-/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
-/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
-/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
-/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
-///
-/// let mut server = TSimpleServer::new(
-/// i_tr_fact,
-/// i_pr_fact,
-/// o_tr_fact,
-/// o_pr_fact,
-/// processor
-/// );
-///
-/// // start listening for incoming connections
-/// match server.listen("127.0.0.1:8080") {
-/// Ok(_) => println!("listen completed"),
-/// Err(e) => println!("listen failed with error {:?}", e),
-/// }
-/// ```
-pub struct TSimpleServer<PR: TProcessor> {
- i_trans_factory: Box<TTransportFactory>,
- i_proto_factory: Box<TInputProtocolFactory>,
- o_trans_factory: Box<TTransportFactory>,
- o_proto_factory: Box<TOutputProtocolFactory>,
- processor: PR,
-}
-
-impl<PR: TProcessor> TSimpleServer<PR> {
- /// Create a `TSimpleServer`.
- ///
- /// Each accepted connection has an input and output half, each of which
- /// requires a `TTransport` and `TProtocol`. `TSimpleServer` uses
- /// `input_transport_factory` and `input_protocol_factory` to create
- /// implementations for the input, and `output_transport_factory` and
- /// `output_protocol_factory` to create implementations for the output.
- pub fn new(input_transport_factory: Box<TTransportFactory>,
- input_protocol_factory: Box<TInputProtocolFactory>,
- output_transport_factory: Box<TTransportFactory>,
- output_protocol_factory: Box<TOutputProtocolFactory>,
- processor: PR)
- -> TSimpleServer<PR> {
- TSimpleServer {
- i_trans_factory: input_transport_factory,
- i_proto_factory: input_protocol_factory,
- o_trans_factory: output_transport_factory,
- o_proto_factory: output_protocol_factory,
- processor: processor,
- }
- }
-
- /// Listen for incoming connections on `listen_address`.
- ///
- /// `listen_address` should be in the form `host:port`,
- /// for example: `127.0.0.1:8080`.
- ///
- /// Return `()` if successful.
- ///
- /// Return `Err` when the server cannot bind to `listen_address` or there
- /// is an unrecoverable error.
- pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
- let listener = TcpListener::bind(listen_address)?;
- for stream in listener.incoming() {
- match stream {
- Ok(s) => self.handle_incoming_connection(s),
- Err(e) => warn!("failed to accept remote connection with error {:?}", e),
- }
- }
-
- Err(::Error::Application(ApplicationError {
- kind: ApplicationErrorKind::Unknown,
- message: "aborted listen loop".into(),
- }))
- }
-
- fn handle_incoming_connection(&mut self, stream: TcpStream) {
- // create the shared tcp stream
- let stream = TTcpTransport::with_stream(stream);
- let stream: Box<TTransport> = Box::new(stream);
- let stream = Rc::new(RefCell::new(stream));
-
- // input protocol and transport
- let i_tran = self.i_trans_factory.create(stream.clone());
- let i_tran = Rc::new(RefCell::new(i_tran));
- let mut i_prot = self.i_proto_factory.create(i_tran);
-
- // output protocol and transport
- let o_tran = self.o_trans_factory.create(stream.clone());
- let o_tran = Rc::new(RefCell::new(o_tran));
- let mut o_prot = self.o_proto_factory.create(o_tran);
-
- // process loop
- loop {
- let r = self.processor.process(&mut *i_prot, &mut *o_prot);
- if let Err(e) = r {
- warn!("processor failed with error: {:?}", e);
- break; // FIXME: close here
- }
- }
- }
-}
diff --git a/lib/rs/src/server/threaded.rs b/lib/rs/src/server/threaded.rs
new file mode 100644
index 0000000..a486c5a
--- /dev/null
+++ b/lib/rs/src/server/threaded.rs
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::net::{TcpListener, TcpStream};
+use std::sync::Arc;
+use threadpool::ThreadPool;
+
+use {ApplicationError, ApplicationErrorKind};
+use protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
+use transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
+
+use super::TProcessor;
+
+/// Fixed-size thread-pool blocking Thrift server.
+///
+/// A `TServer` listens on a given address and submits accepted connections
+/// to an **unbounded** queue. Connections from this queue are serviced by
+/// the first available worker thread from a **fixed-size** thread pool. Each
+/// accepted connection is handled by that worker thread, and communication
+/// over this connection occurs sequentially and synchronously (i.e. calls block).
+/// Accepted connections have an input half and an output half, each of which
+/// uses a `TReadTransport`/`TWriteTransport` and `TInputProtocol`/`TOutputProtocol`
+/// to translate messages to and from bytes. Any combination of `TInputProtocol`,
+/// `TOutputProtocol`, `TReadTransport` and `TWriteTransport` may be used.
+///
+/// # Examples
+///
+/// Creating and running a `TServer` using Thrift-compiler-generated
+/// service code.
+///
+/// ```no_run
+/// use thrift;
+/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
+/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
+/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory, TReadTransportFactory, TWriteTransportFactory};
+/// use thrift::server::{TProcessor, TServer};
+///
+/// //
+/// // auto-generated
+/// //
+///
+/// // processor for `SimpleService`
+/// struct SimpleServiceSyncProcessor;
+/// impl SimpleServiceSyncProcessor {
+/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // `TProcessor` implementation for `SimpleService`
+/// impl TProcessor for SimpleServiceSyncProcessor {
+/// fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // service functions for SimpleService
+/// trait SimpleServiceSyncHandler {
+/// fn service_call(&self) -> thrift::Result<()>;
+/// }
+///
+/// //
+/// // user-code follows
+/// //
+///
+/// // define a handler that will be invoked when `service_call` is received
+/// struct SimpleServiceHandlerImpl;
+/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
+/// fn service_call(&self) -> thrift::Result<()> {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // instantiate the processor
+/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
+///
+/// // instantiate the server
+/// let i_tr_fact: Box<TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
+/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
+/// let o_tr_fact: Box<TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
+/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
+///
+/// let mut server = TServer::new(
+/// i_tr_fact,
+/// i_pr_fact,
+/// o_tr_fact,
+/// o_pr_fact,
+/// processor,
+/// 10
+/// );
+///
+/// // start listening for incoming connections
+/// match server.listen("127.0.0.1:8080") {
+/// Ok(_) => println!("listen completed"),
+/// Err(e) => println!("listen failed with error {:?}", e),
+/// }
+/// ```
+#[derive(Debug)]
+pub struct TServer<PRC, RTF, IPF, WTF, OPF>
+where
+ PRC: TProcessor + Send + Sync + 'static,
+ RTF: TReadTransportFactory + 'static,
+ IPF: TInputProtocolFactory + 'static,
+ WTF: TWriteTransportFactory + 'static,
+ OPF: TOutputProtocolFactory + 'static,
+{
+ r_trans_factory: RTF,
+ i_proto_factory: IPF,
+ w_trans_factory: WTF,
+ o_proto_factory: OPF,
+ processor: Arc<PRC>,
+ worker_pool: ThreadPool,
+}
+
+impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
+ where PRC: TProcessor + Send + Sync + 'static,
+ RTF: TReadTransportFactory + 'static,
+ IPF: TInputProtocolFactory + 'static,
+ WTF: TWriteTransportFactory + 'static,
+ OPF: TOutputProtocolFactory + 'static {
+ /// Create a `TServer`.
+ ///
+ /// Each accepted connection has an input half (a `TReadTransport` and
+ /// `TInputProtocol`) and an output half (a `TWriteTransport` and
+ /// `TOutputProtocol`). `TServer` uses `read_transport_factory` and
+ /// `input_protocol_factory` to create the former, `write_transport_factory` and
+ /// `output_protocol_factory` to create the latter; `num_workers` sizes the thread pool.
+ pub fn new(
+ read_transport_factory: RTF,
+ input_protocol_factory: IPF,
+ write_transport_factory: WTF,
+ output_protocol_factory: OPF,
+ processor: PRC,
+ num_workers: usize,
+ ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
+ TServer {
+ r_trans_factory: read_transport_factory,
+ i_proto_factory: input_protocol_factory,
+ w_trans_factory: write_transport_factory,
+ o_proto_factory: output_protocol_factory,
+ processor: Arc::new(processor),
+ worker_pool: ThreadPool::new_with_name(
+ "Thrift service processor".to_owned(),
+ num_workers,
+ ),
+ }
+ }
+
+ /// Listen for incoming connections on `listen_address`.
+ ///
+ /// `listen_address` should be in the form `host:port`,
+ /// for example: `127.0.0.1:8080`.
+ ///
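+ /// Blocks while accepting connections; the only return paths in this
+ /// function are `Err` values: failure to bind to `listen_address`, failure
+ /// to set up the transports/protocols for an accepted connection, or the
+ /// accept loop ending.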
+ pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
+ let listener = TcpListener::bind(listen_address)?;
+ for stream in listener.incoming() {
+ match stream {
+ Ok(s) => {
+ let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
+ let processor = self.processor.clone();
+ self.worker_pool
+ .execute(move || handle_incoming_connection(processor, i_prot, o_prot),);
+ }
+ Err(e) => {
+ warn!("failed to accept remote connection with error {:?}", e);
+ }
+ }
+ }
+
+ Err(
+ ::Error::Application(
+ ApplicationError {
+ kind: ApplicationErrorKind::Unknown,
+ message: "aborted listen loop".into(),
+ },
+ ),
+ )
+ }
+
+
+ fn new_protocols_for_connection(
+ &mut self,
+ stream: TcpStream,
+ ) -> ::Result<(Box<TInputProtocol + Send>, Box<TOutputProtocol + Send>)> {
+ // create the shared tcp stream
+ let channel = TTcpChannel::with_stream(stream);
+
+ // split it into two - one to be owned by the
+ // input tran/proto and the other by the output
+ let (r_chan, w_chan) = channel.split()?;
+
+ // input protocol and transport
+ let r_tran = self.r_trans_factory.create(Box::new(r_chan));
+ let i_prot = self.i_proto_factory.create(r_tran);
+
+ // output protocol and transport
+ let w_tran = self.w_trans_factory.create(Box::new(w_chan));
+ let o_prot = self.o_proto_factory.create(w_tran);
+
+ Ok((i_prot, o_prot))
+ }
+}
+
+fn handle_incoming_connection<PRC>(
+ processor: Arc<PRC>,
+ i_prot: Box<TInputProtocol>,
+ o_prot: Box<TOutputProtocol>,
+) where
+ PRC: TProcessor,
+{
+ let mut i_prot = i_prot;
+ let mut o_prot = o_prot;
+ loop {
+ let r = processor.process(&mut *i_prot, &mut *o_prot);
+ if let Err(e) = r {
+ warn!("processor completed with error: {:?}", e);
+ break;
+ }
+ }
+}