| Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 1 | // Licensed to the Apache Software Foundation (ASF) under one | 
|  | 2 | // or more contributor license agreements. See the NOTICE file | 
|  | 3 | // distributed with this work for additional information | 
|  | 4 | // regarding copyright ownership. The ASF licenses this file | 
|  | 5 | // to you under the Apache License, Version 2.0 (the | 
|  | 6 | // "License"); you may not use this file except in compliance | 
|  | 7 | // with the License. You may obtain a copy of the License at | 
|  | 8 | // | 
|  | 9 | //   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | 10 | // | 
|  | 11 | // Unless required by applicable law or agreed to in writing, | 
|  | 12 | // software distributed under the License is distributed on an | 
|  | 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | 14 | // KIND, either express or implied. See the License for the | 
|  | 15 | // specific language governing permissions and limitations | 
|  | 16 | // under the License. | 
|  | 17 |  | 
|  | 18 | use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; | 
|  | 19 | use std::cell::RefCell; | 
|  | 20 | use std::cmp; | 
|  | 21 | use std::io; | 
|  | 22 | use std::io::{ErrorKind, Read, Write}; | 
|  | 23 | use std::rc::Rc; | 
|  | 24 |  | 
|  | 25 | use super::{TTransport, TTransportFactory}; | 
|  | 26 |  | 
|  | 27 | /// Default capacity of the read buffer in bytes. | 
|  | 28 | const WRITE_BUFFER_CAPACITY: usize = 4096; | 
|  | 29 |  | 
|  | 30 | /// Default capacity of the write buffer in bytes.. | 
|  | 31 | const DEFAULT_WBUFFER_CAPACITY: usize = 4096; | 
|  | 32 |  | 
|  | 33 | /// Transport that communicates with endpoints using framed messages. | 
|  | 34 | /// | 
|  | 35 | /// A `TFramedTransport` maintains a fixed-size internal write buffer. All | 
|  | 36 | /// writes are made to this buffer and are sent to the wrapped transport only | 
|  | 37 | /// when `TTransport::flush()` is called. On a flush a fixed-length header with a | 
|  | 38 | /// count of the buffered bytes is written, followed by the bytes themselves. | 
|  | 39 | /// | 
|  | 40 | /// A `TFramedTransport` also maintains a fixed-size internal read buffer. | 
|  | 41 | /// On a call to `TTransport::read(...)` one full message - both fixed-length | 
|  | 42 | /// header and bytes - is read from the wrapped transport and buffered. | 
|  | 43 | /// Subsequent read calls are serviced from the internal buffer until it is | 
|  | 44 | /// exhausted, at which point the next full message is read from the wrapped | 
|  | 45 | /// transport. | 
|  | 46 | /// | 
|  | 47 | /// # Examples | 
|  | 48 | /// | 
|  | 49 | /// Create and use a `TFramedTransport`. | 
|  | 50 | /// | 
|  | 51 | /// ```no_run | 
|  | 52 | /// use std::cell::RefCell; | 
|  | 53 | /// use std::rc::Rc; | 
|  | 54 | /// use std::io::{Read, Write}; | 
|  | 55 | /// use thrift::transport::{TFramedTransport, TTcpTransport, TTransport}; | 
|  | 56 | /// | 
|  | 57 | /// let mut t = TTcpTransport::new(); | 
|  | 58 | /// t.open("localhost:9090").unwrap(); | 
|  | 59 | /// | 
|  | 60 | /// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>)); | 
|  | 61 | /// let mut t = TFramedTransport::new(t); | 
|  | 62 | /// | 
|  | 63 | /// // read | 
|  | 64 | /// t.read(&mut vec![0u8; 1]).unwrap(); | 
|  | 65 | /// | 
|  | 66 | /// // write | 
|  | 67 | /// t.write(&[0x00]).unwrap(); | 
|  | 68 | /// t.flush().unwrap(); | 
|  | 69 | /// ``` | 
|  | 70 | pub struct TFramedTransport { | 
|  | 71 | rbuf: Box<[u8]>, | 
|  | 72 | rpos: usize, | 
|  | 73 | rcap: usize, | 
|  | 74 | wbuf: Box<[u8]>, | 
|  | 75 | wpos: usize, | 
|  | 76 | inner: Rc<RefCell<Box<TTransport>>>, | 
|  | 77 | } | 
|  | 78 |  | 
|  | 79 | impl TFramedTransport { | 
|  | 80 | /// Create a `TFramedTransport` with default-sized internal read and | 
|  | 81 | /// write buffers that wraps an `inner` `TTransport`. | 
|  | 82 | pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TFramedTransport { | 
|  | 83 | TFramedTransport::with_capacity(WRITE_BUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner) | 
|  | 84 | } | 
|  | 85 |  | 
|  | 86 | /// Create a `TFramedTransport` with an internal read buffer of size | 
|  | 87 | /// `read_buffer_capacity` and an internal write buffer of size | 
|  | 88 | /// `write_buffer_capacity` that wraps an `inner` `TTransport`. | 
|  | 89 | pub fn with_capacity(read_buffer_capacity: usize, | 
|  | 90 | write_buffer_capacity: usize, | 
|  | 91 | inner: Rc<RefCell<Box<TTransport>>>) | 
|  | 92 | -> TFramedTransport { | 
|  | 93 | TFramedTransport { | 
|  | 94 | rbuf: vec![0; read_buffer_capacity].into_boxed_slice(), | 
|  | 95 | rpos: 0, | 
|  | 96 | rcap: 0, | 
|  | 97 | wbuf: vec![0; write_buffer_capacity].into_boxed_slice(), | 
|  | 98 | wpos: 0, | 
|  | 99 | inner: inner, | 
|  | 100 | } | 
|  | 101 | } | 
|  | 102 | } | 
|  | 103 |  | 
|  | 104 | impl Read for TFramedTransport { | 
|  | 105 | fn read(&mut self, b: &mut [u8]) -> io::Result<usize> { | 
|  | 106 | if self.rcap - self.rpos == 0 { | 
|  | 107 | let message_size = self.inner.borrow_mut().read_i32::<BigEndian>()? as usize; | 
|  | 108 | if message_size > self.rbuf.len() { | 
|  | 109 | return Err(io::Error::new(ErrorKind::Other, | 
|  | 110 | format!("bytes to be read ({}) exceeds buffer \ | 
|  | 111 | capacity ({})", | 
|  | 112 | message_size, | 
|  | 113 | self.rbuf.len()))); | 
|  | 114 | } | 
|  | 115 | self.inner.borrow_mut().read_exact(&mut self.rbuf[..message_size])?; | 
|  | 116 | self.rpos = 0; | 
|  | 117 | self.rcap = message_size as usize; | 
|  | 118 | } | 
|  | 119 |  | 
|  | 120 | let nread = cmp::min(b.len(), self.rcap - self.rpos); | 
|  | 121 | b[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]); | 
|  | 122 | self.rpos += nread; | 
|  | 123 |  | 
|  | 124 | Ok(nread) | 
|  | 125 | } | 
|  | 126 | } | 
|  | 127 |  | 
|  | 128 | impl Write for TFramedTransport { | 
|  | 129 | fn write(&mut self, b: &[u8]) -> io::Result<usize> { | 
|  | 130 | if b.len() > (self.wbuf.len() - self.wpos) { | 
|  | 131 | return Err(io::Error::new(ErrorKind::Other, | 
|  | 132 | format!("bytes to be written ({}) exceeds buffer \ | 
|  | 133 | capacity ({})", | 
|  | 134 | b.len(), | 
|  | 135 | self.wbuf.len() - self.wpos))); | 
|  | 136 | } | 
|  | 137 |  | 
|  | 138 | let nwrite = b.len(); // always less than available write buffer capacity | 
|  | 139 | self.wbuf[self.wpos..(self.wpos + nwrite)].clone_from_slice(b); | 
|  | 140 | self.wpos += nwrite; | 
|  | 141 | Ok(nwrite) | 
|  | 142 | } | 
|  | 143 |  | 
|  | 144 | fn flush(&mut self) -> io::Result<()> { | 
|  | 145 | let message_size = self.wpos; | 
|  | 146 |  | 
|  | 147 | if let 0 = message_size { | 
|  | 148 | return Ok(()); | 
|  | 149 | } else { | 
|  | 150 | self.inner.borrow_mut().write_i32::<BigEndian>(message_size as i32)?; | 
|  | 151 | } | 
|  | 152 |  | 
|  | 153 | let mut byte_index = 0; | 
|  | 154 | while byte_index < self.wpos { | 
|  | 155 | let nwrite = self.inner.borrow_mut().write(&self.wbuf[byte_index..self.wpos])?; | 
|  | 156 | byte_index = cmp::min(byte_index + nwrite, self.wpos); | 
|  | 157 | } | 
|  | 158 |  | 
|  | 159 | self.wpos = 0; | 
|  | 160 | self.inner.borrow_mut().flush() | 
|  | 161 | } | 
|  | 162 | } | 
|  | 163 |  | 
|  | 164 | /// Factory for creating instances of `TFramedTransport`. | 
|  | 165 | #[derive(Default)] | 
|  | 166 | pub struct TFramedTransportFactory; | 
|  | 167 |  | 
|  | 168 | impl TFramedTransportFactory { | 
|  | 169 | // Create a `TFramedTransportFactory`. | 
|  | 170 | pub fn new() -> TFramedTransportFactory { | 
|  | 171 | TFramedTransportFactory {} | 
|  | 172 | } | 
|  | 173 | } | 
|  | 174 |  | 
|  | 175 | impl TTransportFactory for TFramedTransportFactory { | 
|  | 176 | fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> { | 
|  | 177 | Box::new(TFramedTransport::new(inner)) as Box<TTransport> | 
|  | 178 | } | 
|  | 179 | } | 
|  | 180 |  | 
|  | 181 | #[cfg(test)] | 
|  | 182 | mod tests { | 
|  | 183 | //    use std::io::{Read, Write}; | 
|  | 184 | // | 
|  | 185 | //    use super::*; | 
|  | 186 | //    use ::transport::mem::TBufferTransport; | 
|  | 187 | } |