THRIFT-2945 Add Rust support
Client: Rust
Patch: Allen George <allen.george@gmail.com>
This closes #1147
diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs
new file mode 100644
index 0000000..ceac18a
--- /dev/null
+++ b/lib/rs/src/server/mod.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Types required to implement a Thrift server.
+
+use ::protocol::{TInputProtocol, TOutputProtocol};
+
+mod simple;
+mod multiplexed;
+
+pub use self::simple::TSimpleServer;
+pub use self::multiplexed::TMultiplexedProcessor;
+
+/// Handles incoming Thrift messages and dispatches them to the user-defined
+/// handler functions.
+///
+/// An implementation is auto-generated for each Thrift service. When used by a
+/// server (for example, a `TSimpleServer`), it will demux incoming service
+/// calls and invoke the corresponding user-defined handler function.
+///
+/// # Examples
+///
+/// Create and start a server using the auto-generated `TProcessor` for
+/// a Thrift service `SimpleService`.
+///
+/// ```no_run
+/// use thrift;
+/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
+/// use thrift::server::TProcessor;
+///
+/// //
+/// // auto-generated
+/// //
+///
+/// // processor for `SimpleService`
+/// struct SimpleServiceSyncProcessor;
+/// impl SimpleServiceSyncProcessor {
+/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // `TProcessor` implementation for `SimpleService`
+/// impl TProcessor for SimpleServiceSyncProcessor {
+/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // service functions for SimpleService
+/// trait SimpleServiceSyncHandler {
+/// fn service_call(&mut self) -> thrift::Result<()>;
+/// }
+///
+/// //
+/// // user-code follows
+/// //
+///
+/// // define a handler that will be invoked when `service_call` is received
+/// struct SimpleServiceHandlerImpl;
+/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
+/// fn service_call(&mut self) -> thrift::Result<()> {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // instantiate the processor
+/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
+///
+/// // at this point you can pass the processor to the server
+/// // let server = TSimpleServer::new(..., processor);
+/// ```
+pub trait TProcessor {
+ /// Process a Thrift service call.
+ ///
+ /// Reads arguments from `i`, executes the user's handler code, and writes
+ /// the response to `o`.
+ ///
+ /// Returns `()` if the handler was executed; `Err` otherwise.
+ fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>;
+}
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
new file mode 100644
index 0000000..d2314a1
--- /dev/null
+++ b/lib/rs/src/server/multiplexed.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::Into;
+
+use ::{new_application_error, ApplicationErrorKind};
+use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol};
+
+use super::TProcessor;
+
+/// A `TProcessor` that can demux service calls to multiple underlying
+/// Thrift services.
+///
+/// Users register service-specific `TProcessor` instances with a
+/// `TMultiplexedProcessor`, and then register that processor with a server
+/// implementation. Following that, all incoming service calls are automatically
+/// routed to the service-specific `TProcessor`.
+///
+/// A `TMultiplexedProcessor` can only handle messages sent by a
+/// `TMultiplexedOutputProtocol`.
+pub struct TMultiplexedProcessor {
+ processors: HashMap<String, Box<TProcessor>>,
+}
+
+impl TMultiplexedProcessor {
+ /// Register a service-specific `processor` for the service named
+ /// `service_name`.
+ ///
+ /// Return `true` if this is the first registration for `service_name`.
+ ///
+ /// Return `false` if a mapping previously existed (the previous mapping is
+ /// *not* overwritten).
+ #[cfg_attr(feature = "cargo-clippy", allow(map_entry))]
+ pub fn register_processor<S: Into<String>>(&mut self,
+ service_name: S,
+ processor: Box<TProcessor>)
+ -> bool {
+ let name = service_name.into();
+ if self.processors.contains_key(&name) {
+ false
+ } else {
+ self.processors.insert(name, processor);
+ true
+ }
+ }
+}
+
+impl TProcessor for TMultiplexedProcessor {
+ fn process(&mut self,
+ i_prot: &mut TInputProtocol,
+ o_prot: &mut TOutputProtocol)
+ -> ::Result<()> {
+ let msg_ident = i_prot.read_message_begin()?;
+ let sep_index = msg_ident.name
+ .find(':')
+ .ok_or_else(|| {
+ new_application_error(ApplicationErrorKind::Unknown,
+ "no service separator found in incoming message")
+ })?;
+
+ let (svc_name, svc_call) = msg_ident.name.split_at(sep_index);
+
+ match self.processors.get_mut(svc_name) {
+ Some(ref mut processor) => {
+ let new_msg_ident = TMessageIdentifier::new(svc_call,
+ msg_ident.message_type,
+ msg_ident.sequence_number);
+ let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
+ processor.process(&mut proxy_i_prot, o_prot)
+ }
+ None => {
+ Err(new_application_error(ApplicationErrorKind::Unknown,
+ format!("no processor found for service {}", svc_name)))
+ }
+ }
+ }
+}
diff --git a/lib/rs/src/server/simple.rs b/lib/rs/src/server/simple.rs
new file mode 100644
index 0000000..89ed977
--- /dev/null
+++ b/lib/rs/src/server/simple.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::net::{TcpListener, TcpStream};
+use std::rc::Rc;
+
+use ::{ApplicationError, ApplicationErrorKind};
+use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+use ::transport::{TTcpTransport, TTransport, TTransportFactory};
+
+use super::TProcessor;
+
+/// Single-threaded blocking Thrift socket server.
+///
+/// A `TSimpleServer` listens on a given address and services accepted
+/// connections *synchronously* and *sequentially* - i.e. in a blocking manner,
+/// one at a time - on the main thread. Each accepted connection has an input
+/// half and an output half, each of which uses a `TTransport` and `TProtocol`
+/// to translate messages to and from byes. Any combination of `TProtocol` and
+/// `TTransport` may be used.
+///
+/// # Examples
+///
+/// Creating and running a `TSimpleServer` using Thrift-compiler-generated
+/// service code.
+///
+/// ```no_run
+/// use thrift;
+/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
+/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
+/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory};
+/// use thrift::server::{TProcessor, TSimpleServer};
+///
+/// //
+/// // auto-generated
+/// //
+///
+/// // processor for `SimpleService`
+/// struct SimpleServiceSyncProcessor;
+/// impl SimpleServiceSyncProcessor {
+/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // `TProcessor` implementation for `SimpleService`
+/// impl TProcessor for SimpleServiceSyncProcessor {
+/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // service functions for SimpleService
+/// trait SimpleServiceSyncHandler {
+/// fn service_call(&mut self) -> thrift::Result<()>;
+/// }
+///
+/// //
+/// // user-code follows
+/// //
+///
+/// // define a handler that will be invoked when `service_call` is received
+/// struct SimpleServiceHandlerImpl;
+/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
+/// fn service_call(&mut self) -> thrift::Result<()> {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // instantiate the processor
+/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
+///
+/// // instantiate the server
+/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
+/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
+/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
+/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
+///
+/// let mut server = TSimpleServer::new(
+/// i_tr_fact,
+/// i_pr_fact,
+/// o_tr_fact,
+/// o_pr_fact,
+/// processor
+/// );
+///
+/// // start listening for incoming connections
+/// match server.listen("127.0.0.1:8080") {
+/// Ok(_) => println!("listen completed"),
+/// Err(e) => println!("listen failed with error {:?}", e),
+/// }
+/// ```
+pub struct TSimpleServer<PR: TProcessor> {
+ i_trans_factory: Box<TTransportFactory>,
+ i_proto_factory: Box<TInputProtocolFactory>,
+ o_trans_factory: Box<TTransportFactory>,
+ o_proto_factory: Box<TOutputProtocolFactory>,
+ processor: PR,
+}
+
+impl<PR: TProcessor> TSimpleServer<PR> {
+ /// Create a `TSimpleServer`.
+ ///
+ /// Each accepted connection has an input and output half, each of which
+ /// requires a `TTransport` and `TProtocol`. `TSimpleServer` uses
+ /// `input_transport_factory` and `input_protocol_factory` to create
+ /// implementations for the input, and `output_transport_factory` and
+ /// `output_protocol_factory` to create implementations for the output.
+ pub fn new(input_transport_factory: Box<TTransportFactory>,
+ input_protocol_factory: Box<TInputProtocolFactory>,
+ output_transport_factory: Box<TTransportFactory>,
+ output_protocol_factory: Box<TOutputProtocolFactory>,
+ processor: PR)
+ -> TSimpleServer<PR> {
+ TSimpleServer {
+ i_trans_factory: input_transport_factory,
+ i_proto_factory: input_protocol_factory,
+ o_trans_factory: output_transport_factory,
+ o_proto_factory: output_protocol_factory,
+ processor: processor,
+ }
+ }
+
+ /// Listen for incoming connections on `listen_address`.
+ ///
+ /// `listen_address` should be in the form `host:port`,
+ /// for example: `127.0.0.1:8080`.
+ ///
+ /// Return `()` if successful.
+ ///
+ /// Return `Err` when the server cannot bind to `listen_address` or there
+ /// is an unrecoverable error.
+ pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
+ let listener = TcpListener::bind(listen_address)?;
+ for stream in listener.incoming() {
+ match stream {
+ Ok(s) => self.handle_incoming_connection(s),
+ Err(e) => warn!("failed to accept remote connection with error {:?}", e),
+ }
+ }
+
+ Err(::Error::Application(ApplicationError {
+ kind: ApplicationErrorKind::Unknown,
+ message: "aborted listen loop".into(),
+ }))
+ }
+
+ fn handle_incoming_connection(&mut self, stream: TcpStream) {
+ // create the shared tcp stream
+ let stream = TTcpTransport::with_stream(stream);
+ let stream: Box<TTransport> = Box::new(stream);
+ let stream = Rc::new(RefCell::new(stream));
+
+ // input protocol and transport
+ let i_tran = self.i_trans_factory.create(stream.clone());
+ let i_tran = Rc::new(RefCell::new(i_tran));
+ let mut i_prot = self.i_proto_factory.create(i_tran);
+
+ // output protocol and transport
+ let o_tran = self.o_trans_factory.create(stream.clone());
+ let o_tran = Rc::new(RefCell::new(o_tran));
+ let mut o_prot = self.o_proto_factory.create(o_tran);
+
+ // process loop
+ loop {
+ let r = self.processor.process(&mut *i_prot, &mut *o_prot);
+ if let Err(e) = r {
+ warn!("processor failed with error: {:?}", e);
+ break; // FIXME: close here
+ }
+ }
+ }
+}