THRIFT-2945 Add Rust support
Client: Rust
Patch: Allen George <allen.george@gmail.com>
This closes #1147
diff --git a/lib/rs/src/server/simple.rs b/lib/rs/src/server/simple.rs
new file mode 100644
index 0000000..89ed977
--- /dev/null
+++ b/lib/rs/src/server/simple.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::net::{TcpListener, TcpStream};
+use std::rc::Rc;
+
+use ::{ApplicationError, ApplicationErrorKind};
+use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+use ::transport::{TTcpTransport, TTransport, TTransportFactory};
+
+use super::TProcessor;
+
+/// Single-threaded blocking Thrift socket server.
+///
+/// A `TSimpleServer` listens on a given address and services accepted
+/// connections *synchronously* and *sequentially* - i.e. in a blocking manner,
+/// one at a time - on the main thread. Each accepted connection has an input
+/// half and an output half, each of which uses a `TTransport` and `TProtocol`
+/// to translate messages to and from byes. Any combination of `TProtocol` and
+/// `TTransport` may be used.
+///
+/// # Examples
+///
+/// Creating and running a `TSimpleServer` using Thrift-compiler-generated
+/// service code.
+///
+/// ```no_run
+/// use thrift;
+/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
+/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
+/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory};
+/// use thrift::server::{TProcessor, TSimpleServer};
+///
+/// //
+/// // auto-generated
+/// //
+///
+/// // processor for `SimpleService`
+/// struct SimpleServiceSyncProcessor;
+/// impl SimpleServiceSyncProcessor {
+/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // `TProcessor` implementation for `SimpleService`
+/// impl TProcessor for SimpleServiceSyncProcessor {
+/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // service functions for SimpleService
+/// trait SimpleServiceSyncHandler {
+/// fn service_call(&mut self) -> thrift::Result<()>;
+/// }
+///
+/// //
+/// // user-code follows
+/// //
+///
+/// // define a handler that will be invoked when `service_call` is received
+/// struct SimpleServiceHandlerImpl;
+/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
+/// fn service_call(&mut self) -> thrift::Result<()> {
+/// unimplemented!();
+/// }
+/// }
+///
+/// // instantiate the processor
+/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
+///
+/// // instantiate the server
+/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
+/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
+/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
+/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
+///
+/// let mut server = TSimpleServer::new(
+/// i_tr_fact,
+/// i_pr_fact,
+/// o_tr_fact,
+/// o_pr_fact,
+/// processor
+/// );
+///
+/// // start listening for incoming connections
+/// match server.listen("127.0.0.1:8080") {
+/// Ok(_) => println!("listen completed"),
+/// Err(e) => println!("listen failed with error {:?}", e),
+/// }
+/// ```
+pub struct TSimpleServer<PR: TProcessor> {
+ i_trans_factory: Box<TTransportFactory>,
+ i_proto_factory: Box<TInputProtocolFactory>,
+ o_trans_factory: Box<TTransportFactory>,
+ o_proto_factory: Box<TOutputProtocolFactory>,
+ processor: PR,
+}
+
+impl<PR: TProcessor> TSimpleServer<PR> {
+ /// Create a `TSimpleServer`.
+ ///
+ /// Each accepted connection has an input and output half, each of which
+ /// requires a `TTransport` and `TProtocol`. `TSimpleServer` uses
+ /// `input_transport_factory` and `input_protocol_factory` to create
+ /// implementations for the input, and `output_transport_factory` and
+ /// `output_protocol_factory` to create implementations for the output.
+ pub fn new(input_transport_factory: Box<TTransportFactory>,
+ input_protocol_factory: Box<TInputProtocolFactory>,
+ output_transport_factory: Box<TTransportFactory>,
+ output_protocol_factory: Box<TOutputProtocolFactory>,
+ processor: PR)
+ -> TSimpleServer<PR> {
+ TSimpleServer {
+ i_trans_factory: input_transport_factory,
+ i_proto_factory: input_protocol_factory,
+ o_trans_factory: output_transport_factory,
+ o_proto_factory: output_protocol_factory,
+ processor: processor,
+ }
+ }
+
+ /// Listen for incoming connections on `listen_address`.
+ ///
+ /// `listen_address` should be in the form `host:port`,
+ /// for example: `127.0.0.1:8080`.
+ ///
+ /// Return `()` if successful.
+ ///
+ /// Return `Err` when the server cannot bind to `listen_address` or there
+ /// is an unrecoverable error.
+ pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
+ let listener = TcpListener::bind(listen_address)?;
+ for stream in listener.incoming() {
+ match stream {
+ Ok(s) => self.handle_incoming_connection(s),
+ Err(e) => warn!("failed to accept remote connection with error {:?}", e),
+ }
+ }
+
+ Err(::Error::Application(ApplicationError {
+ kind: ApplicationErrorKind::Unknown,
+ message: "aborted listen loop".into(),
+ }))
+ }
+
+ fn handle_incoming_connection(&mut self, stream: TcpStream) {
+ // create the shared tcp stream
+ let stream = TTcpTransport::with_stream(stream);
+ let stream: Box<TTransport> = Box::new(stream);
+ let stream = Rc::new(RefCell::new(stream));
+
+ // input protocol and transport
+ let i_tran = self.i_trans_factory.create(stream.clone());
+ let i_tran = Rc::new(RefCell::new(i_tran));
+ let mut i_prot = self.i_proto_factory.create(i_tran);
+
+ // output protocol and transport
+ let o_tran = self.o_trans_factory.create(stream.clone());
+ let o_tran = Rc::new(RefCell::new(o_tran));
+ let mut o_prot = self.o_proto_factory.create(o_tran);
+
+ // process loop
+ loop {
+ let r = self.processor.process(&mut *i_prot, &mut *o_prot);
+ if let Err(e) = r {
+ warn!("processor failed with error: {:?}", e);
+ break; // FIXME: close here
+ }
+ }
+ }
+}