THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/lib/rs/src/protocol/mod.rs b/lib/rs/src/protocol/mod.rs
index b230d63..4f13914 100644
--- a/lib/rs/src/protocol/mod.rs
+++ b/lib/rs/src/protocol/mod.rs
@@ -19,59 +19,77 @@
//!
//! # Examples
//!
-//! Create and use a `TOutputProtocol`.
-//!
-//! ```no_run
-//! use std::cell::RefCell;
-//! use std::rc::Rc;
-//! use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType};
-//! use thrift::transport::{TTcpTransport, TTransport};
-//!
-//! // create the I/O channel
-//! let mut transport = TTcpTransport::new();
-//! transport.open("127.0.0.1:9090").unwrap();
-//! let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
-//!
-//! // create the protocol to encode types into bytes
-//! let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
-//!
-//! // write types
-//! o_prot.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
-//! o_prot.write_string("foo").unwrap();
-//! o_prot.write_field_end().unwrap();
-//! ```
-//!
//! Create and use a `TInputProtocol`.
//!
//! ```no_run
-//! use std::cell::RefCell;
-//! use std::rc::Rc;
//! use thrift::protocol::{TBinaryInputProtocol, TInputProtocol};
-//! use thrift::transport::{TTcpTransport, TTransport};
+//! use thrift::transport::TTcpChannel;
//!
//! // create the I/O channel
-//! let mut transport = TTcpTransport::new();
-//! transport.open("127.0.0.1:9090").unwrap();
-//! let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+//! let mut channel = TTcpChannel::new();
+//! channel.open("127.0.0.1:9090").unwrap();
//!
//! // create the protocol to decode bytes into types
-//! let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
+//! let mut protocol = TBinaryInputProtocol::new(channel, true);
//!
//! // read types from the wire
-//! let field_identifier = i_prot.read_field_begin().unwrap();
-//! let field_contents = i_prot.read_string().unwrap();
-//! let field_end = i_prot.read_field_end().unwrap();
+//! let field_identifier = protocol.read_field_begin().unwrap();
+//! let field_contents = protocol.read_string().unwrap();
+//! let field_end = protocol.read_field_end().unwrap();
+//! ```
+//!
+//! Create and use a `TOutputProtocol`.
+//!
+//! ```no_run
+//! use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType};
+//! use thrift::transport::TTcpChannel;
+//!
+//! // create the I/O channel
+//! let mut channel = TTcpChannel::new();
+//! channel.open("127.0.0.1:9090").unwrap();
+//!
+//! // create the protocol to encode types into bytes
+//! let mut protocol = TBinaryOutputProtocol::new(channel, true);
+//!
+//! // write types
+//! protocol.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
+//! protocol.write_string("foo").unwrap();
+//! protocol.write_field_end().unwrap();
//! ```
-use std::cell::RefCell;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::convert::From;
-use std::rc::Rc;
use try_from::TryFrom;
-use ::{ProtocolError, ProtocolErrorKind};
-use ::transport::TTransport;
+use {ProtocolError, ProtocolErrorKind};
+use transport::{TReadTransport, TWriteTransport};
+
+#[cfg(test)]
+macro_rules! assert_eq_written_bytes {
+ ($o_prot:ident, $expected_bytes:ident) => {
+ {
+ assert_eq!($o_prot.transport.write_bytes(), &$expected_bytes);
+ }
+ };
+}
+
+// FIXME: should take both read and write
+#[cfg(test)]
+macro_rules! copy_write_buffer_to_read_buffer {
+ ($o_prot:ident) => {
+ {
+ $o_prot.transport.copy_write_buffer_to_read_buffer();
+ }
+ };
+}
+
+#[cfg(test)]
+macro_rules! set_readable_bytes {
+ ($i_prot:ident, $bytes:expr) => {
+ $i_prot.transport.set_readable_bytes($bytes);
+ }
+}
mod binary;
mod compact;
@@ -107,20 +125,17 @@
/// Create and use a `TInputProtocol`
///
/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
/// use thrift::protocol::{TBinaryInputProtocol, TInputProtocol};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("127.0.0.1:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("127.0.0.1:9090").unwrap();
///
-/// let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
+/// let mut protocol = TBinaryInputProtocol::new(channel, true);
///
-/// let field_identifier = i_prot.read_field_begin().unwrap();
-/// let field_contents = i_prot.read_string().unwrap();
-/// let field_end = i_prot.read_field_end().unwrap();
+/// let field_identifier = protocol.read_field_begin().unwrap();
+/// let field_contents = protocol.read_string().unwrap();
+/// let field_end = protocol.read_field_end().unwrap();
/// ```
pub trait TInputProtocol {
/// Read the beginning of a Thrift message.
@@ -171,10 +186,14 @@
/// Skip a field with type `field_type` recursively up to `depth` levels.
fn skip_till_depth(&mut self, field_type: TType, depth: i8) -> ::Result<()> {
if depth == 0 {
- return Err(::Error::Protocol(ProtocolError {
- kind: ProtocolErrorKind::DepthLimit,
- message: format!("cannot parse past {:?}", field_type),
- }));
+ return Err(
+ ::Error::Protocol(
+ ProtocolError {
+ kind: ProtocolErrorKind::DepthLimit,
+ message: format!("cannot parse past {:?}", field_type),
+ },
+ ),
+ );
}
match field_type {
@@ -213,9 +232,11 @@
TType::Map => {
let map_ident = self.read_map_begin()?;
for _ in 0..map_ident.size {
- let key_type = map_ident.key_type
+ let key_type = map_ident
+ .key_type
.expect("non-zero sized map should contain key type");
- let val_type = map_ident.value_type
+ let val_type = map_ident
+ .value_type
.expect("non-zero sized map should contain value type");
self.skip_till_depth(key_type, depth - 1)?;
self.skip_till_depth(val_type, depth - 1)?;
@@ -223,10 +244,14 @@
self.read_map_end()
}
u => {
- Err(::Error::Protocol(ProtocolError {
- kind: ProtocolErrorKind::Unknown,
- message: format!("cannot skip field type {:?}", &u),
- }))
+ Err(
+ ::Error::Protocol(
+ ProtocolError {
+ kind: ProtocolErrorKind::Unknown,
+ message: format!("cannot skip field type {:?}", &u),
+ },
+ ),
+ )
}
}
}
@@ -259,20 +284,17 @@
/// Create and use a `TOutputProtocol`
///
/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
/// use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("127.0.0.1:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("127.0.0.1:9090").unwrap();
///
-/// let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
+/// let mut protocol = TBinaryOutputProtocol::new(channel, true);
///
-/// o_prot.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
-/// o_prot.write_string("foo").unwrap();
-/// o_prot.write_field_end().unwrap();
+/// protocol.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
+/// protocol.write_string("foo").unwrap();
+/// protocol.write_field_end().unwrap();
/// ```
pub trait TOutputProtocol {
/// Write the beginning of a Thrift message.
@@ -330,6 +352,192 @@
fn write_byte(&mut self, b: u8) -> ::Result<()>; // FIXME: REMOVE
}
+impl<P> TInputProtocol for Box<P>
+where
+ P: TInputProtocol + ?Sized,
+{
+ fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier> {
+ (**self).read_message_begin()
+ }
+
+ fn read_message_end(&mut self) -> ::Result<()> {
+ (**self).read_message_end()
+ }
+
+ fn read_struct_begin(&mut self) -> ::Result<Option<TStructIdentifier>> {
+ (**self).read_struct_begin()
+ }
+
+ fn read_struct_end(&mut self) -> ::Result<()> {
+ (**self).read_struct_end()
+ }
+
+ fn read_field_begin(&mut self) -> ::Result<TFieldIdentifier> {
+ (**self).read_field_begin()
+ }
+
+ fn read_field_end(&mut self) -> ::Result<()> {
+ (**self).read_field_end()
+ }
+
+ fn read_bool(&mut self) -> ::Result<bool> {
+ (**self).read_bool()
+ }
+
+ fn read_bytes(&mut self) -> ::Result<Vec<u8>> {
+ (**self).read_bytes()
+ }
+
+ fn read_i8(&mut self) -> ::Result<i8> {
+ (**self).read_i8()
+ }
+
+ fn read_i16(&mut self) -> ::Result<i16> {
+ (**self).read_i16()
+ }
+
+ fn read_i32(&mut self) -> ::Result<i32> {
+ (**self).read_i32()
+ }
+
+ fn read_i64(&mut self) -> ::Result<i64> {
+ (**self).read_i64()
+ }
+
+ fn read_double(&mut self) -> ::Result<f64> {
+ (**self).read_double()
+ }
+
+ fn read_string(&mut self) -> ::Result<String> {
+ (**self).read_string()
+ }
+
+ fn read_list_begin(&mut self) -> ::Result<TListIdentifier> {
+ (**self).read_list_begin()
+ }
+
+ fn read_list_end(&mut self) -> ::Result<()> {
+ (**self).read_list_end()
+ }
+
+ fn read_set_begin(&mut self) -> ::Result<TSetIdentifier> {
+ (**self).read_set_begin()
+ }
+
+ fn read_set_end(&mut self) -> ::Result<()> {
+ (**self).read_set_end()
+ }
+
+ fn read_map_begin(&mut self) -> ::Result<TMapIdentifier> {
+ (**self).read_map_begin()
+ }
+
+ fn read_map_end(&mut self) -> ::Result<()> {
+ (**self).read_map_end()
+ }
+
+ fn read_byte(&mut self) -> ::Result<u8> {
+ (**self).read_byte()
+ }
+}
+
+impl<P> TOutputProtocol for Box<P>
+where
+ P: TOutputProtocol + ?Sized,
+{
+ fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
+ (**self).write_message_begin(identifier)
+ }
+
+ fn write_message_end(&mut self) -> ::Result<()> {
+ (**self).write_message_end()
+ }
+
+ fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()> {
+ (**self).write_struct_begin(identifier)
+ }
+
+ fn write_struct_end(&mut self) -> ::Result<()> {
+ (**self).write_struct_end()
+ }
+
+ fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> {
+ (**self).write_field_begin(identifier)
+ }
+
+ fn write_field_end(&mut self) -> ::Result<()> {
+ (**self).write_field_end()
+ }
+
+ fn write_field_stop(&mut self) -> ::Result<()> {
+ (**self).write_field_stop()
+ }
+
+ fn write_bool(&mut self, b: bool) -> ::Result<()> {
+ (**self).write_bool(b)
+ }
+
+ fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
+ (**self).write_bytes(b)
+ }
+
+ fn write_i8(&mut self, i: i8) -> ::Result<()> {
+ (**self).write_i8(i)
+ }
+
+ fn write_i16(&mut self, i: i16) -> ::Result<()> {
+ (**self).write_i16(i)
+ }
+
+ fn write_i32(&mut self, i: i32) -> ::Result<()> {
+ (**self).write_i32(i)
+ }
+
+ fn write_i64(&mut self, i: i64) -> ::Result<()> {
+ (**self).write_i64(i)
+ }
+
+ fn write_double(&mut self, d: f64) -> ::Result<()> {
+ (**self).write_double(d)
+ }
+
+ fn write_string(&mut self, s: &str) -> ::Result<()> {
+ (**self).write_string(s)
+ }
+
+ fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()> {
+ (**self).write_list_begin(identifier)
+ }
+
+ fn write_list_end(&mut self) -> ::Result<()> {
+ (**self).write_list_end()
+ }
+
+ fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()> {
+ (**self).write_set_begin(identifier)
+ }
+
+ fn write_set_end(&mut self) -> ::Result<()> {
+ (**self).write_set_end()
+ }
+
+ fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> {
+ (**self).write_map_begin(identifier)
+ }
+
+ fn write_map_end(&mut self) -> ::Result<()> {
+ (**self).write_map_end()
+ }
+
+ fn flush(&mut self) -> ::Result<()> {
+ (**self).flush()
+ }
+
+ fn write_byte(&mut self, b: u8) -> ::Result<()> {
+ (**self).write_byte(b)
+ }
+}
+
/// Helper type used by servers to create `TInputProtocol` instances for
/// accepted client connections.
///
@@ -338,21 +546,27 @@
/// Create a `TInputProtocolFactory` and use it to create a `TInputProtocol`.
///
/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
/// use thrift::protocol::{TBinaryInputProtocolFactory, TInputProtocolFactory};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("127.0.0.1:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("127.0.0.1:9090").unwrap();
///
-/// let mut i_proto_factory = TBinaryInputProtocolFactory::new();
-/// let i_prot = i_proto_factory.create(transport);
+/// let factory = TBinaryInputProtocolFactory::new();
+/// let protocol = factory.create(Box::new(channel));
/// ```
pub trait TInputProtocolFactory {
- /// Create a `TInputProtocol` that reads bytes from `transport`.
- fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TInputProtocol>;
+ // Create a `TInputProtocol` that reads bytes from `transport`.
+ fn create(&self, transport: Box<TReadTransport + Send>) -> Box<TInputProtocol + Send>;
+}
+
+impl<T> TInputProtocolFactory for Box<T>
+where
+ T: TInputProtocolFactory + ?Sized,
+{
+ fn create(&self, transport: Box<TReadTransport + Send>) -> Box<TInputProtocol + Send> {
+ (**self).create(transport)
+ }
}
/// Helper type used by servers to create `TOutputProtocol` instances for
@@ -363,21 +577,27 @@
/// Create a `TOutputProtocolFactory` and use it to create a `TOutputProtocol`.
///
/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
/// use thrift::protocol::{TBinaryOutputProtocolFactory, TOutputProtocolFactory};
-/// use thrift::transport::{TTcpTransport, TTransport};
+/// use thrift::transport::TTcpChannel;
///
-/// let mut transport = TTcpTransport::new();
-/// transport.open("127.0.0.1:9090").unwrap();
-/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+/// let mut channel = TTcpChannel::new();
+/// channel.open("127.0.0.1:9090").unwrap();
///
-/// let mut o_proto_factory = TBinaryOutputProtocolFactory::new();
-/// let o_prot = o_proto_factory.create(transport);
+/// let factory = TBinaryOutputProtocolFactory::new();
+/// let protocol = factory.create(Box::new(channel));
/// ```
pub trait TOutputProtocolFactory {
/// Create a `TOutputProtocol` that writes bytes to `transport`.
- fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TOutputProtocol>;
+ fn create(&self, transport: Box<TWriteTransport + Send>) -> Box<TOutputProtocol + Send>;
+}
+
+impl<T> TOutputProtocolFactory for Box<T>
+where
+ T: TOutputProtocolFactory + ?Sized,
+{
+ fn create(&self, transport: Box<TWriteTransport + Send>) -> Box<TOutputProtocol + Send> {
+ (**self).create(transport)
+ }
}
/// Thrift message identifier.
@@ -394,10 +614,11 @@
impl TMessageIdentifier {
/// Create a `TMessageIdentifier` for a Thrift service-call named `name`
/// with message type `message_type` and sequence number `sequence_number`.
- pub fn new<S: Into<String>>(name: S,
- message_type: TMessageType,
- sequence_number: i32)
- -> TMessageIdentifier {
+ pub fn new<S: Into<String>>(
+ name: S,
+ message_type: TMessageType,
+ sequence_number: i32,
+ ) -> TMessageIdentifier {
TMessageIdentifier {
name: name.into(),
message_type: message_type,
@@ -443,9 +664,10 @@
///
/// `id` should be `None` if `field_type` is `TType::Stop`.
pub fn new<N, S, I>(name: N, field_type: TType, id: I) -> TFieldIdentifier
- where N: Into<Option<S>>,
- S: Into<String>,
- I: Into<Option<i16>>
+ where
+ N: Into<Option<S>>,
+ S: Into<String>,
+ I: Into<Option<i16>>,
{
TFieldIdentifier {
name: name.into().map(|n| n.into()),
@@ -510,8 +732,9 @@
/// Create a `TMapIdentifier` for a map with `size` entries of type
/// `key_type -> value_type`.
pub fn new<K, V>(key_type: K, value_type: V, size: i32) -> TMapIdentifier
- where K: Into<Option<TType>>,
- V: Into<Option<TType>>
+ where
+ K: Into<Option<TType>>,
+ V: Into<Option<TType>>,
{
TMapIdentifier {
key_type: key_type.into(),
@@ -565,10 +788,14 @@
0x03 => Ok(TMessageType::Exception),
0x04 => Ok(TMessageType::OneWay),
unkn => {
- Err(::Error::Protocol(ProtocolError {
- kind: ProtocolErrorKind::InvalidData,
- message: format!("cannot convert {} to TMessageType", unkn),
- }))
+ Err(
+ ::Error::Protocol(
+ ProtocolError {
+ kind: ProtocolErrorKind::InvalidData,
+ message: format!("cannot convert {} to TMessageType", unkn),
+ },
+ ),
+ )
}
}
}
@@ -642,10 +869,14 @@
if expected == actual {
Ok(())
} else {
- Err(::Error::Application(::ApplicationError {
- kind: ::ApplicationErrorKind::BadSequenceId,
- message: format!("expected {} got {}", expected, actual),
- }))
+ Err(
+ ::Error::Application(
+ ::ApplicationError {
+ kind: ::ApplicationErrorKind::BadSequenceId,
+ message: format!("expected {} got {}", expected, actual),
+ },
+ ),
+ )
}
}
@@ -657,10 +888,14 @@
if expected == actual {
Ok(())
} else {
- Err(::Error::Application(::ApplicationError {
- kind: ::ApplicationErrorKind::WrongMethodName,
- message: format!("expected {} got {}", expected, actual),
- }))
+ Err(
+ ::Error::Application(
+ ::ApplicationError {
+ kind: ::ApplicationErrorKind::WrongMethodName,
+ message: format!("expected {} got {}", expected, actual),
+ },
+ ),
+ )
}
}
@@ -672,10 +907,14 @@
if expected == actual {
Ok(())
} else {
- Err(::Error::Application(::ApplicationError {
- kind: ::ApplicationErrorKind::InvalidMessageType,
- message: format!("expected {} got {}", expected, actual),
- }))
+ Err(
+ ::Error::Application(
+ ::ApplicationError {
+ kind: ::ApplicationErrorKind::InvalidMessageType,
+ message: format!("expected {} got {}", expected, actual),
+ },
+ ),
+ )
}
}
@@ -686,10 +925,14 @@
match *field {
Some(_) => Ok(()),
None => {
- Err(::Error::Protocol(::ProtocolError {
- kind: ::ProtocolErrorKind::Unknown,
- message: format!("missing required field {}", field_name),
- }))
+ Err(
+ ::Error::Protocol(
+ ::ProtocolError {
+ kind: ::ProtocolErrorKind::Unknown,
+ message: format!("missing required field {}", field_name),
+ },
+ ),
+ )
}
}
}
@@ -700,10 +943,67 @@
///
/// Return `TFieldIdentifier.id` if an id exists, `Err` otherwise.
pub fn field_id(field_ident: &TFieldIdentifier) -> ::Result<i16> {
- field_ident.id.ok_or_else(|| {
- ::Error::Protocol(::ProtocolError {
- kind: ::ProtocolErrorKind::Unknown,
- message: format!("missing field in in {:?}", field_ident),
- })
- })
+ field_ident
+ .id
+ .ok_or_else(
+ || {
+ ::Error::Protocol(
+ ::ProtocolError {
+ kind: ::ProtocolErrorKind::Unknown,
+ message: format!("missing field in in {:?}", field_ident),
+ },
+ )
+ },
+ )
+}
+
+#[cfg(test)]
+mod tests {
+
+ use std::io::Cursor;
+
+ use super::*;
+ use transport::{TReadTransport, TWriteTransport};
+
+ #[test]
+ fn must_create_usable_input_protocol_from_concrete_input_protocol() {
+ let r: Box<TReadTransport> = Box::new(Cursor::new([0, 1, 2]));
+ let mut t = TCompactInputProtocol::new(r);
+ takes_input_protocol(&mut t)
+ }
+
+ #[test]
+ fn must_create_usable_input_protocol_from_boxed_input() {
+ let r: Box<TReadTransport> = Box::new(Cursor::new([0, 1, 2]));
+ let mut t: Box<TInputProtocol> = Box::new(TCompactInputProtocol::new(r));
+ takes_input_protocol(&mut t)
+ }
+
+ #[test]
+ fn must_create_usable_output_protocol_from_concrete_output_protocol() {
+ let w: Box<TWriteTransport> = Box::new(vec![0u8; 10]);
+ let mut t = TCompactOutputProtocol::new(w);
+ takes_output_protocol(&mut t)
+ }
+
+ #[test]
+ fn must_create_usable_output_protocol_from_boxed_output() {
+ let w: Box<TWriteTransport> = Box::new(vec![0u8; 10]);
+ let mut t: Box<TOutputProtocol> = Box::new(TCompactOutputProtocol::new(w));
+ takes_output_protocol(&mut t)
+ }
+
+ fn takes_input_protocol<R>(t: &mut R)
+ where
+ R: TInputProtocol,
+ {
+ t.read_byte().unwrap();
+ }
+
+ fn takes_output_protocol<W>(t: &mut W)
+ where
+ W: TOutputProtocol,
+ {
+ t.flush().unwrap();
+ }
}