THRIFT-4176: Implement threaded server for Rust
Client: rs

* Create a TIoChannel construct
* Separate TTransport into TReadTransport and TWriteTransport
* Restructure types to avoid shared ownership
* Remove user-visible boxing and ref-counting
* Replace TSimpleServer with a thread-pool based TServer

This closes #1255
diff --git a/lib/rs/src/transport/mem.rs b/lib/rs/src/transport/mem.rs
index 97ec503..86ac6bb 100644
--- a/lib/rs/src/transport/mem.rs
+++ b/lib/rs/src/transport/mem.rs
@@ -17,9 +17,11 @@
 
 use std::cmp;
 use std::io;
+use std::sync::{Arc, Mutex};
 
-/// Simple transport that contains both a fixed-length internal read buffer and
-/// a fixed-length internal write buffer.
+use super::{ReadHalf, TIoChannel, WriteHalf};
+
+/// In-memory read and write channel with fixed-size read and write buffers.
 ///
 /// On a `write` bytes are written to the internal write buffer. Writes are no
 /// longer accepted once this buffer is full. Callers must `empty_write_buffer()`
@@ -29,37 +31,61 @@
 /// `set_readable_bytes(...)`. Callers can then read until the buffer is
 /// depleted. No further reads are accepted until the internal read buffer is
 /// replenished again.
