Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 18 | use std::cmp; |
| 19 | use std::io; |
| 20 | use std::io::{Read, Write}; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 21 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 22 | use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory}; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 23 | |
/// Default capacity of the read buffer in bytes.
const READ_CAPACITY: usize = 4096;

/// Default capacity of the write buffer in bytes.
const WRITE_CAPACITY: usize = 4096;
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 29 | |
/// Transport that reads bytes via an internal buffer.
///
/// A `TBufferedReadTransport` maintains a fixed-size internal read buffer.
/// When the buffer is empty, a call to `TBufferedReadTransport::read(...)`
/// refills it with a single read from the wrapped channel. Read calls are
/// serviced from the internal buffer until it is exhausted, at which point
/// the buffer is refilled from the wrapped channel again. No framing or
/// header is interpreted; bytes are passed through unchanged.
///
/// # Examples
///
/// Create and use a `TBufferedReadTransport`.
///
/// ```no_run
/// use std::io::Read;
/// use thrift::transport::{TBufferedReadTransport, TTcpChannel};
///
/// let mut c = TTcpChannel::new();
/// c.open("localhost:9090").unwrap();
///
/// let mut t = TBufferedReadTransport::new(c);
///
/// t.read(&mut vec![0u8; 1]).unwrap();
/// ```
#[derive(Debug)]
pub struct TBufferedReadTransport<C>
where
    C: Read,
{
    // fixed-size storage that reads from the wrapped channel land in
    buf: Box<[u8]>,
    // index in `buf` of the next byte to hand to the caller
    pos: usize,
    // end of the valid region in `buf`; unread bytes are `buf[pos..cap]`
    cap: usize,
    // wrapped channel that the buffer is refilled from
    chan: C,
}
| 64 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 65 | impl<C> TBufferedReadTransport<C> |
| 66 | where |
| 67 | C: Read, |
| 68 | { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 69 | /// Create a `TBufferedTransport` with default-sized internal read and |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 70 | /// write buffers that wraps the given `TIoChannel`. |
| 71 | pub fn new(channel: C) -> TBufferedReadTransport<C> { |
| 72 | TBufferedReadTransport::with_capacity(READ_CAPACITY, channel) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 73 | } |
| 74 | |
| 75 | /// Create a `TBufferedTransport` with an internal read buffer of size |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 76 | /// `read_capacity` and an internal write buffer of size |
| 77 | /// `write_capacity` that wraps the given `TIoChannel`. |
| 78 | pub fn with_capacity(read_capacity: usize, channel: C) -> TBufferedReadTransport<C> { |
| 79 | TBufferedReadTransport { |
| 80 | buf: vec![0; read_capacity].into_boxed_slice(), |
| 81 | pos: 0, |
| 82 | cap: 0, |
| 83 | chan: channel, |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 84 | } |
| 85 | } |
| 86 | |
| 87 | fn get_bytes(&mut self) -> io::Result<&[u8]> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 88 | if self.cap - self.pos == 0 { |
| 89 | self.pos = 0; |
| 90 | self.cap = self.chan.read(&mut self.buf)?; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 91 | } |
| 92 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 93 | Ok(&self.buf[self.pos..self.cap]) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 94 | } |
| 95 | |
| 96 | fn consume(&mut self, consumed: usize) { |
| 97 | // TODO: was a bug here += <-- test somehow |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 98 | self.pos = cmp::min(self.cap, self.pos + consumed); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 99 | } |
| 100 | } |
| 101 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 102 | impl<C> Read for TBufferedReadTransport<C> |
| 103 | where |
| 104 | C: Read, |
| 105 | { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 106 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| 107 | let mut bytes_read = 0; |
| 108 | |
| 109 | loop { |
| 110 | let nread = { |
| 111 | let avail_bytes = self.get_bytes()?; |
| 112 | let avail_space = buf.len() - bytes_read; |
| 113 | let nread = cmp::min(avail_space, avail_bytes.len()); |
| 114 | buf[bytes_read..(bytes_read + nread)].copy_from_slice(&avail_bytes[..nread]); |
| 115 | nread |
| 116 | }; |
| 117 | |
| 118 | self.consume(nread); |
| 119 | bytes_read += nread; |
| 120 | |
| 121 | if bytes_read == buf.len() || nread == 0 { |
| 122 | break; |
| 123 | } |
| 124 | } |
| 125 | |
| 126 | Ok(bytes_read) |
| 127 | } |
| 128 | } |
| 129 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 130 | /// Factory for creating instances of `TBufferedReadTransport`. |
| 131 | #[derive(Default)] |
| 132 | pub struct TBufferedReadTransportFactory; |
| 133 | |
| 134 | impl TBufferedReadTransportFactory { |
| 135 | pub fn new() -> TBufferedReadTransportFactory { |
| 136 | TBufferedReadTransportFactory {} |
| 137 | } |
| 138 | } |
| 139 | |
| 140 | impl TReadTransportFactory for TBufferedReadTransportFactory { |
| 141 | /// Create a `TBufferedReadTransport`. |
| 142 | fn create(&self, channel: Box<Read + Send>) -> Box<TReadTransport + Send> { |
| 143 | Box::new(TBufferedReadTransport::new(channel)) |
| 144 | } |
| 145 | } |
| 146 | |
/// Transport that writes bytes via an internal buffer.
///
/// A `TBufferedWriteTransport` maintains an internal write buffer. All writes
/// are made to this buffer and are sent to the wrapped channel only when
/// `TBufferedWriteTransport::flush()` is called. A write accepts only as many
/// bytes as fit in the space remaining in the buffer. On a flush the buffered
/// bytes are written unchanged to the wrapped channel, the channel itself is
/// flushed, and the buffer is emptied.
///
/// # Examples
///
/// Create and use a `TBufferedWriteTransport`.
///
/// ```no_run
/// use std::io::Write;
/// use thrift::transport::{TBufferedWriteTransport, TTcpChannel};
///
/// let mut c = TTcpChannel::new();
/// c.open("localhost:9090").unwrap();
///
/// let mut t = TBufferedWriteTransport::new(c);
///
/// t.write(&[0x00]).unwrap();
/// t.flush().unwrap();
/// ```
#[derive(Debug)]
pub struct TBufferedWriteTransport<C>
where
    C: Write,
{
    // bytes accepted by `write` that have not been flushed yet
    buf: Vec<u8>,
    // wrapped channel that receives the buffered bytes on flush
    channel: C,
}
| 179 | |
| 180 | impl<C> TBufferedWriteTransport<C> |
| 181 | where |
| 182 | C: Write, |
| 183 | { |
| 184 | /// Create a `TBufferedTransport` with default-sized internal read and |
| 185 | /// write buffers that wraps the given `TIoChannel`. |
| 186 | pub fn new(channel: C) -> TBufferedWriteTransport<C> { |
| 187 | TBufferedWriteTransport::with_capacity(WRITE_CAPACITY, channel) |
| 188 | } |
| 189 | |
| 190 | /// Create a `TBufferedTransport` with an internal read buffer of size |
| 191 | /// `read_capacity` and an internal write buffer of size |
| 192 | /// `write_capacity` that wraps the given `TIoChannel`. |
| 193 | pub fn with_capacity(write_capacity: usize, channel: C) -> TBufferedWriteTransport<C> { |
| 194 | TBufferedWriteTransport { |
| 195 | buf: Vec::with_capacity(write_capacity), |
| 196 | channel: channel, |
| 197 | } |
| 198 | } |
| 199 | } |
| 200 | |
| 201 | impl<C> Write for TBufferedWriteTransport<C> |
| 202 | where |
| 203 | C: Write, |
| 204 | { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 205 | fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 206 | let avail_bytes = cmp::min(buf.len(), self.buf.capacity() - self.buf.len()); |
| 207 | self.buf.extend_from_slice(&buf[..avail_bytes]); |
| 208 | assert!( |
| 209 | self.buf.len() <= self.buf.capacity(), |
| 210 | "copy overflowed buffer" |
| 211 | ); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 212 | Ok(avail_bytes) |
| 213 | } |
| 214 | |
| 215 | fn flush(&mut self) -> io::Result<()> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 216 | self.channel.write_all(&self.buf)?; |
| 217 | self.channel.flush()?; |
| 218 | self.buf.clear(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 219 | Ok(()) |
| 220 | } |
| 221 | } |
| 222 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 223 | /// Factory for creating instances of `TBufferedWriteTransport`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 224 | #[derive(Default)] |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 225 | pub struct TBufferedWriteTransportFactory; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 226 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 227 | impl TBufferedWriteTransportFactory { |
| 228 | pub fn new() -> TBufferedWriteTransportFactory { |
| 229 | TBufferedWriteTransportFactory {} |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 230 | } |
| 231 | } |
| 232 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 233 | impl TWriteTransportFactory for TBufferedWriteTransportFactory { |
| 234 | /// Create a `TBufferedWriteTransport`. |
| 235 | fn create(&self, channel: Box<Write + Send>) -> Box<TWriteTransport + Send> { |
| 236 | Box::new(TBufferedWriteTransport::new(channel)) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 237 | } |
| 238 | } |
| 239 | |
#[cfg(test)]
mod tests {
    //! Unit tests for the buffered read and write transports, driven through
    //! an in-memory `TBufferChannel`.

