THRIFT-4176: Implement threaded server for Rust
Client: rs
* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer
This closes #1255
diff --git a/lib/rs/src/transport/buffered.rs b/lib/rs/src/transport/buffered.rs
index 3f240d8..b588ec1 100644
--- a/lib/rs/src/transport/buffered.rs
+++ b/lib/rs/src/transport/buffered.rs
@@ -15,104 +15,94 @@
// specific language governing permissions and limitations
// under the License.
-use std::cell::RefCell;
use std::cmp;
use std::io;
use std::io::{Read, Write};
-use std::rc::Rc;
-use super::{TTransport, TTransportFactory};
+use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
/// Default capacity of the read buffer in bytes.
-const DEFAULT_RBUFFER_CAPACITY: usize = 4096;
+const READ_CAPACITY: usize = 4096;
/// Default capacity of the write buffer in bytes..
-const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+const WRITE_CAPACITY: usize = 4096;
-/// Transport that communicates with endpoints using a byte stream.
+/// Transport that reads messages via an internal buffer.
///
-/// A `TBufferedTransport` maintains a fixed-size internal write buffer. All
-/// writes are made to this buffer and are sent to the wrapped transport only
-/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
-/// count of the buffered bytes is written, followed by the bytes themselves.
-///
-/// A `TBufferedTransport` also maintains a fixed-size internal read buffer.
-/// On a call to `TTransport::read(...)` one full message - both fixed-length
-/// header and bytes - is read from the wrapped transport and buffered.
+/// A `TBufferedReadTransport` maintains a fixed-size internal read buffer.
+/// On a call to `TBufferedReadTransport::read(...)` one full message - both
+/// fixed-length header and bytes - is read from the wrapped channel and buffered.
/// Subsequent read calls are serviced from the internal buffer until it is
/// exhausted, at which point the next full message is read from the wrapped
-/// transport.
+/// channel.
///
/// # Examples
///
-/// Create and use a `TBufferedTransport`.
+/// Create and use a `TBufferedReadTransport`.
///
/// ```no_run
-/// use std::cell::RefCell;
-/// use std::rc::Rc;
-/// use std::io::{Read, Write};
-/// use thrift::transport::{TBufferedTransport, TTcpTransport, TTransport};
+/// use std::io::Read;
+/// use thrift::transport::{TBufferedReadTransport, TTcpChannel};
///
-/// let mut t = TTcpTransport::new();
-/// t.open("localhost:9090").unwrap();
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
///
-/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
-/// let mut t = TBufferedTransport::new(t);
+/// let mut t = TBufferedReadTransport::new(c);
///
-/// // read
/// t.read(&mut vec![0u8; 1]).unwrap();
-///
-/// // write
-/// t.write(&[0x00]).unwrap();
-/// t.flush().unwrap();
/// ```
-pub struct TBufferedTransport {
- rbuf: Box<[u8]>,
- rpos: usize,
- rcap: usize,
- wbuf: Vec<u8>,
- inner: Rc<RefCell<Box<TTransport>>>,
+#[derive(Debug)]
+pub struct TBufferedReadTransport<C>
+where
+ C: Read,
+{
+ buf: Box<[u8]>,
+ pos: usize,
+ cap: usize,
+ chan: C,
}
-impl TBufferedTransport {
+impl<C> TBufferedReadTransport<C>
+where
+ C: Read,
+{
/// Create a `TBufferedTransport` with default-sized internal read and
- /// write buffers that wraps an `inner` `TTransport`.
- pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TBufferedTransport {
- TBufferedTransport::with_capacity(DEFAULT_RBUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+ /// write buffers that wraps the given `TIoChannel`.
+ pub fn new(channel: C) -> TBufferedReadTransport<C> {
+ TBufferedReadTransport::with_capacity(READ_CAPACITY, channel)
}
/// Create a `TBufferedTransport` with an internal read buffer of size
- /// `read_buffer_capacity` and an internal write buffer of size
- /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
- pub fn with_capacity(read_buffer_capacity: usize,
- write_buffer_capacity: usize,
- inner: Rc<RefCell<Box<TTransport>>>)
- -> TBufferedTransport {
- TBufferedTransport {
- rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
- rpos: 0,
- rcap: 0,
- wbuf: Vec::with_capacity(write_buffer_capacity),
- inner: inner,
+ /// `read_capacity` that wraps the given `TIoChannel`. This transport only
+ /// buffers reads, so there is no write buffer to size.
+ pub fn with_capacity(read_capacity: usize, channel: C) -> TBufferedReadTransport<C> {
+ TBufferedReadTransport {
+ buf: vec![0; read_capacity].into_boxed_slice(),
+ pos: 0,
+ cap: 0,
+ chan: channel,
}
}
fn get_bytes(&mut self) -> io::Result<&[u8]> {
- if self.rcap - self.rpos == 0 {
- self.rpos = 0;
- self.rcap = self.inner.borrow_mut().read(&mut self.rbuf)?;
+ if self.cap - self.pos == 0 {
+ self.pos = 0;
+ self.cap = self.chan.read(&mut self.buf)?;
}
- Ok(&self.rbuf[self.rpos..self.rcap])
+ Ok(&self.buf[self.pos..self.cap])
}
fn consume(&mut self, consumed: usize) {
// TODO: was a bug here += <-- test somehow
- self.rpos = cmp::min(self.rcap, self.rpos + consumed);
+ self.pos = cmp::min(self.cap, self.pos + consumed);
}
}
-impl Read for TBufferedTransport {
+impl<C> Read for TBufferedReadTransport<C>
+where
+ C: Read,
+{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut bytes_read = 0;
@@ -137,65 +127,127 @@
}
}
-impl Write for TBufferedTransport {
+/// Factory for creating instances of `TBufferedReadTransport`.
+#[derive(Default)]
+pub struct TBufferedReadTransportFactory;
+
+impl TBufferedReadTransportFactory {
+ pub fn new() -> TBufferedReadTransportFactory {
+ TBufferedReadTransportFactory {}
+ }
+}
+
+impl TReadTransportFactory for TBufferedReadTransportFactory {
+ /// Create a `TBufferedReadTransport`.
+ fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> {
+ Box::new(TBufferedReadTransport::new(channel))
+ }
+}
+
+/// Transport that writes messages via an internal buffer.
+///
+/// A `TBufferedWriteTransport` maintains a fixed-size internal write buffer.
+/// All writes are made to this buffer and are sent to the wrapped channel only
+/// when `TBufferedWriteTransport::flush()` is called. On a flush the buffered
+/// bytes are written to the wrapped channel, the channel itself is flushed, and
+/// the internal buffer is cleared.
+///
+/// # Examples
+///
+/// Create and use a `TBufferedWriteTransport`.
+///
+/// ```no_run
+/// use std::io::Write;
+/// use thrift::transport::{TBufferedWriteTransport, TTcpChannel};
+///
+/// let mut c = TTcpChannel::new();
+/// c.open("localhost:9090").unwrap();
+///
+/// let mut t = TBufferedWriteTransport::new(c);
+///
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+#[derive(Debug)]
+pub struct TBufferedWriteTransport<C>
+where
+ C: Write,
+{
+ buf: Vec<u8>,
+ channel: C,
+}
+
+impl<C> TBufferedWriteTransport<C>
+where
+ C: Write,
+{
+    /// Create a `TBufferedWriteTransport` with a default-sized internal write
+    /// buffer that wraps the given `TIoChannel`.
+ pub fn new(channel: C) -> TBufferedWriteTransport<C> {
+ TBufferedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
+ }
+
+    /// Create a `TBufferedWriteTransport` with an internal write buffer of size
+    /// `write_capacity` that wraps the given `TIoChannel`. There is no read
+    /// buffer: this transport only buffers writes.
+ pub fn with_capacity(write_capacity: usize, channel: C) -> TBufferedWriteTransport<C> {
+ TBufferedWriteTransport {
+ buf: Vec::with_capacity(write_capacity),
+ channel: channel,
+ }
+ }
+}
+
+impl<C> Write for TBufferedWriteTransport<C>
+where
+ C: Write,
+{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- let avail_bytes = cmp::min(buf.len(), self.wbuf.capacity() - self.wbuf.len());
- self.wbuf.extend_from_slice(&buf[..avail_bytes]);
- assert!(self.wbuf.len() <= self.wbuf.capacity(),
- "copy overflowed buffer");
+ let avail_bytes = cmp::min(buf.len(), self.buf.capacity() - self.buf.len());
+ self.buf.extend_from_slice(&buf[..avail_bytes]);
+ assert!(
+ self.buf.len() <= self.buf.capacity(),
+ "copy overflowed buffer"
+ );
Ok(avail_bytes)
}
fn flush(&mut self) -> io::Result<()> {
- self.inner.borrow_mut().write_all(&self.wbuf)?;
- self.inner.borrow_mut().flush()?;
- self.wbuf.clear();
+ self.channel.write_all(&self.buf)?;
+ self.channel.flush()?;
+ self.buf.clear();
Ok(())
}
}
-/// Factory for creating instances of `TBufferedTransport`
+/// Factory for creating instances of `TBufferedWriteTransport`.
#[derive(Default)]
-pub struct TBufferedTransportFactory;
+pub struct TBufferedWriteTransportFactory;
-impl TBufferedTransportFactory {
- /// Create a `TBufferedTransportFactory`.
- pub fn new() -> TBufferedTransportFactory {
- TBufferedTransportFactory {}
+impl TBufferedWriteTransportFactory {
+ pub fn new() -> TBufferedWriteTransportFactory {
+ TBufferedWriteTransportFactory {}
}
}
-impl TTransportFactory for TBufferedTransportFactory {
- fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
- Box::new(TBufferedTransport::new(inner)) as Box<TTransport>
+impl TWriteTransportFactory for TBufferedWriteTransportFactory {
+ /// Create a `TBufferedWriteTransport`.
+ fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> {
+ Box::new(TBufferedWriteTransport::new(channel))
}
}
#[cfg(test)]
mod tests {
- use std::cell::RefCell;
use std::io::{Read, Write};
- use std::rc::Rc;
use super::*;
- use ::transport::{TPassThruTransport, TTransport};
- use ::transport::mem::TBufferTransport;
-
- macro_rules! new_transports {
- ($wbc:expr, $rbc:expr) => (
- {
- let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity($wbc, $rbc))));
- let thru: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
- let thru = Rc::new(RefCell::new(thru));
- (mem, thru)
- }
- );
- }
+ use transport::TBufferChannel;
#[test]
fn must_return_zero_if_read_buffer_is_empty() {
- let (_, thru) = new_transports!(10, 0);
- let mut t = TBufferedTransport::with_capacity(10, 0, thru);
+ let mem = TBufferChannel::with_capacity(10, 0);
+ let mut t = TBufferedReadTransport::with_capacity(10, mem);
let mut b = vec![0; 10];
let read_result = t.read(&mut b);
@@ -205,8 +257,8 @@
#[test]
fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() {
- let (_, thru) = new_transports!(10, 0);
- let mut t = TBufferedTransport::with_capacity(10, 0, thru);
+ let mem = TBufferChannel::with_capacity(10, 0);
+ let mut t = TBufferedReadTransport::with_capacity(10, mem);
let read_result = t.read(&mut []);
@@ -215,10 +267,10 @@
#[test]
fn must_return_zero_if_nothing_more_can_be_read() {
- let (mem, thru) = new_transports!(4, 0);
- let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+ let mem = TBufferChannel::with_capacity(4, 0);
+ let mut t = TBufferedReadTransport::with_capacity(4, mem);
- mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+ t.chan.set_readable_bytes(&[0, 1, 2, 3]);
// read buffer is exactly the same size as bytes available
let mut buf = vec![0u8; 4];
@@ -239,10 +291,10 @@
#[test]
fn must_fill_user_buffer_with_only_as_many_bytes_as_available() {
- let (mem, thru) = new_transports!(4, 0);
- let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+ let mem = TBufferChannel::with_capacity(4, 0);
+ let mut t = TBufferedReadTransport::with_capacity(4, mem);
- mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+ t.chan.set_readable_bytes(&[0, 1, 2, 3]);
// read buffer is much larger than the bytes available
let mut buf = vec![0u8; 8];
@@ -268,15 +320,16 @@
// we have a much smaller buffer than the
// underlying transport has bytes available
- let (mem, thru) = new_transports!(10, 0);
- let mut t = TBufferedTransport::with_capacity(2, 0, thru);
+ let mem = TBufferChannel::with_capacity(10, 0);
+ let mut t = TBufferedReadTransport::with_capacity(2, mem);
// fill the underlying transport's byte buffer
let mut readable_bytes = [0u8; 10];
for i in 0..10 {
readable_bytes[i] = i as u8;
}
- mem.borrow_mut().set_readable_bytes(&readable_bytes);
+
+ t.chan.set_readable_bytes(&readable_bytes);
// we ask to read into a buffer that's much larger
// than the one the buffered transport has; as a result
@@ -312,8 +365,8 @@
#[test]
fn must_return_zero_if_nothing_can_be_written() {
- let (_, thru) = new_transports!(0, 0);
- let mut t = TBufferedTransport::with_capacity(0, 0, thru);
+ let mem = TBufferChannel::with_capacity(0, 0);
+ let mut t = TBufferedWriteTransport::with_capacity(0, mem);
let b = vec![0; 10];
let r = t.write(&b);
@@ -323,19 +376,20 @@
#[test]
fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
- let (mem, thru) = new_transports!(0, 10);
- let mut t = TBufferedTransport::with_capacity(0, 10, thru);
+ let mem = TBufferChannel::with_capacity(0, 10);
+ let mut t = TBufferedWriteTransport::with_capacity(10, mem);
let r = t.write(&[]);
+ let expected: [u8; 0] = [];
assert_eq!(r.unwrap(), 0);
- assert_eq!(mem.borrow_mut().write_buffer_as_ref(), &[]);
+ assert_eq_transport_written_bytes!(t, expected);
}
#[test]
fn must_return_zero_if_write_buffer_full() {
- let (_, thru) = new_transports!(0, 0);
- let mut t = TBufferedTransport::with_capacity(0, 4, thru);
+ let mem = TBufferChannel::with_capacity(0, 0);
+ let mut t = TBufferedWriteTransport::with_capacity(4, mem);
let b = [0x00, 0x01, 0x02, 0x03];
@@ -350,26 +404,22 @@
#[test]
fn must_only_write_to_inner_transport_on_flush() {
- let (mem, thru) = new_transports!(10, 10);
- let mut t = TBufferedTransport::new(thru);
+ let mem = TBufferChannel::with_capacity(10, 10);
+ let mut t = TBufferedWriteTransport::new(mem);
let b: [u8; 5] = [0, 1, 2, 3, 4];
assert_eq!(t.write(&b).unwrap(), 5);
- assert_eq!(mem.borrow_mut().write_buffer_as_ref().len(), 0);
+ assert_eq_transport_num_written_bytes!(t, 0);
assert!(t.flush().is_ok());
- {
- let inner = mem.borrow_mut();
- let underlying_buffer = inner.write_buffer_as_ref();
- assert_eq!(b, underlying_buffer);
- }
+ assert_eq_transport_written_bytes!(t, b);
}
#[test]
fn must_write_successfully_after_flush() {
- let (mem, thru) = new_transports!(0, 5);
- let mut t = TBufferedTransport::with_capacity(0, 5, thru);
+ let mem = TBufferChannel::with_capacity(0, 5);
+ let mut t = TBufferedWriteTransport::with_capacity(5, mem);
// write and flush
let b: [u8; 5] = [0, 1, 2, 3, 4];
@@ -377,24 +427,16 @@
assert!(t.flush().is_ok());
// check the flushed bytes
- {
- let inner = mem.borrow_mut();
- let underlying_buffer = inner.write_buffer_as_ref();
- assert_eq!(b, underlying_buffer);
- }
+ assert_eq_transport_written_bytes!(t, b);
// reset our underlying transport
- mem.borrow_mut().empty_write_buffer();
+ t.channel.empty_write_buffer();
// write and flush again
assert_eq!(t.write(&b).unwrap(), 5);
assert!(t.flush().is_ok());
// check the flushed bytes
- {
- let inner = mem.borrow_mut();
- let underlying_buffer = inner.write_buffer_as_ref();
- assert_eq!(b, underlying_buffer);
- }
+ assert_eq_transport_written_bytes!(t, b);
}
}