-pub struct TBufferTransport {
-    rbuf: Box<[u8]>,
-    rpos: usize,
-    ridx: usize,
-    rcap: usize,
-    wbuf: Box<[u8]>,
-    wpos: usize,
-    wcap: usize,
+#[derive(Debug)]
+pub struct TBufferChannel {
+    read: Arc<Mutex<ReadData>>,
+    write: Arc<Mutex<WriteData>>,
 }
 
-impl TBufferTransport {
-    /// Constructs a new, empty `TBufferTransport` with the given
+#[derive(Debug)]
+struct ReadData {
+    buf: Box<[u8]>,
+    pos: usize,
+    idx: usize,
+    cap: usize,
+}
+
+#[derive(Debug)]
+struct WriteData {
+    buf: Box<[u8]>,
+    pos: usize,
+    cap: usize,
+}
+
+impl TBufferChannel {
+    /// Constructs a new, empty `TBufferChannel` with the given
     /// read buffer capacity and write buffer capacity.
-    pub fn with_capacity(read_buffer_capacity: usize,
-                         write_buffer_capacity: usize)
-                         -> TBufferTransport {
-        TBufferTransport {
-            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
-            ridx: 0,
-            rpos: 0,
-            rcap: read_buffer_capacity,
-            wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
-            wpos: 0,
-            wcap: write_buffer_capacity,
+    pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
+        TBufferChannel {
+            read: Arc::new(
+                Mutex::new(
+                    ReadData {
+                        buf: vec![0; read_capacity].into_boxed_slice(),
+                        idx: 0,
+                        pos: 0,
+                        cap: read_capacity,
+                    },
+                ),
+            ),
+            write: Arc::new(
+                Mutex::new(
+                    WriteData {
+                        buf: vec![0; write_capacity].into_boxed_slice(),
+                        pos: 0,
+                        cap: write_capacity,
+                    },
+                ),
+            ),
         }
     }
 
-    /// Return a slice containing the bytes held by the internal read buffer.
-    /// Returns an empty slice if no readable bytes are present.
-    pub fn read_buffer(&self) -> &[u8] {
-        &self.rbuf[..self.ridx]
+    /// Return a copy of the bytes held by the internal read buffer.
+    /// Returns an empty vector if no readable bytes are present.
+    pub fn read_bytes(&self) -> Vec<u8> {
+        let rdata = self.read.as_ref().lock().unwrap();
+        let mut buf = vec![0u8; rdata.idx];
+        buf.copy_from_slice(&rdata.buf[..rdata.idx]);
+        buf
     }
 
     // FIXME: do I really need this API call?
@@ -68,8 +94,9 @@
     ///
     /// Subsequent calls to `read` will return nothing.
     pub fn empty_read_buffer(&mut self) {
-        self.rpos = 0;
-        self.ridx = 0;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        rdata.pos = 0;
+        rdata.idx = 0;
     }
 
     /// Copy bytes from the source buffer `buf` into the internal read buffer,
@@ -77,37 +104,36 @@
     /// which is `min(buf.len(), internal_read_buf.len())`.
     pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
         self.empty_read_buffer();
-        let max_bytes = cmp::min(self.rcap, buf.len());
-        self.rbuf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
-        self.ridx = max_bytes;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        let max_bytes = cmp::min(rdata.cap, buf.len());
+        rdata.buf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
+        rdata.idx = max_bytes;
         max_bytes
     }
 
-    /// Return a slice containing the bytes held by the internal write buffer.
-    /// Returns an empty slice if no bytes were written.
-    pub fn write_buffer_as_ref(&self) -> &[u8] {
-        &self.wbuf[..self.wpos]
-    }
-
-    /// Return a vector with a copy of the bytes held by the internal write buffer.
+    /// Return a copy of the bytes held by the internal write buffer.
     /// Returns an empty vector if no bytes were written.
-    pub fn write_buffer_to_vec(&self) -> Vec<u8> {
-        let mut buf = vec![0u8; self.wpos];
-        buf.copy_from_slice(&self.wbuf[..self.wpos]);
+    pub fn write_bytes(&self) -> Vec<u8> {
+        let wdata = self.write.as_ref().lock().unwrap();
+        let mut buf = vec![0u8; wdata.pos];
+        buf.copy_from_slice(&wdata.buf[..wdata.pos]);
         buf
     }
 
     /// Resets the internal write buffer, making it seem like no bytes were
-    /// written. Calling `write_buffer` after this returns an empty slice.
+    /// written. Calling `write_bytes` after this returns an empty vector.
     pub fn empty_write_buffer(&mut self) {
-        self.wpos = 0;
+        let mut wdata = self.write.as_ref().lock().unwrap();
+        wdata.pos = 0;
     }
 
     /// Overwrites the contents of the read buffer with the contents of the
     /// write buffer. The write buffer is emptied after this operation.
     pub fn copy_write_buffer_to_read_buffer(&mut self) {
+        // FIXME: redo this entire method
         let buf = {
-            let b = self.write_buffer_as_ref();
+            let wdata = self.write.as_ref().lock().unwrap();
+            let b = &wdata.buf[..wdata.pos];
             let mut b_ret = vec![0; b.len()];
             b_ret.copy_from_slice(b);
             b_ret
@@ -120,20 +146,45 @@
     }
 }
 
-impl io::Read for TBufferTransport {
+impl TIoChannel for TBufferChannel {
+    fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
+    where
+        Self: Sized,
+    {
+        Ok(
+            (ReadHalf {
+                 handle: TBufferChannel {
+                     read: self.read.clone(),
+                     write: self.write.clone(),
+                 },
+             },
+             WriteHalf {
+                 handle: TBufferChannel {
+                     read: self.read.clone(),
+                     write: self.write.clone(),
+                 },
+             }),
+        )
+    }
+}
+
+impl io::Read for TBufferChannel {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let nread = cmp::min(buf.len(), self.ridx - self.rpos);
-        buf[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
-        self.rpos += nread;
+        let mut rdata = self.read.as_ref().lock().unwrap();
+        let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
+        buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
+        rdata.pos += nread;
         Ok(nread)
     }
 }
 
-impl io::Write for TBufferTransport {
+impl io::Write for TBufferChannel {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let nwrite = cmp::min(buf.len(), self.wcap - self.wpos);
-        self.wbuf[self.wpos..self.wpos + nwrite].clone_from_slice(&buf[..nwrite]);
-        self.wpos += nwrite;
+        let mut wdata = self.write.as_ref().lock().unwrap();
+        let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
+        let (start, end) = (wdata.pos, wdata.pos + nwrite);
+        wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
+        wdata.pos += nwrite;
         Ok(nwrite)
     }
 
@@ -146,68 +197,68 @@
 mod tests {
     use std::io::{Read, Write};
 
-    use super::TBufferTransport;
+    use super::TBufferChannel;
 
     #[test]
     fn must_empty_write_buffer() {
-        let mut t = TBufferTransport::with_capacity(0, 1);
+        let mut t = TBufferChannel::with_capacity(0, 1);
 
         let bytes_to_write: [u8; 1] = [0x01];
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 1);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
 
         t.empty_write_buffer();
-        assert_eq!(t.write_buffer_as_ref().len(), 0);
+        assert_eq!(t.write_bytes().len(), 0);
     }
 
     #[test]
     fn must_accept_writes_after_buffer_emptied() {
-        let mut t = TBufferTransport::with_capacity(0, 2);
+        let mut t = TBufferChannel::with_capacity(0, 2);
 
         let bytes_to_write: [u8; 2] = [0x01, 0x02];
 
         // first write (all bytes written)
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 2);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
 
         // try write again (nothing should be written)
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 0);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write); // still the same as before
+        assert_eq!(&t.write_bytes(), &bytes_to_write); // still the same as before
 
         // now reset the buffer
         t.empty_write_buffer();
-        assert_eq!(t.write_buffer_as_ref().len(), 0);
+        assert_eq!(t.write_bytes().len(), 0);
 
         // now try write again - the write should succeed
         let result = t.write(&bytes_to_write);
         assert_eq!(result.unwrap(), 2);
-        assert_eq!(&t.write_buffer_as_ref(), &bytes_to_write);
+        assert_eq!(&t.write_bytes(), &bytes_to_write);
     }
 
     #[test]
     fn must_accept_multiple_writes_until_buffer_is_full() {
-        let mut t = TBufferTransport::with_capacity(0, 10);
+        let mut t = TBufferChannel::with_capacity(0, 10);
 
         // first write (all bytes written)
         let bytes_to_write_0: [u8; 2] = [0x01, 0x41];
         let write_0_result = t.write(&bytes_to_write_0);
         assert_eq!(write_0_result.unwrap(), 2);
-        assert_eq!(t.write_buffer_as_ref(), &bytes_to_write_0);
+        assert_eq!(t.write_bytes(), &bytes_to_write_0);
 
         // second write (all bytes written, starting at index 2)
         let bytes_to_write_1: [u8; 7] = [0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
         let write_1_result = t.write(&bytes_to_write_1);
         assert_eq!(write_1_result.unwrap(), 7);
-        assert_eq!(&t.write_buffer_as_ref()[2..], &bytes_to_write_1);
+        assert_eq!(&t.write_bytes()[2..], &bytes_to_write_1);
 
         // third write (only 1 byte written - that's all we have space for)
         let bytes_to_write_2: [u8; 3] = [0xBF, 0xDA, 0x98];
         let write_2_result = t.write(&bytes_to_write_2);
         assert_eq!(write_2_result.unwrap(), 1);
-        assert_eq!(&t.write_buffer_as_ref()[9..], &bytes_to_write_2[0..1]); // how does this syntax work?!
+        assert_eq!(&t.write_bytes()[9..], &bytes_to_write_2[0..1]); // stored bytes from index 9 onward vs. the first byte of this write
 
         // fourth write (no writes are accepted)
         let bytes_to_write_3: [u8; 3] = [0xBF, 0xAA, 0xFD];
@@ -219,50 +270,50 @@
         expected.extend_from_slice(&bytes_to_write_0);
         expected.extend_from_slice(&bytes_to_write_1);
         expected.extend_from_slice(&bytes_to_write_2[0..1]);
-        assert_eq!(t.write_buffer_as_ref(), &expected[..]);
+        assert_eq!(t.write_bytes(), &expected[..]);
     }
 
     #[test]
     fn must_empty_read_buffer() {
-        let mut t = TBufferTransport::with_capacity(1, 0);
+        let mut t = TBufferChannel::with_capacity(1, 0);
 
         let bytes_to_read: [u8; 1] = [0x01];
         let result = t.set_readable_bytes(&bytes_to_read);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read);
+        assert_eq!(t.read_bytes(), &bytes_to_read);
 
         t.empty_read_buffer();
-        assert_eq!(t.read_buffer().len(), 0);
+        assert_eq!(t.read_bytes().len(), 0);
     }
 
     #[test]
     fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
-        let mut t = TBufferTransport::with_capacity(1, 0);
+        let mut t = TBufferChannel::with_capacity(1, 0);
 
         let bytes_to_read_0: [u8; 1] = [0x01];
         let result = t.set_readable_bytes(&bytes_to_read_0);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read_0);
+        assert_eq!(t.read_bytes(), &bytes_to_read_0);
 
         t.empty_read_buffer();
-        assert_eq!(t.read_buffer().len(), 0);
+        assert_eq!(t.read_bytes().len(), 0);
 
         let bytes_to_read_1: [u8; 1] = [0x02];
         let result = t.set_readable_bytes(&bytes_to_read_1);
         assert_eq!(result, 1);
-        assert_eq!(&t.read_buffer(), &bytes_to_read_1);
+        assert_eq!(t.read_bytes(), &bytes_to_read_1);
     }
 
     #[test]
     fn must_accept_multiple_reads_until_all_bytes_read() {
-        let mut t = TBufferTransport::with_capacity(10, 0);
+        let mut t = TBufferChannel::with_capacity(10, 0);
 
         let readable_bytes: [u8; 10] = [0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];
 
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes);
         assert_eq!(result, 10);
-        assert_eq!(&t.read_buffer(), &readable_bytes);
+        assert_eq!(t.read_bytes(), &readable_bytes);
 
         // first read
         let mut read_buf_0 = vec![0; 5];
@@ -300,21 +351,21 @@
 
     #[test]
     fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
-        let mut t = TBufferTransport::with_capacity(3, 0);
+        let mut t = TBufferChannel::with_capacity(3, 0);
 
         let readable_bytes_0: [u8; 3] = [0x02, 0xAB, 0x33];
 
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes_0);
         assert_eq!(result, 3);
-        assert_eq!(&t.read_buffer(), &readable_bytes_0);
+        assert_eq!(t.read_bytes(), &readable_bytes_0);
 
         let mut read_buf = vec![0; 4];
 
         // drain the read buffer
         let read_result = t.read(&mut read_buf);
         assert_eq!(read_result.unwrap(), 3);
-        assert_eq!(t.read_buffer(), &read_buf[0..3]);
+        assert_eq!(t.read_bytes(), &read_buf[0..3]);
 
         // check that a subsequent read fails
         let read_result = t.read(&mut read_buf);
@@ -332,11 +383,11 @@
         // check that we're able to set the bytes to be read
         let result = t.set_readable_bytes(&readable_bytes_1);
         assert_eq!(result, 2);
-        assert_eq!(&t.read_buffer(), &readable_bytes_1);
+        assert_eq!(t.read_bytes(), &readable_bytes_1);
 
         // read again
         let read_result = t.read(&mut read_buf);
         assert_eq!(read_result.unwrap(), 2);
-        assert_eq!(t.read_buffer(), &read_buf[0..2]);
+        assert_eq!(t.read_bytes(), &read_buf[0..2]);
     }
 }