blob: 86ac6bb25f288d3beec2ad893eb0e7928785f1dd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::cmp;
19use std::io;
Allen George0e22c362017-01-30 07:15:00 -050020use std::sync::{Arc, Mutex};
Allen George8b96bfb2016-11-02 08:01:08 -040021
Allen George0e22c362017-01-30 07:15:00 -050022use super::{ReadHalf, TIoChannel, WriteHalf};
23
24/// In-memory read and write channel with fixed-size read and write buffers.
Allen George8b96bfb2016-11-02 08:01:08 -040025///
26/// On a `write` bytes are written to the internal write buffer. Writes are no
27/// longer accepted once this buffer is full. Callers must `empty_write_buffer()`
28/// before subsequent writes are accepted.
29///
30/// You can set readable bytes in the internal read buffer by filling it with
31/// `set_readable_bytes(...)`. Callers can then read until the buffer is
32/// depleted. No further reads are accepted until the internal read buffer is
33/// replenished again.
Allen George0e22c362017-01-30 07:15:00 -050034#[derive(Debug)]
35pub struct TBufferChannel {
36 read: Arc<Mutex<ReadData>>,
37 write: Arc<Mutex<WriteData>>,
Allen George8b96bfb2016-11-02 08:01:08 -040038}
39
/// State of the read side of a `TBufferChannel`: backing storage plus cursors.
///
/// The channel keeps `pos <= idx <= cap` at all times.
#[derive(Debug)]
struct ReadData {
    /// Fixed-size backing storage, allocated once with `cap` bytes.
    buf: Box<[u8]>,
    /// Offset of the next byte a `read` will hand out.
    pos: usize,
    /// Number of valid bytes currently staged at the front of `buf`.
    idx: usize,
    /// Capacity of `buf`, fixed at construction.
    cap: usize,
}
47
/// State of the write side of a `TBufferChannel`: backing storage plus cursor.
///
/// The channel keeps `pos <= cap` at all times.
#[derive(Debug)]
struct WriteData {
    /// Fixed-size backing storage, allocated once with `cap` bytes.
    buf: Box<[u8]>,
    /// Number of bytes written so far; also the offset of the next write.
    pos: usize,
    /// Capacity of `buf`, fixed at construction.
    cap: usize,
}
54
55impl TBufferChannel {
56 /// Constructs a new, empty `TBufferChannel` with the given
Allen George8b96bfb2016-11-02 08:01:08 -040057 /// read buffer capacity and write buffer capacity.
Allen George0e22c362017-01-30 07:15:00 -050058 pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
59 TBufferChannel {
60 read: Arc::new(
61 Mutex::new(
62 ReadData {
63 buf: vec![0; read_capacity].into_boxed_slice(),
64 idx: 0,
65 pos: 0,
66 cap: read_capacity,
67 },
68 ),
69 ),
70 write: Arc::new(
71 Mutex::new(
72 WriteData {
73 buf: vec![0; write_capacity].into_boxed_slice(),
74 pos: 0,
75 cap: write_capacity,
76 },
77 ),
78 ),
Allen George8b96bfb2016-11-02 08:01:08 -040079 }
80 }
81
Allen George0e22c362017-01-30 07:15:00 -050082 /// Return a copy of the bytes held by the internal read buffer.
83 /// Returns an empty vector if no readable bytes are present.
84 pub fn read_bytes(&self) -> Vec<u8> {
85 let rdata = self.read.as_ref().lock().unwrap();
86 let mut buf = vec![0u8; rdata.idx];
87 buf.copy_from_slice(&rdata.buf[..rdata.idx]);
88 buf
Allen George8b96bfb2016-11-02 08:01:08 -040089 }
90
91 // FIXME: do I really need this API call?
92 // FIXME: should this simply reset to the last set of readable bytes?
93 /// Reset the number of readable bytes to zero.
94 ///
95 /// Subsequent calls to `read` will return nothing.
96 pub fn empty_read_buffer(&mut self) {
Allen George0e22c362017-01-30 07:15:00 -050097 let mut rdata = self.read.as_ref().lock().unwrap();
98 rdata.pos = 0;
99 rdata.idx = 0;
Allen George8b96bfb2016-11-02 08:01:08 -0400100 }
101
102 /// Copy bytes from the source buffer `buf` into the internal read buffer,
103 /// overwriting any existing bytes. Returns the number of bytes copied,
104 /// which is `min(buf.len(), internal_read_buf.len())`.
105 pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
106 self.empty_read_buffer();
Allen George0e22c362017-01-30 07:15:00 -0500107 let mut rdata = self.read.as_ref().lock().unwrap();
108 let max_bytes = cmp::min(rdata.cap, buf.len());
109 rdata.buf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
110 rdata.idx = max_bytes;
Allen George8b96bfb2016-11-02 08:01:08 -0400111 max_bytes
112 }
113
Allen George0e22c362017-01-30 07:15:00 -0500114 /// Return a copy of the bytes held by the internal write buffer.
Allen George8b96bfb2016-11-02 08:01:08 -0400115 /// Returns an empty vector if no bytes were written.
Allen George0e22c362017-01-30 07:15:00 -0500116 pub fn write_bytes(&self) -> Vec<u8> {
117 let wdata = self.write.as_ref().lock().unwrap();
118 let mut buf = vec![0u8; wdata.pos];
119 buf.copy_from_slice(&wdata.buf[..wdata.pos]);
Allen George8b96bfb2016-11-02 08:01:08 -0400120 buf
121 }
122
123 /// Resets the internal write buffer, making it seem like no bytes were
Allen George0e22c362017-01-30 07:15:00 -0500124 /// written. Calling `write_buffer` after this returns an empty vector.
Allen George8b96bfb2016-11-02 08:01:08 -0400125 pub fn empty_write_buffer(&mut self) {
Allen George0e22c362017-01-30 07:15:00 -0500126 let mut wdata = self.write.as_ref().lock().unwrap();
127 wdata.pos = 0;
Allen George8b96bfb2016-11-02 08:01:08 -0400128 }
129
130 /// Overwrites the contents of the read buffer with the contents of the
131 /// write buffer. The write buffer is emptied after this operation.
132 pub fn copy_write_buffer_to_read_buffer(&mut self) {
Allen George0e22c362017-01-30 07:15:00 -0500133 // FIXME: redo this entire method
Allen George8b96bfb2016-11-02 08:01:08 -0400134 let buf = {
Allen George0e22c362017-01-30 07:15:00 -0500135 let wdata = self.write.as_ref().lock().unwrap();
136 let b = &wdata.buf[..wdata.pos];
Allen George8b96bfb2016-11-02 08:01:08 -0400137 let mut b_ret = vec![0; b.len()];
Allen George92e1c402017-04-06 08:28:22 -0400138 b_ret.copy_from_slice(b);
Allen George8b96bfb2016-11-02 08:01:08 -0400139 b_ret
140 };
141
142 let bytes_copied = self.set_readable_bytes(&buf);
143 assert_eq!(bytes_copied, buf.len());
144
145 self.empty_write_buffer();
146 }
147}
148
Allen George0e22c362017-01-30 07:15:00 -0500149impl TIoChannel for TBufferChannel {
150 fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
151 where
152 Self: Sized,
153 {
154 Ok(
155 (ReadHalf {
156 handle: TBufferChannel {
157 read: self.read.clone(),
158 write: self.write.clone(),
159 },
160 },
161 WriteHalf {
162 handle: TBufferChannel {
163 read: self.read.clone(),
164 write: self.write.clone(),
165 },
166 }),
167 )
168 }
169}
170
171impl io::Read for TBufferChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400172 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
Allen George0e22c362017-01-30 07:15:00 -0500173 let mut rdata = self.read.as_ref().lock().unwrap();
174 let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
175 buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
176 rdata.pos += nread;
Allen George8b96bfb2016-11-02 08:01:08 -0400177 Ok(nread)
178 }
179}
180
Allen George0e22c362017-01-30 07:15:00 -0500181impl io::Write for TBufferChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400182 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
Allen George0e22c362017-01-30 07:15:00 -0500183 let mut wdata = self.write.as_ref().lock().unwrap();
184 let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
185 let (start, end) = (wdata.pos, wdata.pos + nwrite);
186 wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
187 wdata.pos += nwrite;
Allen George8b96bfb2016-11-02 08:01:08 -0400188 Ok(nwrite)
189 }
190
191 fn flush(&mut self) -> io::Result<()> {
192 Ok(()) // nothing to do on flush
193 }
194}
195
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::TBufferChannel;

    #[test]
    fn must_empty_write_buffer() {
        let mut t = TBufferChannel::with_capacity(0, 1);

        let bytes_to_write: [u8; 1] = [0x01];
        let result = t.write(&bytes_to_write);
        assert_eq!(result.unwrap(), 1);
        assert_eq!(&t.write_bytes(), &bytes_to_write);

        t.empty_write_buffer();
        assert_eq!(t.write_bytes().len(), 0);
    }

    #[test]
    fn must_accept_writes_after_buffer_emptied() {
        let mut t = TBufferChannel::with_capacity(0, 2);

        let bytes_to_write: [u8; 2] = [0x01, 0x02];

        // first write (all bytes written)
        let result = t.write(&bytes_to_write);
        assert_eq!(result.unwrap(), 2);
        assert_eq!(&t.write_bytes(), &bytes_to_write);

        // try write again (buffer is full, so zero bytes are accepted)
        let result = t.write(&bytes_to_write);
        assert_eq!(result.unwrap(), 0);
        assert_eq!(&t.write_bytes(), &bytes_to_write); // still the same as before

        // now reset the buffer
        t.empty_write_buffer();
        assert_eq!(t.write_bytes().len(), 0);

        // now try write again - the write should succeed
        let result = t.write(&bytes_to_write);
        assert_eq!(result.unwrap(), 2);
        assert_eq!(&t.write_bytes(), &bytes_to_write);
    }

    #[test]
    fn must_accept_multiple_writes_until_buffer_is_full() {
        let mut t = TBufferChannel::with_capacity(0, 10);

        // first write (all bytes written)
        let bytes_to_write_0: [u8; 2] = [0x01, 0x41];
        let write_0_result = t.write(&bytes_to_write_0);
        assert_eq!(write_0_result.unwrap(), 2);
        assert_eq!(t.write_bytes(), &bytes_to_write_0);

        // second write (all bytes written, starting at index 2)
        let bytes_to_write_1: [u8; 7] = [0x24, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
        let write_1_result = t.write(&bytes_to_write_1);
        assert_eq!(write_1_result.unwrap(), 7);
        assert_eq!(&t.write_bytes()[2..], &bytes_to_write_1);

        // third write (only 1 byte written - that's all we have space for)
        let bytes_to_write_2: [u8; 3] = [0xBF, 0xDA, 0x98];
        let write_2_result = t.write(&bytes_to_write_2);
        assert_eq!(write_2_result.unwrap(), 1);
        // `[9..]` slices the returned Vec from index 9 to its end (one byte);
        // `[0..1]` is the first byte of the source array.
        assert_eq!(&t.write_bytes()[9..], &bytes_to_write_2[0..1]);

        // fourth write (buffer is full, so zero bytes are accepted)
        let bytes_to_write_3: [u8; 3] = [0xBF, 0xAA, 0xFD];
        let write_3_result = t.write(&bytes_to_write_3);
        assert_eq!(write_3_result.unwrap(), 0);

        // check the full write buffer
        let mut expected: Vec<u8> = Vec::with_capacity(10);
        expected.extend_from_slice(&bytes_to_write_0);
        expected.extend_from_slice(&bytes_to_write_1);
        expected.extend_from_slice(&bytes_to_write_2[0..1]);
        assert_eq!(t.write_bytes(), &expected[..]);
    }

    #[test]
    fn must_empty_read_buffer() {
        let mut t = TBufferChannel::with_capacity(1, 0);

        let bytes_to_read: [u8; 1] = [0x01];
        let result = t.set_readable_bytes(&bytes_to_read);
        assert_eq!(result, 1);
        assert_eq!(t.read_bytes(), &bytes_to_read);

        t.empty_read_buffer();
        assert_eq!(t.read_bytes().len(), 0);
    }

    #[test]
    fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
        let mut t = TBufferChannel::with_capacity(1, 0);

        let bytes_to_read_0: [u8; 1] = [0x01];
        let result = t.set_readable_bytes(&bytes_to_read_0);
        assert_eq!(result, 1);
        assert_eq!(t.read_bytes(), &bytes_to_read_0);

        t.empty_read_buffer();
        assert_eq!(t.read_bytes().len(), 0);

        let bytes_to_read_1: [u8; 1] = [0x02];
        let result = t.set_readable_bytes(&bytes_to_read_1);
        assert_eq!(result, 1);
        assert_eq!(t.read_bytes(), &bytes_to_read_1);
    }

    #[test]
    fn must_accept_multiple_reads_until_all_bytes_read() {
        let mut t = TBufferChannel::with_capacity(10, 0);

        let readable_bytes: [u8; 10] = [0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];

        // check that we're able to set the bytes to be read
        let result = t.set_readable_bytes(&readable_bytes);
        assert_eq!(result, 10);
        assert_eq!(t.read_bytes(), &readable_bytes);

        // first read
        let mut read_buf_0 = vec![0; 5];
        let read_result = t.read(&mut read_buf_0);
        assert_eq!(read_result.unwrap(), 5);
        assert_eq!(read_buf_0.as_slice(), &(readable_bytes[0..5]));

        // second read
        let mut read_buf_1 = vec![0; 4];
        let read_result = t.read(&mut read_buf_1);
        assert_eq!(read_result.unwrap(), 4);
        assert_eq!(read_buf_1.as_slice(), &(readable_bytes[5..9]));

        // third read (only 1 byte remains to be read)
        let mut read_buf_2 = vec![0; 3];
        let read_result = t.read(&mut read_buf_2);
        assert_eq!(read_result.unwrap(), 1);
        // `read` only reports how many bytes it filled and leaves the rest of
        // the buffer untouched, so the caller trims to the returned count.
        read_buf_2.truncate(1);
        assert_eq!(read_buf_2.as_slice(), &(readable_bytes[9..]));

        // fourth read (nothing should be readable)
        let mut read_buf_3 = vec![0; 10];
        let read_result = t.read(&mut read_buf_3);
        assert_eq!(read_result.unwrap(), 0);
        read_buf_3.truncate(0);

        // check that all the bytes we received match the original (again!)
        let mut bytes_read = Vec::with_capacity(10);
        bytes_read.extend_from_slice(&read_buf_0);
        bytes_read.extend_from_slice(&read_buf_1);
        bytes_read.extend_from_slice(&read_buf_2);
        bytes_read.extend_from_slice(&read_buf_3);
        assert_eq!(&bytes_read, &readable_bytes);
    }

    #[test]
    fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
        let mut t = TBufferChannel::with_capacity(3, 0);

        let readable_bytes_0: [u8; 3] = [0x02, 0xAB, 0x33];

        // check that we're able to set the bytes to be read
        let result = t.set_readable_bytes(&readable_bytes_0);
        assert_eq!(result, 3);
        assert_eq!(t.read_bytes(), &readable_bytes_0);

        let mut read_buf = vec![0; 4];

        // drain the read buffer
        let read_result = t.read(&mut read_buf);
        assert_eq!(read_result.unwrap(), 3);
        assert_eq!(t.read_bytes(), &read_buf[0..3]);

        // check that a subsequent read yields zero bytes
        let read_result = t.read(&mut read_buf);
        assert_eq!(read_result.unwrap(), 0);

        // the caller's buffer is left untouched when nothing is read
        let mut expected_bytes = Vec::with_capacity(4);
        expected_bytes.extend_from_slice(&readable_bytes_0);
        expected_bytes.push(0x00);
        assert_eq!(&read_buf, &expected_bytes);

        // replenish the read buffer again
        let readable_bytes_1: [u8; 2] = [0x91, 0xAA];

        // check that we're able to set the bytes to be read
        let result = t.set_readable_bytes(&readable_bytes_1);
        assert_eq!(result, 2);
        assert_eq!(t.read_bytes(), &readable_bytes_1);

        // read again
        let read_result = t.read(&mut read_buf);
        assert_eq!(read_result.unwrap(), 2);
        assert_eq!(t.read_bytes(), &read_buf[0..2]);
    }
}