THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/lib/rs/src/transport/socket.rs b/lib/rs/src/transport/socket.rs
index 9f2b8ba..16b59ef 100644
--- a/lib/rs/src/transport/socket.rs
+++ b/lib/rs/src/transport/socket.rs
@@ -21,69 +21,74 @@
use std::net::{Shutdown, TcpStream};
use std::ops::Drop;
-use ::{TransportError, TransportErrorKind};
+use {TransportErrorKind, new_transport_error};
+use super::{ReadHalf, TIoChannel, WriteHalf};
-/// Communicate with a Thrift service over a TCP socket.
+/// Bidirectional TCP/IP channel.
///
/// # Examples
///
-/// Create a `TTcpTransport`.
+/// Create a `TTcpChannel`.
///
/// ```no_run
/// use std::io::{Read, Write};
-/// use thrift::transport::TTcpTransport;
+/// use thrift::transport::TTcpChannel;
///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
///
/// let mut buf = vec![0u8; 4];
-/// t.read(&mut buf).unwrap();
-/// t.write(&vec![0, 1, 2]).unwrap();
+/// c.read(&mut buf).unwrap();
+/// c.write(&vec![0, 1, 2]).unwrap();
/// ```
///
-/// Create a `TTcpTransport` by wrapping an existing `TcpStream`.
+/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
///
/// ```no_run
/// use std::io::{Read, Write};
/// use std::net::TcpStream;
-/// use thrift::transport::TTcpTransport;
+/// use thrift::transport::TTcpChannel;
///
/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
-/// let mut t = TTcpTransport::with_stream(stream);
///
-/// // no need to call t.open() since we've already connected above
+/// // no need to call open() on the channel: the wrapped stream is already connected
+/// let mut c = TTcpChannel::with_stream(stream);
///
/// let mut buf = vec![0u8; 4];
-/// t.read(&mut buf).unwrap();
-/// t.write(&vec![0, 1, 2]).unwrap();
+/// c.read(&mut buf).unwrap();
+/// c.write(&vec![0, 1, 2]).unwrap();
/// ```
-#[derive(Default)]
-pub struct TTcpTransport {
+#[derive(Debug, Default)]
+pub struct TTcpChannel {
stream: Option<TcpStream>,
}
-impl TTcpTransport {
- /// Create an uninitialized `TTcpTransport`.
+impl TTcpChannel {
+ /// Create an uninitialized `TTcpChannel`.
///
- /// The returned instance must be opened using `TTcpTransport::open(...)`
+ /// The returned instance must be opened using `TTcpChannel::open(...)`
/// before it can be used.
- pub fn new() -> TTcpTransport {
- TTcpTransport { stream: None }
+ pub fn new() -> TTcpChannel {
+ TTcpChannel { stream: None }
}
- /// Create a `TTcpTransport` that wraps an existing `TcpStream`.
+ /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
///
/// The passed-in stream is assumed to have been opened before being wrapped
- /// by the created `TTcpTransport` instance.
- pub fn with_stream(stream: TcpStream) -> TTcpTransport {
- TTcpTransport { stream: Some(stream) }
+ /// by the created `TTcpChannel` instance.
+ pub fn with_stream(stream: TcpStream) -> TTcpChannel {
+ TTcpChannel { stream: Some(stream) }
}
/// Connect to `remote_address`, which should have the form `host:port`.
pub fn open(&mut self, remote_address: &str) -> ::Result<()> {
if self.stream.is_some() {
- Err(::Error::Transport(TransportError::new(TransportErrorKind::AlreadyOpen,
- "transport previously opened")))
+ Err(
+ new_transport_error(
+ TransportErrorKind::AlreadyOpen,
+ "tcp connection previously opened",
+ ),
+ )
} else {
match TcpStream::connect(&remote_address) {
Ok(s) => {
@@ -95,33 +100,62 @@
}
}
- /// Shutdown this transport.
+ /// Shut down this channel.
///
/// Both send and receive halves are closed, and this instance can no
/// longer be used to communicate with another endpoint.
pub fn close(&mut self) -> ::Result<()> {
- self.if_set(|s| s.shutdown(Shutdown::Both)).map_err(From::from)
+ self.if_set(|s| s.shutdown(Shutdown::Both))
+ .map_err(From::from)
}
fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
- where F: FnMut(&mut TcpStream) -> io::Result<T>
+ where
+ F: FnMut(&mut TcpStream) -> io::Result<T>,
{
if let Some(ref mut s) = self.stream {
stream_operation(s)
} else {
- Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"))
+ Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),)
}
}
}
-impl Read for TTcpTransport {
+impl TIoChannel for TTcpChannel {
+ fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
+ where
+ Self: Sized,
+ {
+ let mut s = self;
+
+ s.stream
+ .as_mut()
+ .and_then(|s| s.try_clone().ok())
+ .map(
+ |cloned| {
+ (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } },
+ WriteHalf { handle: TTcpChannel { stream: Some(cloned) } })
+ },
+ )
+ .ok_or_else(
+ || {
+ new_transport_error(
+ TransportErrorKind::Unknown,
+ "cannot clone underlying tcp stream",
+ )
+ },
+ )
+ }
+}
+
+impl Read for TTcpChannel {
fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
self.if_set(|s| s.read(b))
}
}
-impl Write for TTcpTransport {
+impl Write for TTcpChannel {
fn write(&mut self, b: &[u8]) -> io::Result<usize> {
self.if_set(|s| s.write(b))
}
@@ -131,11 +165,11 @@
}
}
-// Do I have to implement the Drop trait? TcpStream closes the socket on drop.
-impl Drop for TTcpTransport {
+// FIXME: Is this Drop impl necessary? TcpStream already closes its socket when dropped.
+impl Drop for TTcpChannel {
fn drop(&mut self) {
if let Err(e) = self.close() {
- warn!("error while closing socket transport: {:?}", e)
+ warn!("error while closing socket: {:?}", e)
}
}
}