    use std::io::{Read, Write};

    use super::*;
    use transport::TBufferChannel;

    #[test]
    fn must_return_zero_if_read_buffer_is_empty() {
        let mem = TBufferChannel::with_capacity(10, 0);
        let mut t = TBufferedReadTransport::with_capacity(10, mem);

        let mut b = vec![0; 10];
        let read_result = t.read(&mut b);

        assert_eq!(read_result.unwrap(), 0);
    }

    #[test]
    fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() {
        let mem = TBufferChannel::with_capacity(10, 0);
        let mut t = TBufferedReadTransport::with_capacity(10, mem);

        let read_result = t.read(&mut []);

        assert_eq!(read_result.unwrap(), 0);
    }

    #[test]
    fn must_return_zero_if_nothing_more_can_be_read() {
        let mem = TBufferChannel::with_capacity(4, 0);
        let mut t = TBufferedReadTransport::with_capacity(4, mem);

        t.chan.set_readable_bytes(&[0, 1, 2, 3]);

        // read buffer is exactly the same size as bytes available
        let mut buf = vec![0u8; 4];
        let read_result = t.read(&mut buf);

        // we've read exactly 4 bytes
        assert_eq!(read_result.unwrap(), 4);
        assert_eq!(&buf, &[0, 1, 2, 3]);

        // try read again, this time into a fresh buffer so that the
        // "untouched" assertion below actually observes the second read
        let mut buf_again = vec![0u8; 4];
        let read_result = t.read(&mut buf_again);

        // this time, 0 bytes and we haven't changed the buffer
        assert_eq!(read_result.unwrap(), 0);
        assert_eq!(&buf_again, &[0, 0, 0, 0])
    }

