THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/lib/rs/src/protocol/stored.rs b/lib/rs/src/protocol/stored.rs
index 6826c00..b3f305f 100644
--- a/lib/rs/src/protocol/stored.rs
+++ b/lib/rs/src/protocol/stored.rs
@@ -17,8 +17,8 @@
use std::convert::Into;
-use ::ProtocolErrorKind;
-use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TInputProtocol,
+use ProtocolErrorKind;
+use super::{TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier,
TSetIdentifier, TStructIdentifier};
/// `TInputProtocol` required to use a `TMultiplexedProcessor`.
@@ -40,35 +40,34 @@
/// Create and use a `TStoredInputProtocol`.
///
/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
/// use thrift;
/// use thrift::protocol::{TInputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
/// use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TStoredInputProtocol};
/// use thrift::server::TProcessor;
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::{TIoChannel, TTcpChannel};
///
/// // sample processor
/// struct ActualProcessor;
/// impl TProcessor for ActualProcessor {
/// fn process(
-/// &mut self,
+/// &self,
/// _: &mut TInputProtocol,
/// _: &mut TOutputProtocol
/// ) -> thrift::Result<()> {
/// unimplemented!()
/// }
/// }
-/// let mut processor = ActualProcessor {};
+/// let processor = ActualProcessor {};
///
/// // construct the shared transport
-/// let mut transport = TTcpTransport::new();
-/// transport.open("localhost:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("localhost:9090").unwrap();
+///
+/// let (i_chan, o_chan) = channel.split().unwrap();
///
/// // construct the actual input and output protocols
-/// let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
-/// let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
+/// let mut i_prot = TBinaryInputProtocol::new(i_chan, true);
+/// let mut o_prot = TBinaryOutputProtocol::new(o_chan, true);
///
/// // message identifier received from remote and modified to remove the service name
/// let new_msg_ident = TMessageIdentifier::new("service_call", TMessageType::Call, 1);
@@ -77,6 +76,7 @@
/// let mut proxy_i_prot = TStoredInputProtocol::new(&mut i_prot, new_msg_ident);
/// let res = processor.process(&mut proxy_i_prot, &mut o_prot);
/// ```
+// FIXME: implement `Debug` for `TStoredInputProtocol`
pub struct TStoredInputProtocol<'a> {
inner: &'a mut TInputProtocol,
message_ident: Option<TMessageIdentifier>,
@@ -88,9 +88,10 @@
/// `TInputProtocol`. `message_ident` is the modified message identifier -
/// with service name stripped - that will be passed to
/// `wrapped.read_message_begin(...)`.
- pub fn new(wrapped: &mut TInputProtocol,
- message_ident: TMessageIdentifier)
- -> TStoredInputProtocol {
+ pub fn new(
+ wrapped: &mut TInputProtocol,
+ message_ident: TMessageIdentifier,
+ ) -> TStoredInputProtocol {
TStoredInputProtocol {
inner: wrapped,
message_ident: message_ident.into(),
@@ -100,10 +101,16 @@
impl<'a> TInputProtocol for TStoredInputProtocol<'a> {
fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier> {
- self.message_ident.take().ok_or_else(|| {
- ::errors::new_protocol_error(ProtocolErrorKind::Unknown,
- "message identifier already read")
- })
+ self.message_ident
+ .take()
+ .ok_or_else(
+ || {
+ ::errors::new_protocol_error(
+ ProtocolErrorKind::Unknown,
+ "message identifier already read",
+ )
+ },
+ )
}
fn read_message_end(&mut self) -> ::Result<()> {