blob: 3f240d82a1aabbf1e9268ac9068c02ee06137f4a [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cell::RefCell;
19use std::cmp;
20use std::io;
21use std::io::{Read, Write};
22use std::rc::Rc;
23
24use super::{TTransport, TTransportFactory};
25
26/// Default capacity of the read buffer in bytes.
27const DEFAULT_RBUFFER_CAPACITY: usize = 4096;
28
29/// Default capacity of the write buffer in bytes..
30const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
31
32/// Transport that communicates with endpoints using a byte stream.
33///
34/// A `TBufferedTransport` maintains a fixed-size internal write buffer. All
35/// writes are made to this buffer and are sent to the wrapped transport only
36/// when `TTransport::flush()` is called. On a flush a fixed-length header with a
37/// count of the buffered bytes is written, followed by the bytes themselves.
38///
39/// A `TBufferedTransport` also maintains a fixed-size internal read buffer.
40/// On a call to `TTransport::read(...)` one full message - both fixed-length
41/// header and bytes - is read from the wrapped transport and buffered.
42/// Subsequent read calls are serviced from the internal buffer until it is
43/// exhausted, at which point the next full message is read from the wrapped
44/// transport.
45///
46/// # Examples
47///
48/// Create and use a `TBufferedTransport`.
49///
50/// ```no_run
51/// use std::cell::RefCell;
52/// use std::rc::Rc;
53/// use std::io::{Read, Write};
54/// use thrift::transport::{TBufferedTransport, TTcpTransport, TTransport};
55///
56/// let mut t = TTcpTransport::new();
57/// t.open("localhost:9090").unwrap();
58///
59/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
60/// let mut t = TBufferedTransport::new(t);
61///
62/// // read
63/// t.read(&mut vec![0u8; 1]).unwrap();
64///
65/// // write
66/// t.write(&[0x00]).unwrap();
67/// t.flush().unwrap();
68/// ```
69pub struct TBufferedTransport {
70 rbuf: Box<[u8]>,
71 rpos: usize,
72 rcap: usize,
73 wbuf: Vec<u8>,
74 inner: Rc<RefCell<Box<TTransport>>>,
75}
76
77impl TBufferedTransport {
78 /// Create a `TBufferedTransport` with default-sized internal read and
79 /// write buffers that wraps an `inner` `TTransport`.
80 pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TBufferedTransport {
81 TBufferedTransport::with_capacity(DEFAULT_RBUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
82 }
83
84 /// Create a `TBufferedTransport` with an internal read buffer of size
85 /// `read_buffer_capacity` and an internal write buffer of size
86 /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
87 pub fn with_capacity(read_buffer_capacity: usize,
88 write_buffer_capacity: usize,
89 inner: Rc<RefCell<Box<TTransport>>>)
90 -> TBufferedTransport {
91 TBufferedTransport {
92 rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
93 rpos: 0,
94 rcap: 0,
95 wbuf: Vec::with_capacity(write_buffer_capacity),
96 inner: inner,
97 }
98 }
99
100 fn get_bytes(&mut self) -> io::Result<&[u8]> {
101 if self.rcap - self.rpos == 0 {
102 self.rpos = 0;
103 self.rcap = self.inner.borrow_mut().read(&mut self.rbuf)?;
104 }
105
106 Ok(&self.rbuf[self.rpos..self.rcap])
107 }
108
109 fn consume(&mut self, consumed: usize) {
110 // TODO: was a bug here += <-- test somehow
111 self.rpos = cmp::min(self.rcap, self.rpos + consumed);
112 }
113}
114
115impl Read for TBufferedTransport {
116 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
117 let mut bytes_read = 0;
118
119 loop {
120 let nread = {
121 let avail_bytes = self.get_bytes()?;
122 let avail_space = buf.len() - bytes_read;
123 let nread = cmp::min(avail_space, avail_bytes.len());
124 buf[bytes_read..(bytes_read + nread)].copy_from_slice(&avail_bytes[..nread]);
125 nread
126 };
127
128 self.consume(nread);
129 bytes_read += nread;
130
131 if bytes_read == buf.len() || nread == 0 {
132 break;
133 }
134 }
135
136 Ok(bytes_read)
137 }
138}
139
140impl Write for TBufferedTransport {
141 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
142 let avail_bytes = cmp::min(buf.len(), self.wbuf.capacity() - self.wbuf.len());
143 self.wbuf.extend_from_slice(&buf[..avail_bytes]);
144 assert!(self.wbuf.len() <= self.wbuf.capacity(),
145 "copy overflowed buffer");
146 Ok(avail_bytes)
147 }
148
149 fn flush(&mut self) -> io::Result<()> {
150 self.inner.borrow_mut().write_all(&self.wbuf)?;
151 self.inner.borrow_mut().flush()?;
152 self.wbuf.clear();
153 Ok(())
154 }
155}
156
157/// Factory for creating instances of `TBufferedTransport`
158#[derive(Default)]
159pub struct TBufferedTransportFactory;
160
161impl TBufferedTransportFactory {
162 /// Create a `TBufferedTransportFactory`.
163 pub fn new() -> TBufferedTransportFactory {
164 TBufferedTransportFactory {}
165 }
166}
167
168impl TTransportFactory for TBufferedTransportFactory {
169 fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
170 Box::new(TBufferedTransport::new(inner)) as Box<TTransport>
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use std::cell::RefCell;
177 use std::io::{Read, Write};
178 use std::rc::Rc;
179
180 use super::*;
181 use ::transport::{TPassThruTransport, TTransport};
182 use ::transport::mem::TBufferTransport;
183
184 macro_rules! new_transports {
185 ($wbc:expr, $rbc:expr) => (
186 {
187 let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity($wbc, $rbc))));
188 let thru: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() });
189 let thru = Rc::new(RefCell::new(thru));
190 (mem, thru)
191 }
192 );
193 }
194
195 #[test]
196 fn must_return_zero_if_read_buffer_is_empty() {
197 let (_, thru) = new_transports!(10, 0);
198 let mut t = TBufferedTransport::with_capacity(10, 0, thru);
199
200 let mut b = vec![0; 10];
201 let read_result = t.read(&mut b);
202
203 assert_eq!(read_result.unwrap(), 0);
204 }
205
206 #[test]
207 fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() {
208 let (_, thru) = new_transports!(10, 0);
209 let mut t = TBufferedTransport::with_capacity(10, 0, thru);
210
211 let read_result = t.read(&mut []);
212
213 assert_eq!(read_result.unwrap(), 0);
214 }
215
216 #[test]
217 fn must_return_zero_if_nothing_more_can_be_read() {
218 let (mem, thru) = new_transports!(4, 0);
219 let mut t = TBufferedTransport::with_capacity(4, 0, thru);
220
221 mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
222
223 // read buffer is exactly the same size as bytes available
224 let mut buf = vec![0u8; 4];
225 let read_result = t.read(&mut buf);
226
227 // we've read exactly 4 bytes
228 assert_eq!(read_result.unwrap(), 4);
229 assert_eq!(&buf, &[0, 1, 2, 3]);
230
231 // try read again
232 let buf_again = vec![0u8; 4];
233 let read_result = t.read(&mut buf);
234
235 // this time, 0 bytes and we haven't changed the buffer
236 assert_eq!(read_result.unwrap(), 0);
237 assert_eq!(&buf_again, &[0, 0, 0, 0])
238 }
239
240 #[test]
241 fn must_fill_user_buffer_with_only_as_many_bytes_as_available() {
242 let (mem, thru) = new_transports!(4, 0);
243 let mut t = TBufferedTransport::with_capacity(4, 0, thru);
244
245 mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
246
247 // read buffer is much larger than the bytes available
248 let mut buf = vec![0u8; 8];
249 let read_result = t.read(&mut buf);
250
251 // we've read exactly 4 bytes
252 assert_eq!(read_result.unwrap(), 4);
253 assert_eq!(&buf[..4], &[0, 1, 2, 3]);
254
255 // try read again
256 let read_result = t.read(&mut buf[4..]);
257
258 // this time, 0 bytes and we haven't changed the buffer
259 assert_eq!(read_result.unwrap(), 0);
260 assert_eq!(&buf, &[0, 1, 2, 3, 0, 0, 0, 0])
261 }
262
263 #[test]
264 fn must_read_successfully() {
265 // this test involves a few loops within the buffered transport
266 // itself where it has to drain the underlying transport in order
267 // to service a read
268
269 // we have a much smaller buffer than the
270 // underlying transport has bytes available
271 let (mem, thru) = new_transports!(10, 0);
272 let mut t = TBufferedTransport::with_capacity(2, 0, thru);
273
274 // fill the underlying transport's byte buffer
275 let mut readable_bytes = [0u8; 10];
276 for i in 0..10 {
277 readable_bytes[i] = i as u8;
278 }
279 mem.borrow_mut().set_readable_bytes(&readable_bytes);
280
281 // we ask to read into a buffer that's much larger
282 // than the one the buffered transport has; as a result
283 // it's going to have to keep asking the underlying
284 // transport for more bytes
285 let mut buf = [0u8; 8];
286 let read_result = t.read(&mut buf);
287
288 // we should have read 8 bytes
289 assert_eq!(read_result.unwrap(), 8);
290 assert_eq!(&buf, &[0, 1, 2, 3, 4, 5, 6, 7]);
291
292 // let's clear out the buffer and try read again
293 for i in 0..8 {
294 buf[i] = 0;
295 }
296 let read_result = t.read(&mut buf);
297
298 // this time we were only able to read 2 bytes
299 // (all that's remaining from the underlying transport)
300 // let's also check that the remaining bytes are untouched
301 assert_eq!(read_result.unwrap(), 2);
302 assert_eq!(&buf[0..2], &[8, 9]);
303 assert_eq!(&buf[2..], &[0, 0, 0, 0, 0, 0]);
304
305 // try read again (we should get 0)
306 // and all the existing bytes were untouched
307 let read_result = t.read(&mut buf);
308 assert_eq!(read_result.unwrap(), 0);
309 assert_eq!(&buf[0..2], &[8, 9]);
310 assert_eq!(&buf[2..], &[0, 0, 0, 0, 0, 0]);
311 }
312
313 #[test]
314 fn must_return_zero_if_nothing_can_be_written() {
315 let (_, thru) = new_transports!(0, 0);
316 let mut t = TBufferedTransport::with_capacity(0, 0, thru);
317
318 let b = vec![0; 10];
319 let r = t.write(&b);
320
321 assert_eq!(r.unwrap(), 0);
322 }
323
324 #[test]
325 fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
326 let (mem, thru) = new_transports!(0, 10);
327 let mut t = TBufferedTransport::with_capacity(0, 10, thru);
328
329 let r = t.write(&[]);
330
331 assert_eq!(r.unwrap(), 0);
332 assert_eq!(mem.borrow_mut().write_buffer_as_ref(), &[]);
333 }
334
335 #[test]
336 fn must_return_zero_if_write_buffer_full() {
337 let (_, thru) = new_transports!(0, 0);
338 let mut t = TBufferedTransport::with_capacity(0, 4, thru);
339
340 let b = [0x00, 0x01, 0x02, 0x03];
341
342 // we've now filled the write buffer
343 let r = t.write(&b);
344 assert_eq!(r.unwrap(), 4);
345
346 // try write the same bytes again - nothing should be writable
347 let r = t.write(&b);
348 assert_eq!(r.unwrap(), 0);
349 }
350
351 #[test]
352 fn must_only_write_to_inner_transport_on_flush() {
353 let (mem, thru) = new_transports!(10, 10);
354 let mut t = TBufferedTransport::new(thru);
355
356 let b: [u8; 5] = [0, 1, 2, 3, 4];
357 assert_eq!(t.write(&b).unwrap(), 5);
358 assert_eq!(mem.borrow_mut().write_buffer_as_ref().len(), 0);
359
360 assert!(t.flush().is_ok());
361
362 {
363 let inner = mem.borrow_mut();
364 let underlying_buffer = inner.write_buffer_as_ref();
365 assert_eq!(b, underlying_buffer);
366 }
367 }
368
369 #[test]
370 fn must_write_successfully_after_flush() {
371 let (mem, thru) = new_transports!(0, 5);
372 let mut t = TBufferedTransport::with_capacity(0, 5, thru);
373
374 // write and flush
375 let b: [u8; 5] = [0, 1, 2, 3, 4];
376 assert_eq!(t.write(&b).unwrap(), 5);
377 assert!(t.flush().is_ok());
378
379 // check the flushed bytes
380 {
381 let inner = mem.borrow_mut();
382 let underlying_buffer = inner.write_buffer_as_ref();
383 assert_eq!(b, underlying_buffer);
384 }
385
386 // reset our underlying transport
387 mem.borrow_mut().empty_write_buffer();
388
389 // write and flush again
390 assert_eq!(t.write(&b).unwrap(), 5);
391 assert!(t.flush().is_ok());
392
393 // check the flushed bytes
394 {
395 let inner = mem.borrow_mut();
396 let underlying_buffer = inner.write_buffer_as_ref();
397 assert_eq!(b, underlying_buffer);
398 }
399 }
400}