    #[test]
    fn must_fill_user_buffer_with_only_as_many_bytes_as_available() {
        let mem = TBufferChannel::with_capacity(4, 0);
        let mut t = TBufferedReadTransport::with_capacity(4, mem);

        t.chan.set_readable_bytes(&[0, 1, 2, 3]);

        // read buffer is much larger than the bytes available
        let mut buf = vec![0u8; 8];
        let read_result = t.read(&mut buf);

        // we've read exactly 4 bytes
        assert_eq!(read_result.unwrap(), 4);
        assert_eq!(&buf[..4], &[0, 1, 2, 3]);

        // try read again
        let read_result = t.read(&mut buf[4..]);

        // this time, 0 bytes and we haven't changed the buffer
        assert_eq!(read_result.unwrap(), 0);
        assert_eq!(&buf, &[0, 1, 2, 3, 0, 0, 0, 0])
    }

    #[test]
    fn must_read_successfully() {
        // this test involves a few loops within the buffered transport
        // itself where it has to drain the underlying transport in order
        // to service a read

        // we have a much smaller buffer than the
        // underlying transport has bytes available
        let mem = TBufferChannel::with_capacity(10, 0);
        let mut t = TBufferedReadTransport::with_capacity(2, mem);

        // fill the underlying transport's byte buffer
        let mut readable_bytes = [0u8; 10];
        for (i, b) in readable_bytes.iter_mut().enumerate() {
            *b = i as u8;
        }

        t.chan.set_readable_bytes(&readable_bytes);

        // we ask to read into a buffer that's much larger
        // than the one the buffered transport has; as a result
        // it's going to have to keep asking the underlying
        // transport for more bytes
        let mut buf = [0u8; 8];
        let read_result = t.read(&mut buf);

        // we should have read 8 bytes
        assert_eq!(read_result.unwrap(), 8);
        assert_eq!(&buf, &[0, 1, 2, 3, 4, 5, 6, 7]);

        // let's clear out the buffer and try read again
        for b in buf.iter_mut() {
            *b = 0;
        }
        let read_result = t.read(&mut buf);

        // this time we were only able to read 2 bytes
        // (all that's remaining from the underlying transport)
        // let's also check that the remaining bytes are untouched
        assert_eq!(read_result.unwrap(), 2);
        assert_eq!(&buf[0..2], &[8, 9]);
        assert_eq!(&buf[2..], &[0, 0, 0, 0, 0, 0]);

        // try read again (we should get 0)
        // and all the existing bytes were untouched
        let read_result = t.read(&mut buf);
        assert_eq!(read_result.unwrap(), 0);
        assert_eq!(&buf[0..2], &[8, 9]);
        assert_eq!(&buf[2..], &[0, 0, 0, 0, 0, 0]);
    }

