Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | use std::cell::RefCell; |
| 19 | use std::cmp; |
| 20 | use std::io; |
| 21 | use std::io::{Read, Write}; |
| 22 | use std::rc::Rc; |
| 23 | |
| 24 | use super::{TTransport, TTransportFactory}; |
| 25 | |
/// Default capacity of the read buffer in bytes.
const DEFAULT_RBUFFER_CAPACITY: usize = 4096;

/// Default capacity of the write buffer in bytes.
const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
| 31 | |
/// Transport that buffers reads from, and writes to, a wrapped transport.
///
/// A `TBufferedTransport` maintains a fixed-size internal write buffer. All
/// writes are made to this buffer and are sent to the wrapped transport only
/// when `flush()` is called. On a flush the buffered bytes are forwarded
/// exactly as they were written (no header or framing is added), the wrapped
/// transport is flushed, and the write buffer is emptied. A `write(...)`
/// call accepts only as many bytes as fit in the remaining buffer space and
/// returns `Ok(0)` once the buffer is full; it never flushes on its own.
///
/// A `TBufferedTransport` also maintains a fixed-size internal read buffer.
/// Whenever that buffer holds no unread bytes, a call to `read(...)` refills
/// it with a single read on the wrapped transport. Read calls are serviced
/// from the internal buffer, refilling it as often as needed, until either
/// the caller's buffer is full or the wrapped transport yields no more
/// bytes.
///
/// # Examples
///
/// Create and use a `TBufferedTransport`.
///
/// ```no_run
/// use std::cell::RefCell;
/// use std::rc::Rc;
/// use std::io::{Read, Write};
/// use thrift::transport::{TBufferedTransport, TTcpTransport, TTransport};
///
/// let mut t = TTcpTransport::new();
/// t.open("localhost:9090").unwrap();
///
/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
/// let mut t = TBufferedTransport::new(t);
///
/// // read
/// t.read(&mut vec![0u8; 1]).unwrap();
///
/// // write
/// t.write(&[0x00]).unwrap();
/// t.flush().unwrap();
/// ```
pub struct TBufferedTransport {
    // Storage for bytes read from `inner` but not yet handed to the caller.
    rbuf: Box<[u8]>,
    // Index into `rbuf` of the next unread byte.
    rpos: usize,
    // Number of valid bytes in `rbuf`, i.e. the end of the unread region.
    rcap: usize,
    // Bytes accepted by `write` that are held back until `flush`.
    wbuf: Vec<u8>,
    // The wrapped transport that is actually read from and written to.
    inner: Rc<RefCell<Box<TTransport>>>,
}
| 76 | |
| 77 | impl TBufferedTransport { |
| 78 | /// Create a `TBufferedTransport` with default-sized internal read and |
| 79 | /// write buffers that wraps an `inner` `TTransport`. |
| 80 | pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TBufferedTransport { |
| 81 | TBufferedTransport::with_capacity(DEFAULT_RBUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner) |
| 82 | } |
| 83 | |
| 84 | /// Create a `TBufferedTransport` with an internal read buffer of size |
| 85 | /// `read_buffer_capacity` and an internal write buffer of size |
| 86 | /// `write_buffer_capacity` that wraps an `inner` `TTransport`. |
| 87 | pub fn with_capacity(read_buffer_capacity: usize, |
| 88 | write_buffer_capacity: usize, |
| 89 | inner: Rc<RefCell<Box<TTransport>>>) |
| 90 | -> TBufferedTransport { |
| 91 | TBufferedTransport { |
| 92 | rbuf: vec![0; read_buffer_capacity].into_boxed_slice(), |
| 93 | rpos: 0, |
| 94 | rcap: 0, |
| 95 | wbuf: Vec::with_capacity(write_buffer_capacity), |
| 96 | inner: inner, |
| 97 | } |
| 98 | } |
| 99 | |
| 100 | fn get_bytes(&mut self) -> io::Result<&[u8]> { |
| 101 | if self.rcap - self.rpos == 0 { |
| 102 | self.rpos = 0; |
| 103 | self.rcap = self.inner.borrow_mut().read(&mut self.rbuf)?; |
| 104 | } |
| 105 | |
| 106 | Ok(&self.rbuf[self.rpos..self.rcap]) |
| 107 | } |
| 108 | |
| 109 | fn consume(&mut self, consumed: usize) { |
| 110 | // TODO: was a bug here += <-- test somehow |
| 111 | self.rpos = cmp::min(self.rcap, self.rpos + consumed); |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | impl Read for TBufferedTransport { |
| 116 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| 117 | let mut bytes_read = 0; |
| 118 | |
| 119 | loop { |
| 120 | let nread = { |
| 121 | let avail_bytes = self.get_bytes()?; |
| 122 | let avail_space = buf.len() - bytes_read; |
| 123 | let nread = cmp::min(avail_space, avail_bytes.len()); |
| 124 | buf[bytes_read..(bytes_read + nread)].copy_from_slice(&avail_bytes[..nread]); |
| 125 | nread |
| 126 | }; |
| 127 | |
| 128 | self.consume(nread); |
| 129 | bytes_read += nread; |
| 130 | |
| 131 | if bytes_read == buf.len() || nread == 0 { |
| 132 | break; |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | Ok(bytes_read) |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | impl Write for TBufferedTransport { |
| 141 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| 142 | let avail_bytes = cmp::min(buf.len(), self.wbuf.capacity() - self.wbuf.len()); |
| 143 | self.wbuf.extend_from_slice(&buf[..avail_bytes]); |
| 144 | assert!(self.wbuf.len() <= self.wbuf.capacity(), |
| 145 | "copy overflowed buffer"); |
| 146 | Ok(avail_bytes) |
| 147 | } |
| 148 | |
| 149 | fn flush(&mut self) -> io::Result<()> { |
| 150 | self.inner.borrow_mut().write_all(&self.wbuf)?; |
| 151 | self.inner.borrow_mut().flush()?; |
| 152 | self.wbuf.clear(); |
| 153 | Ok(()) |
| 154 | } |
| 155 | } |
| 156 | |
/// Factory that produces `TBufferedTransport` instances.
///
/// The factory carries no state of its own.
#[derive(Default)]
pub struct TBufferedTransportFactory;

impl TBufferedTransportFactory {
    /// Build a new `TBufferedTransportFactory`.
    pub fn new() -> TBufferedTransportFactory {
        TBufferedTransportFactory
    }
}
| 167 | |
| 168 | impl TTransportFactory for TBufferedTransportFactory { |
| 169 | fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> { |
| 170 | Box::new(TBufferedTransport::new(inner)) as Box<TTransport> |
| 171 | } |
| 172 | } |
| 173 | |
#[cfg(test)]
mod tests {
    use std::cell::RefCell;
    use std::io::{Read, Write};
    use std::rc::Rc;

    use super::*;
    use ::transport::{TPassThruTransport, TTransport};
    use ::transport::mem::TBufferTransport;

    // Builds an in-memory transport plus a pass-through handle to it that can
    // be given to the `TBufferedTransport` under test. As the tests below use
    // it, the first argument sizes the buffer that reads are served from and
    // the second the buffer that flushed writes land in.
    macro_rules! new_transports {
        ($rbc:expr, $wbc:expr) => (
            {
                let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity($rbc, $wbc))));
                let thru: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
                let thru = Rc::new(RefCell::new(thru));
                (mem, thru)
            }
        );
    }

    #[test]
    fn must_return_zero_if_read_buffer_is_empty() {
        let (_, thru) = new_transports!(10, 0);
        let mut t = TBufferedTransport::with_capacity(10, 0, thru);

        let mut b = vec![0; 10];
        let read_result = t.read(&mut b);

        assert_eq!(read_result.unwrap(), 0);
    }

    #[test]
    fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() {
        let (_, thru) = new_transports!(10, 0);
        let mut t = TBufferedTransport::with_capacity(10, 0, thru);

        let read_result = t.read(&mut []);

        assert_eq!(read_result.unwrap(), 0);
    }

    #[test]
    fn must_return_zero_if_nothing_more_can_be_read() {
        let (mem, thru) = new_transports!(4, 0);
        let mut t = TBufferedTransport::with_capacity(4, 0, thru);

        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);

        // read buffer is exactly the same size as bytes available
        let mut buf = vec![0u8; 4];
        let read_result = t.read(&mut buf);

        // we've read exactly 4 bytes
        assert_eq!(read_result.unwrap(), 4);
        assert_eq!(&buf, &[0, 1, 2, 3]);

        // try read again, this time into a fresh buffer so that the
        // "buffer untouched" assertion below actually observes the read
        let mut buf_again = vec![0u8; 4];
        let read_result = t.read(&mut buf_again);

        // this time, 0 bytes and we haven't changed the buffer
        assert_eq!(read_result.unwrap(), 0);
        assert_eq!(&buf_again, &[0, 0, 0, 0])
    }

    #[test]
    fn must_fill_user_buffer_with_only_as_many_bytes_as_available() {
        let (mem, thru) = new_transports!(4, 0);
        let mut t = TBufferedTransport::with_capacity(4, 0, thru);

        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);

        // read buffer is much larger than the bytes available
        let mut buf = vec![0u8; 8];
        let read_result = t.read(&mut buf);

        // we've read exactly 4 bytes
        assert_eq!(read_result.unwrap(), 4);
        assert_eq!(&buf[..4], &[0, 1, 2, 3]);

        // try read again
        let read_result = t.read(&mut buf[4..]);

        // this time, 0 bytes and we haven't changed the buffer
        assert_eq!(read_result.unwrap(), 0);
        assert_eq!(&buf, &[0, 1, 2, 3, 0, 0, 0, 0])
    }

    #[test]
    fn must_read_successfully() {
        // this test involves a few loops within the buffered transport
        // itself where it has to drain the underlying transport in order
        // to service a read

        // we have a much smaller buffer than the
        // underlying transport has bytes available
        let (mem, thru) = new_transports!(10, 0);
        let mut t = TBufferedTransport::with_capacity(2, 0, thru);

        // fill the underlying transport's byte buffer
        let mut readable_bytes = [0u8; 10];
        for i in 0..10 {
            readable_bytes[i] = i as u8;
        }
        mem.borrow_mut().set_readable_bytes(&readable_bytes);

        // we ask to read into a buffer that's much larger
        // than the one the buffered transport has; as a result
        // it's going to have to keep asking the underlying
        // transport for more bytes
        let mut buf = [0u8; 8];
        let read_result = t.read(&mut buf);

        // we should have read 8 bytes
        assert_eq!(read_result.unwrap(), 8);
        assert_eq!(&buf, &[0, 1, 2, 3, 4, 5, 6, 7]);

        // let's clear out the buffer and try read again
        for i in 0..8 {
            buf[i] = 0;
        }
        let read_result = t.read(&mut buf);

        // this time we were only able to read 2 bytes
        // (all that's remaining from the underlying transport)
        // let's also check that the remaining bytes are untouched
        assert_eq!(read_result.unwrap(), 2);
        assert_eq!(&buf[0..2], &[8, 9]);
        assert_eq!(&buf[2..], &[0, 0, 0, 0, 0, 0]);

        // try read again (we should get 0)
        // and all the existing bytes were untouched
        let read_result = t.read(&mut buf);
        assert_eq!(read_result.unwrap(), 0);
        assert_eq!(&buf[0..2], &[8, 9]);
        assert_eq!(&buf[2..], &[0, 0, 0, 0, 0, 0]);
    }

    #[test]
    fn must_return_zero_if_nothing_can_be_written() {
        let (_, thru) = new_transports!(0, 0);
        let mut t = TBufferedTransport::with_capacity(0, 0, thru);

        let b = vec![0; 10];
        let r = t.write(&b);

        assert_eq!(r.unwrap(), 0);
    }

    #[test]
    fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
        let (mem, thru) = new_transports!(0, 10);
        let mut t = TBufferedTransport::with_capacity(0, 10, thru);

        let r = t.write(&[]);

        assert_eq!(r.unwrap(), 0);
        assert_eq!(mem.borrow_mut().write_buffer_as_ref(), &[]);
    }

    #[test]
    fn must_return_zero_if_write_buffer_full() {
        let (_, thru) = new_transports!(0, 0);
        let mut t = TBufferedTransport::with_capacity(0, 4, thru);

        let b = [0x00, 0x01, 0x02, 0x03];

        // we've now filled the write buffer
        let r = t.write(&b);
        assert_eq!(r.unwrap(), 4);

        // try write the same bytes again - nothing should be writable
        let r = t.write(&b);
        assert_eq!(r.unwrap(), 0);
    }

    #[test]
    fn must_only_write_to_inner_transport_on_flush() {
        let (mem, thru) = new_transports!(10, 10);
        let mut t = TBufferedTransport::new(thru);

        let b: [u8; 5] = [0, 1, 2, 3, 4];
        assert_eq!(t.write(&b).unwrap(), 5);
        assert_eq!(mem.borrow_mut().write_buffer_as_ref().len(), 0);

        assert!(t.flush().is_ok());

        {
            let inner = mem.borrow_mut();
            let underlying_buffer = inner.write_buffer_as_ref();
            assert_eq!(b, underlying_buffer);
        }
    }

    #[test]
    fn must_write_successfully_after_flush() {
        let (mem, thru) = new_transports!(0, 5);
        let mut t = TBufferedTransport::with_capacity(0, 5, thru);

        // write and flush
        let b: [u8; 5] = [0, 1, 2, 3, 4];
        assert_eq!(t.write(&b).unwrap(), 5);
        assert!(t.flush().is_ok());

        // check the flushed bytes
        {
            let inner = mem.borrow_mut();
            let underlying_buffer = inner.write_buffer_as_ref();
            assert_eq!(b, underlying_buffer);
        }

        // reset our underlying transport
        mem.borrow_mut().empty_write_buffer();

        // write and flush again
        assert_eq!(t.write(&b).unwrap(), 5);
        assert!(t.flush().is_ok());

        // check the flushed bytes
        {
            let inner = mem.borrow_mut();
            let underlying_buffer = inner.write_buffer_as_ref();
            assert_eq!(b, underlying_buffer);
        }
    }
}