THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/lib/rs/src/transport/framed.rs b/lib/rs/src/transport/framed.rs
index 75c12f4..d78d2f7 100644
--- a/lib/rs/src/transport/framed.rs
+++ b/lib/rs/src/transport/framed.rs
@@ -16,165 +16,242 @@
// under the License.
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
-use std::cell::RefCell;
use std::cmp;
use std::io;
use std::io::{ErrorKind, Read, Write};
-use std::rc::Rc;
-use super::{TTransport, TTransportFactory};
+use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
/// Default capacity of the read buffer in bytes.
-const WRITE_BUFFER_CAPACITY: usize = 4096;
+const READ_CAPACITY: usize = 4096;
-/// Default capacity of the write buffer in bytes..
-const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+/// Default capacity of the write buffer in bytes.
+const WRITE_CAPACITY: usize = 4096;
-/// Transport that communicates with endpoints using framed messages.
+/// Transport that reads framed messages.
///
-/// A `TFramedTransport` maintains a fixed-size internal write buffer. All
-/// writes are made to this buffer and are sent to the wrapped transport only
-/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
-/// count of the buffered bytes is written, followed by the bytes themselves.
-///
-/// A `TFramedTransport` also maintains a fixed-size internal read buffer.
-/// On a call to `TTransport::read(...)` one full message - both fixed-length
-/// header and bytes - is read from the wrapped transport and buffered.
-/// Subsequent read calls are serviced from the internal buffer until it is
-/// exhausted, at which point the next full message is read from the wrapped
-/// transport.
+/// A `TFramedReadTransport` maintains a fixed-size internal read buffer.
+/// On a call to `TFramedReadTransport::read(...)` one full message - both
+/// fixed-length header and bytes - is read from the wrapped channel and
+/// buffered. Subsequent read calls are serviced from the internal buffer
+/// until it is exhausted, at which point the next full message is read
+/// from the wrapped channel.
///
/// # Examples
///
-/// Create and use a `TFramedTransport`.
+/// Create and use a `TFramedReadTransport`.
///
/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use std::io::{Read, Write};
-/// use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+/// use std::io::Read;
+/// use thrift::transport::{TFramedReadTransport, TTcpChannel};
///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
///
-/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
-/// let mut t = TFramedTransport::new(t);
+/// let mut t = TFramedReadTransport::new(c);
///
-/// // read
/// t.read(&mut vec![0u8; 1]).unwrap();
-///
-/// // write
-/// t.write(&[0x00]).unwrap();
-/// t.flush().unwrap();
/// ```
-pub struct TFramedTransport {
- rbuf: Box<[u8]>,
- rpos: usize,
- rcap: usize,
- wbuf: Box<[u8]>,
- wpos: usize,
- inner: Rc<RefCell<Box<TTransport>>>,
+#[derive(Debug)]
+pub struct TFramedReadTransport<C>
+where
+ C: Read,
+{
+ buf: Box<[u8]>,
+ pos: usize,
+ cap: usize,
+ chan: C,
}
-impl TFramedTransport {
+impl<C> TFramedReadTransport<C>
+where
+ C: Read,
+{
/// Create a `TFramedTransport` with default-sized internal read and
- /// write buffers that wraps an `inner` `TTransport`.
- pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TFramedTransport {
- TFramedTransport::with_capacity(WRITE_BUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+ /// write buffers that wraps the given `TIoChannel`.
+ pub fn new(channel: C) -> TFramedReadTransport<C> {
+ TFramedReadTransport::with_capacity(READ_CAPACITY, channel)
}
/// Create a `TFramedTransport` with an internal read buffer of size
- /// `read_buffer_capacity` and an internal write buffer of size
- /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
- pub fn with_capacity(read_buffer_capacity: usize,
- write_buffer_capacity: usize,
- inner: Rc<RefCell<Box<TTransport>>>)
- -> TFramedTransport {
- TFramedTransport {
- rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
- rpos: 0,
- rcap: 0,
- wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
- wpos: 0,
- inner: inner,
+ /// `read_capacity` and an internal write buffer of size
+ /// `write_capacity` that wraps the given `TIoChannel`.
+ pub fn with_capacity(read_capacity: usize, channel: C) -> TFramedReadTransport<C> {
+ TFramedReadTransport {
+ buf: vec![0; read_capacity].into_boxed_slice(),
+ pos: 0,
+ cap: 0,
+ chan: channel,
}
}
}
-impl Read for TFramedTransport {
+impl<C> Read for TFramedReadTransport<C>
+where
+ C: Read,
+{
fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
- if self.rcap - self.rpos == 0 {
- let message_size = self.inner.borrow_mut().read_i32::<BigEndian>()? as usize;
- if message_size > self.rbuf.len() {
- return Err(io::Error::new(ErrorKind::Other,
- format!("bytes to be read ({}) exceeds buffer \
+ if self.cap - self.pos == 0 {
+ let message_size = self.chan.read_i32::<BigEndian>()? as usize;
+ if message_size > self.buf.len() {
+ return Err(
+ io::Error::new(
+ ErrorKind::Other,
+ format!(
+ "bytes to be read ({}) exceeds buffer \
capacity ({})",
- message_size,
- self.rbuf.len())));
+ message_size,
+ self.buf.len()
+ ),
+ ),
+ );
}
- self.inner.borrow_mut().read_exact(&mut self.rbuf[..message_size])?;
- self.rpos = 0;
- self.rcap = message_size as usize;
+ self.chan.read_exact(&mut self.buf[..message_size])?;
+ self.pos = 0;
+ self.cap = message_size as usize;
}
- let nread = cmp::min(b.len(), self.rcap - self.rpos);
- b[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
- self.rpos += nread;
+ let nread = cmp::min(b.len(), self.cap - self.pos);
+ b[..nread].clone_from_slice(&self.buf[self.pos..self.pos + nread]);
+ self.pos += nread;
Ok(nread)
}
}
-impl Write for TFramedTransport {
+/// Factory for creating instances of `TFramedReadTransport`.
+#[derive(Default)]
+pub struct TFramedReadTransportFactory;
+
+impl TFramedReadTransportFactory {
+ pub fn new() -> TFramedReadTransportFactory {
+ TFramedReadTransportFactory {}
+ }
+}
+
+impl TReadTransportFactory for TFramedReadTransportFactory {
+ /// Create a `TFramedReadTransport`.
+ fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+ Box::new(TFramedReadTransport::new(channel))
+ }
+}
+
+/// Transport that writes framed messages.
+///
+/// A `TFramedWriteTransport` maintains a fixed-size internal write buffer. All
+/// writes are made to this buffer and are sent to the wrapped channel only
+/// when `TFramedWriteTransport::flush()` is called. On a flush a fixed-length
+/// header with a count of the buffered bytes is written, followed by the bytes
+/// themselves.
+///
+/// # Examples
+///
+/// Create and use a `TFramedWriteTransport`.
+///
+/// ```no_run
+/// use std::io::Write;
+/// use thrift::transport::{TFramedWriteTransport, TTcpChannel};
+///
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
+///
+/// let mut t = TFramedWriteTransport::new(c);
+///
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+#[derive(Debug)]
+pub struct TFramedWriteTransport<C>
+where
+ C: Write,
+{
+ buf: Box<[u8]>,
+ pos: usize,
+ channel: C,
+}
+
+impl<C> TFramedWriteTransport<C>
+where
+ C: Write,
+{
+ /// Create a `TFramedTransport` with default-sized internal read and
+ /// write buffers that wraps the given `TIoChannel`.
+ pub fn new(channel: C) -> TFramedWriteTransport<C> {
+ TFramedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
+ }
+
+ /// Create a `TFramedTransport` with an internal read buffer of size
+ /// `read_capacity` and an internal write buffer of size
+ /// `write_capacity` that wraps the given `TIoChannel`.
+ pub fn with_capacity(write_capacity: usize, channel: C) -> TFramedWriteTransport<C> {
+ TFramedWriteTransport {
+ buf: vec![0; write_capacity].into_boxed_slice(),
+ pos: 0,
+ channel: channel,
+ }
+ }
+}
+
+impl<C> Write for TFramedWriteTransport<C>
+where
+ C: Write,
+{
fn write(&mut self, b: &[u8]) -> io::Result<usize> {
- if b.len() > (self.wbuf.len() - self.wpos) {
- return Err(io::Error::new(ErrorKind::Other,
- format!("bytes to be written ({}) exceeds buffer \
+ if b.len() > (self.buf.len() - self.pos) {
+ return Err(
+ io::Error::new(
+ ErrorKind::Other,
+ format!(
+ "bytes to be written ({}) exceeds buffer \
capacity ({})",
- b.len(),
- self.wbuf.len() - self.wpos)));
+ b.len(),
+ self.buf.len() - self.pos
+ ),
+ ),
+ );
}
let nwrite = b.len(); // always less than available write buffer capacity
- self.wbuf[self.wpos..(self.wpos + nwrite)].clone_from_slice(b);
- self.wpos += nwrite;
+ self.buf[self.pos..(self.pos + nwrite)].clone_from_slice(b);
+ self.pos += nwrite;
Ok(nwrite)
}
fn flush(&mut self) -> io::Result<()> {
- let message_size = self.wpos;
+ let message_size = self.pos;
if let 0 = message_size {
return Ok(());
} else {
- self.inner.borrow_mut().write_i32::<BigEndian>(message_size as i32)?;
+ self.channel
+ .write_i32::<BigEndian>(message_size as i32)?;
}
let mut byte_index = 0;
- while byte_index < self.wpos {
- let nwrite = self.inner.borrow_mut().write(&self.wbuf[byte_index..self.wpos])?;
- byte_index = cmp::min(byte_index + nwrite, self.wpos);
+ while byte_index < self.pos {
+ let nwrite = self.channel.write(&self.buf[byte_index..self.pos])?;
+ byte_index = cmp::min(byte_index + nwrite, self.pos);
}
- self.wpos = 0;
- self.inner.borrow_mut().flush()
+ self.pos = 0;
+ self.channel.flush()
}
}
-/// Factory for creating instances of `TFramedTransport`.
+/// Factory for creating instances of `TFramedWriteTransport`.
#[derive(Default)]
-pub struct TFramedTransportFactory;
+pub struct TFramedWriteTransportFactory;
-impl TFramedTransportFactory {
- // Create a `TFramedTransportFactory`.
- pub fn new() -> TFramedTransportFactory {
- TFramedTransportFactory {}
+impl TFramedWriteTransportFactory {
+ pub fn new() -> TFramedWriteTransportFactory {
+ TFramedWriteTransportFactory {}
}
}
-impl TTransportFactory for TFramedTransportFactory {
- fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
- Box::new(TFramedTransport::new(inner)) as Box<TTransport>
+impl TWriteTransportFactory for TFramedWriteTransportFactory {
+ /// Create a `TFramedWriteTransport`.
+ fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+ Box::new(TFramedWriteTransport::new(channel))
}
}
@@ -183,5 +260,5 @@
// use std::io::{Read, Write};
//
// use super::*;
- // use ::transport::mem::TBufferTransport;
+ // use ::transport::mem::TBufferChannel;
}