    #[test]
    fn must_return_zero_if_nothing_can_be_written() {
        let mem = TBufferChannel::with_capacity(0, 0);
        let mut t = TBufferedWriteTransport::with_capacity(0, mem);

        let b = vec![0; 10];
        let r = t.write(&b);

        assert_eq!(r.unwrap(), 0);
    }

    #[test]
    fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
        let mem = TBufferChannel::with_capacity(0, 10);
        let mut t = TBufferedWriteTransport::with_capacity(10, mem);

        let r = t.write(&[]);
        let expected: [u8; 0] = [];

        assert_eq!(r.unwrap(), 0);
        assert_eq_transport_written_bytes!(t, expected);
    }

    #[test]
    fn must_return_zero_if_write_buffer_full() {
        let mem = TBufferChannel::with_capacity(0, 0);
        let mut t = TBufferedWriteTransport::with_capacity(4, mem);

        let b = [0x00, 0x01, 0x02, 0x03];

        // we've now filled the write buffer
        let r = t.write(&b);
        assert_eq!(r.unwrap(), 4);

        // try write the same bytes again - nothing should be writable
        let r = t.write(&b);
        assert_eq!(r.unwrap(), 0);
    }

    #[test]
    fn must_only_write_to_inner_transport_on_flush() {
        let mem = TBufferChannel::with_capacity(10, 10);
        let mut t = TBufferedWriteTransport::new(mem);

        let b: [u8; 5] = [0, 1, 2, 3, 4];
        assert_eq!(t.write(&b).unwrap(), 5);
        assert_eq_transport_num_written_bytes!(t, 0);

        assert!(t.flush().is_ok());

        assert_eq_transport_written_bytes!(t, b);
    }

    #[test]
    fn must_write_successfully_after_flush() {
        let mem = TBufferChannel::with_capacity(0, 5);
        let mut t = TBufferedWriteTransport::with_capacity(5, mem);

        // write and flush
        let b: [u8; 5] = [0, 1, 2, 3, 4];
        assert_eq!(t.write(&b).unwrap(), 5);
        assert!(t.flush().is_ok());

        // check the flushed bytes
        assert_eq_transport_written_bytes!(t, b);

        // reset our underlying transport
        t.channel.empty_write_buffer();

        // write and flush again
        assert_eq!(t.write(&b).unwrap(), 5);
        assert!(t.flush().is_ok());

        // check the flushed bytes
        assert_eq_transport_written_bytes!(t, b);
    }
}