// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::cmp;
19use std::io;
20
/// Simple transport that contains both a fixed-length internal read buffer and
/// a fixed-length internal write buffer.
///
/// On a `write` bytes are appended to the internal write buffer. Once that
/// buffer is full, further writes are accepted but store nothing (they return
/// `Ok(0)`). Callers must call `empty_write_buffer()` before subsequent writes
/// can store bytes again.
///
/// Readable bytes are placed in the internal read buffer with
/// `set_readable_bytes(...)`. Callers can then `read` until the buffer is
/// depleted, after which reads return `Ok(0)` until the read buffer is
/// replenished.
#[derive(Debug)]
pub struct TBufferTransport {
    /// Backing storage for readable bytes; allocated once with `rcap` bytes.
    rbuf: Box<[u8]>,
    /// Offset into `rbuf` of the next byte a `read` will return.
    rpos: usize,
    /// Number of valid (readable) bytes stored at the front of `rbuf`.
    ridx: usize,
    /// Capacity of `rbuf` in bytes.
    rcap: usize,
    /// Backing storage for written bytes; allocated once with `wcap` bytes.
    wbuf: Box<[u8]>,
    /// Number of bytes written so far, i.e. the offset of the next write.
    wpos: usize,
    /// Capacity of `wbuf` in bytes.
    wcap: usize,
}

impl TBufferTransport {
    /// Constructs a new, empty `TBufferTransport` with the given
    /// read buffer capacity and write buffer capacity.
    ///
    /// Both buffers are allocated up front and zero-filled; neither grows
    /// afterwards.
    pub fn with_capacity(
        read_buffer_capacity: usize,
        write_buffer_capacity: usize,
    ) -> TBufferTransport {
        TBufferTransport {
            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
            rpos: 0,
            ridx: 0,
            rcap: read_buffer_capacity,
            wbuf: vec![0; write_buffer_capacity].into_boxed_slice(),
            wpos: 0,
            wcap: write_buffer_capacity,
        }
    }

    /// Return a slice containing the bytes held by the internal read buffer.
    ///
    /// The slice covers every byte installed by the last
    /// `set_readable_bytes(...)` call, including bytes that were already
    /// consumed through `read`. Returns an empty slice if no readable bytes
    /// are present.
    pub fn read_buffer(&self) -> &[u8] {
        &self.rbuf[..self.ridx]
    }

    // FIXME: do I really need this API call?
    // FIXME: should this simply reset to the last set of readable bytes?
    /// Reset the number of readable bytes to zero.
    ///
    /// Subsequent calls to `read` will return nothing (`Ok(0)`).
    pub fn empty_read_buffer(&mut self) {
        self.rpos = 0;
        self.ridx = 0;
    }

    /// Copy bytes from the source buffer `buf` into the internal read buffer,
    /// overwriting any existing bytes and rewinding the read position.
    ///
    /// Returns the number of bytes copied, which is
    /// `min(buf.len(), internal_read_buf.len())`; any excess bytes in `buf`
    /// are silently ignored.
    pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
        self.empty_read_buffer();
        let max_bytes = cmp::min(self.rcap, buf.len());
        // `u8` is `Copy`, so this is a plain memcpy.
        self.rbuf[..max_bytes].copy_from_slice(&buf[..max_bytes]);
        self.ridx = max_bytes;
        max_bytes
    }

    /// Return a slice containing the bytes held by the internal write buffer.
    /// Returns an empty slice if no bytes were written.
    pub fn write_buffer_as_ref(&self) -> &[u8] {
        &self.wbuf[..self.wpos]
    }

    /// Return a vector with a copy of the bytes held by the internal write
    /// buffer. Returns an empty vector if no bytes were written.
    pub fn write_buffer_to_vec(&self) -> Vec<u8> {
        self.write_buffer_as_ref().to_vec()
    }

    /// Resets the internal write buffer, making it seem like no bytes were
    /// written. Calling `write_buffer_as_ref` after this returns an empty
    /// slice.
    pub fn empty_write_buffer(&mut self) {
        self.wpos = 0;
    }

    /// Overwrites the contents of the read buffer with the contents of the
    /// write buffer. The write buffer is emptied after this operation.
    ///
    /// # Panics
    ///
    /// Panics if the write buffer currently holds more bytes than the read
    /// buffer can store. The check happens before anything is modified, so
    /// neither buffer is touched in that case.
    pub fn copy_write_buffer_to_read_buffer(&mut self) {
        let nbytes = self.wpos;
        assert!(
            nbytes <= self.rcap,
            "cannot copy {} written bytes into a read buffer of capacity {}",
            nbytes,
            self.rcap
        );

        self.empty_read_buffer();
        // `rbuf` and `wbuf` are distinct fields, so they can be borrowed
        // simultaneously and copied directly without a temporary allocation.
        self.rbuf[..nbytes].copy_from_slice(&self.wbuf[..nbytes]);
        self.ridx = nbytes;

        self.empty_write_buffer();
    }
}

impl io::Read for TBufferTransport {
    /// Copies as many unread bytes as fit into `buf` and advances the read
    /// position by that amount.
    ///
    /// Never returns an error. Returns `Ok(0)` once every readable byte has
    /// been consumed (or when `buf` is empty); bytes of `buf` beyond the
    /// returned count are left untouched.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let nread = cmp::min(buf.len(), self.ridx - self.rpos);
        buf[..nread].copy_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
        self.rpos += nread;
        Ok(nread)
    }
}

impl io::Write for TBufferTransport {
    /// Appends as many bytes of `buf` as still fit into the write buffer.
    ///
    /// Never returns an error. Returns the number of bytes stored, which is
    /// `Ok(0)` once the buffer is full. Because of that, `write_all` on data
    /// that does not fit reports `ErrorKind::WriteZero`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let nwrite = cmp::min(buf.len(), self.wcap - self.wpos);
        self.wbuf[self.wpos..self.wpos + nwrite].copy_from_slice(&buf[..nwrite]);
        self.wpos += nwrite;
        Ok(nwrite)
    }

    /// No-op: written bytes already live in the in-memory buffer.
    fn flush(&mut self) -> io::Result<()> {
        Ok(()) // nothing to do on flush
    }
}
144
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::TBufferTransport;

    /// Emptying the write buffer discards everything written so far.
    #[test]
    fn must_empty_write_buffer() {
        let mut transport = TBufferTransport::with_capacity(0, 1);
        let payload: &[u8] = &[0x01];

        assert_eq!(transport.write(payload).unwrap(), 1);
        assert_eq!(transport.write_buffer_as_ref(), payload);

        transport.empty_write_buffer();
        assert!(transport.write_buffer_as_ref().is_empty());
    }

    /// A full write buffer stores nothing more until it has been emptied.
    #[test]
    fn must_accept_writes_after_buffer_emptied() {
        let mut transport = TBufferTransport::with_capacity(0, 2);
        let payload: &[u8] = &[0x01, 0x02];

        // The first write fits exactly.
        assert_eq!(transport.write(payload).unwrap(), 2);
        assert_eq!(transport.write_buffer_as_ref(), payload);

        // The buffer is now full: a second write stores zero bytes and leaves
        // the previously written bytes untouched.
        assert_eq!(transport.write(payload).unwrap(), 0);
        assert_eq!(transport.write_buffer_as_ref(), payload);

        // Emptying the buffer makes room again...
        transport.empty_write_buffer();
        assert!(transport.write_buffer_as_ref().is_empty());

        // ...so the same write now succeeds in full.
        assert_eq!(transport.write(payload).unwrap(), 2);
        assert_eq!(transport.write_buffer_as_ref(), payload);
    }

    /// Consecutive writes append until capacity is reached; the write that
    /// crosses the limit is truncated and later ones store nothing.
    #[test]
    fn must_accept_multiple_writes_until_buffer_is_full() {
        let mut transport = TBufferTransport::with_capacity(0, 10);

        // Write #1: two bytes, all accepted.
        let first: &[u8] = &[0x01, 0x41];
        assert_eq!(transport.write(first).unwrap(), 2);
        assert_eq!(transport.write_buffer_as_ref(), first);

        // Write #2: seven bytes, all accepted, landing at offset 2.
        let second: &[u8] = &[0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
        assert_eq!(transport.write(second).unwrap(), 7);
        assert_eq!(&transport.write_buffer_as_ref()[2..], second);

        // Write #3: only one byte of space remains, so only the first byte of
        // the input is stored.
        let third: &[u8] = &[0xBF, 0xDA, 0x98];
        assert_eq!(transport.write(third).unwrap(), 1);
        assert_eq!(&transport.write_buffer_as_ref()[9..], &third[..1]);

        // Write #4: the buffer is full, nothing is stored.
        let fourth: &[u8] = &[0xBF, 0xAA, 0xFD];
        assert_eq!(transport.write(fourth).unwrap(), 0);

        // The buffer holds the accepted bytes of writes #1-#3, in order.
        let expected: Vec<u8> = [first, second, &third[..1]].concat();
        assert_eq!(transport.write_buffer_as_ref(), &expected[..]);
    }

    /// Emptying the read buffer discards the readable bytes.
    #[test]
    fn must_empty_read_buffer() {
        let mut transport = TBufferTransport::with_capacity(1, 0);
        let source: &[u8] = &[0x01];

        assert_eq!(transport.set_readable_bytes(source), 1);
        assert_eq!(transport.read_buffer(), source);

        transport.empty_read_buffer();
        assert!(transport.read_buffer().is_empty());
    }

    /// Readable bytes can be installed again after the buffer was emptied.
    #[test]
    fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
        let mut transport = TBufferTransport::with_capacity(1, 0);

        let before: &[u8] = &[0x01];
        assert_eq!(transport.set_readable_bytes(before), 1);
        assert_eq!(transport.read_buffer(), before);

        transport.empty_read_buffer();
        assert!(transport.read_buffer().is_empty());

        let after: &[u8] = &[0x02];
        assert_eq!(transport.set_readable_bytes(after), 1);
        assert_eq!(transport.read_buffer(), after);
    }

    /// Successive reads hand out the readable bytes in order and report zero
    /// bytes once everything has been consumed.
    #[test]
    fn must_accept_multiple_reads_until_all_bytes_read() {
        let mut transport = TBufferTransport::with_capacity(10, 0);
        let source: &[u8] = &[0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];

        // Install the bytes to be read.
        assert_eq!(transport.set_readable_bytes(source), 10);
        assert_eq!(transport.read_buffer(), source);

        let mut collected: Vec<u8> = Vec::with_capacity(source.len());

        // Read #1: a 5-byte destination is filled completely.
        let mut dest_0 = [0u8; 5];
        let count = transport.read(&mut dest_0).unwrap();
        assert_eq!(count, 5);
        assert_eq!(&dest_0[..count], &source[..5]);
        collected.extend_from_slice(&dest_0[..count]);

        // Read #2: a 4-byte destination is filled completely.
        let mut dest_1 = [0u8; 4];
        let count = transport.read(&mut dest_1).unwrap();
        assert_eq!(count, 4);
        assert_eq!(&dest_1[..count], &source[5..9]);
        collected.extend_from_slice(&dest_1[..count]);

        // Read #3: only one byte is left, so only the first slot of the
        // 3-byte destination is meaningful.
        let mut dest_2 = [0u8; 3];
        let count = transport.read(&mut dest_2).unwrap();
        assert_eq!(count, 1);
        assert_eq!(&dest_2[..count], &source[9..]);
        collected.extend_from_slice(&dest_2[..count]);

        // Read #4: nothing is left to read.
        let mut dest_3 = [0u8; 10];
        let count = transport.read(&mut dest_3).unwrap();
        assert_eq!(count, 0);
        collected.extend_from_slice(&dest_3[..count]);

        // Stitched together, the reads reproduce the original bytes.
        assert_eq!(&collected[..], source);
    }

    /// Once drained, the transport yields bytes again after the read buffer
    /// has been replenished.
    #[test]
    fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
        let mut transport = TBufferTransport::with_capacity(3, 0);

        // Install the first batch of readable bytes.
        let first_batch: &[u8] = &[0x02, 0xAB, 0x33];
        assert_eq!(transport.set_readable_bytes(first_batch), 3);
        assert_eq!(transport.read_buffer(), first_batch);

        let mut scratch = [0u8; 4];

        // Drain the read buffer in one go.
        assert_eq!(transport.read(&mut scratch).unwrap(), 3);
        assert_eq!(transport.read_buffer(), &scratch[..3]);

        // A further read returns zero bytes...
        assert_eq!(transport.read(&mut scratch).unwrap(), 0);

        // ...and leaves the destination exactly as the previous read left it.
        assert_eq!(scratch, [0x02, 0xAB, 0x33, 0x00]);

        // Replenish with a second, shorter batch.
        let second_batch: &[u8] = &[0x91, 0xAA];
        assert_eq!(transport.set_readable_bytes(second_batch), 2);
        assert_eq!(transport.read_buffer(), second_batch);

        // Reading works again.
        assert_eq!(transport.read(&mut scratch).unwrap(), 2);
        assert_eq!(transport.read_buffer(), &scratch[..2]);
    }
}