blob: 9874257604e0171f37a3f9cea722406d62f1a7a5 [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp;
19use std::io;
Allen George0e22c362017-01-30 07:15:00 -050020use std::sync::{Arc, Mutex};
Allen George8b96bfb2016-11-02 08:01:08 -040021
Allen George0e22c362017-01-30 07:15:00 -050022use super::{ReadHalf, TIoChannel, WriteHalf};
23
24/// In-memory read and write channel with fixed-size read and write buffers.
Allen George8b96bfb2016-11-02 08:01:08 -040025///
26/// On a `write` bytes are written to the internal write buffer. Writes are no
27/// longer accepted once this buffer is full. Callers must `empty_write_buffer()`
28/// before subsequent writes are accepted.
29///
30/// You can set readable bytes in the internal read buffer by filling it with
31/// `set_readable_bytes(...)`. Callers can then read until the buffer is
32/// depleted. No further reads are accepted until the internal read buffer is
33/// replenished again.
Julian Tescher9c6c6bf2019-12-01 21:43:01 -080034#[derive(Clone, Debug)]
Allen George0e22c362017-01-30 07:15:00 -050035pub struct TBufferChannel {
36 read: Arc<Mutex<ReadData>>,
37 write: Arc<Mutex<WriteData>>,
Allen George8b96bfb2016-11-02 08:01:08 -040038}
39
/// State of the read side of a `TBufferChannel`.
#[derive(Debug)]
struct ReadData {
    /// Fixed-size backing storage for the readable bytes.
    buf: Box<[u8]>,
    /// Offset of the next byte that a `read` will hand out.
    pos: usize,
    /// Number of valid bytes stored at the front of `buf`.
    idx: usize,
    /// Capacity the buffer was created with.
    cap: usize,
}
47
/// State of the write side of a `TBufferChannel`.
#[derive(Debug)]
struct WriteData {
    /// Fixed-size backing storage for the written bytes.
    buf: Box<[u8]>,
    /// Number of bytes written so far; also the offset of the next write.
    pos: usize,
    /// Capacity the buffer was created with.
    cap: usize,
}
54
55impl TBufferChannel {
56 /// Constructs a new, empty `TBufferChannel` with the given
Allen George8b96bfb2016-11-02 08:01:08 -040057 /// read buffer capacity and write buffer capacity.
Allen George0e22c362017-01-30 07:15:00 -050058 pub fn with_capacity(read_capacity: usize, write_capacity: usize) -> TBufferChannel {
59 TBufferChannel {
Allen Georgeef7a1892018-12-16 18:01:37 -050060 read: Arc::new(Mutex::new(ReadData {
61 buf: vec![0; read_capacity].into_boxed_slice(),
62 idx: 0,
63 pos: 0,
64 cap: read_capacity,
65 })),
66 write: Arc::new(Mutex::new(WriteData {
67 buf: vec![0; write_capacity].into_boxed_slice(),
68 pos: 0,
69 cap: write_capacity,
70 })),
Allen George8b96bfb2016-11-02 08:01:08 -040071 }
72 }
73
Allen George0e22c362017-01-30 07:15:00 -050074 /// Return a copy of the bytes held by the internal read buffer.
75 /// Returns an empty vector if no readable bytes are present.
76 pub fn read_bytes(&self) -> Vec<u8> {
77 let rdata = self.read.as_ref().lock().unwrap();
78 let mut buf = vec![0u8; rdata.idx];
79 buf.copy_from_slice(&rdata.buf[..rdata.idx]);
80 buf
Allen George8b96bfb2016-11-02 08:01:08 -040081 }
82
83 // FIXME: do I really need this API call?
84 // FIXME: should this simply reset to the last set of readable bytes?
85 /// Reset the number of readable bytes to zero.
86 ///
87 /// Subsequent calls to `read` will return nothing.
88 pub fn empty_read_buffer(&mut self) {
Allen George0e22c362017-01-30 07:15:00 -050089 let mut rdata = self.read.as_ref().lock().unwrap();
90 rdata.pos = 0;
91 rdata.idx = 0;
Allen George8b96bfb2016-11-02 08:01:08 -040092 }
93
94 /// Copy bytes from the source buffer `buf` into the internal read buffer,
95 /// overwriting any existing bytes. Returns the number of bytes copied,
96 /// which is `min(buf.len(), internal_read_buf.len())`.
97 pub fn set_readable_bytes(&mut self, buf: &[u8]) -> usize {
98 self.empty_read_buffer();
Allen George0e22c362017-01-30 07:15:00 -050099 let mut rdata = self.read.as_ref().lock().unwrap();
100 let max_bytes = cmp::min(rdata.cap, buf.len());
101 rdata.buf[..max_bytes].clone_from_slice(&buf[..max_bytes]);
102 rdata.idx = max_bytes;
Allen George8b96bfb2016-11-02 08:01:08 -0400103 max_bytes
104 }
105
Allen George0e22c362017-01-30 07:15:00 -0500106 /// Return a copy of the bytes held by the internal write buffer.
Allen George8b96bfb2016-11-02 08:01:08 -0400107 /// Returns an empty vector if no bytes were written.
Allen George0e22c362017-01-30 07:15:00 -0500108 pub fn write_bytes(&self) -> Vec<u8> {
109 let wdata = self.write.as_ref().lock().unwrap();
110 let mut buf = vec![0u8; wdata.pos];
111 buf.copy_from_slice(&wdata.buf[..wdata.pos]);
Allen George8b96bfb2016-11-02 08:01:08 -0400112 buf
113 }
114
115 /// Resets the internal write buffer, making it seem like no bytes were
Allen George0e22c362017-01-30 07:15:00 -0500116 /// written. Calling `write_buffer` after this returns an empty vector.
Allen George8b96bfb2016-11-02 08:01:08 -0400117 pub fn empty_write_buffer(&mut self) {
Allen George0e22c362017-01-30 07:15:00 -0500118 let mut wdata = self.write.as_ref().lock().unwrap();
119 wdata.pos = 0;
Allen George8b96bfb2016-11-02 08:01:08 -0400120 }
121
122 /// Overwrites the contents of the read buffer with the contents of the
123 /// write buffer. The write buffer is emptied after this operation.
124 pub fn copy_write_buffer_to_read_buffer(&mut self) {
Allen George0e22c362017-01-30 07:15:00 -0500125 // FIXME: redo this entire method
Allen George8b96bfb2016-11-02 08:01:08 -0400126 let buf = {
Allen George0e22c362017-01-30 07:15:00 -0500127 let wdata = self.write.as_ref().lock().unwrap();
128 let b = &wdata.buf[..wdata.pos];
Allen George8b96bfb2016-11-02 08:01:08 -0400129 let mut b_ret = vec![0; b.len()];
Allen George92e1c402017-04-06 08:28:22 -0400130 b_ret.copy_from_slice(b);
Allen George8b96bfb2016-11-02 08:01:08 -0400131 b_ret
132 };
133
134 let bytes_copied = self.set_readable_bytes(&buf);
135 assert_eq!(bytes_copied, buf.len());
136
137 self.empty_write_buffer();
138 }
139}
140
Allen George0e22c362017-01-30 07:15:00 -0500141impl TIoChannel for TBufferChannel {
142 fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
143 where
144 Self: Sized,
145 {
Allen Georgeef7a1892018-12-16 18:01:37 -0500146 Ok((
147 ReadHalf {
148 handle: TBufferChannel {
149 read: self.read.clone(),
150 write: self.write.clone(),
151 },
152 },
153 WriteHalf {
154 handle: TBufferChannel {
155 read: self.read.clone(),
156 write: self.write.clone(),
157 },
158 },
159 ))
Allen George0e22c362017-01-30 07:15:00 -0500160 }
161}
162
163impl io::Read for TBufferChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400164 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
Allen George0e22c362017-01-30 07:15:00 -0500165 let mut rdata = self.read.as_ref().lock().unwrap();
166 let nread = cmp::min(buf.len(), rdata.idx - rdata.pos);
167 buf[..nread].clone_from_slice(&rdata.buf[rdata.pos..rdata.pos + nread]);
168 rdata.pos += nread;
Allen George8b96bfb2016-11-02 08:01:08 -0400169 Ok(nread)
170 }
171}
172
Allen George0e22c362017-01-30 07:15:00 -0500173impl io::Write for TBufferChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400174 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
Allen George0e22c362017-01-30 07:15:00 -0500175 let mut wdata = self.write.as_ref().lock().unwrap();
176 let nwrite = cmp::min(buf.len(), wdata.cap - wdata.pos);
177 let (start, end) = (wdata.pos, wdata.pos + nwrite);
178 wdata.buf[start..end].clone_from_slice(&buf[..nwrite]);
179 wdata.pos += nwrite;
Allen George8b96bfb2016-11-02 08:01:08 -0400180 Ok(nwrite)
181 }
182
183 fn flush(&mut self) -> io::Result<()> {
184 Ok(()) // nothing to do on flush
185 }
186}
187
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::TBufferChannel;

    #[test]
    fn must_empty_write_buffer() {
        let payload = [0x01u8];
        let mut chan = TBufferChannel::with_capacity(0, 1);

        assert_eq!(chan.write(&payload).unwrap(), 1);
        assert_eq!(&chan.write_bytes()[..], &payload[..]);

        chan.empty_write_buffer();
        assert!(chan.write_bytes().is_empty());
    }

    #[test]
    fn must_accept_writes_after_buffer_emptied() {
        let payload = [0x01u8, 0x02];
        let mut chan = TBufferChannel::with_capacity(0, 2);

        // the buffer has room for the whole payload
        assert_eq!(chan.write(&payload).unwrap(), 2);
        assert_eq!(&chan.write_bytes()[..], &payload[..]);

        // the buffer is full now, so a second write accepts nothing ...
        assert_eq!(chan.write(&payload).unwrap(), 0);
        // ... and leaves the buffered bytes as they were
        assert_eq!(&chan.write_bytes()[..], &payload[..]);

        // emptying the buffer makes room again
        chan.empty_write_buffer();
        assert!(chan.write_bytes().is_empty());

        // and the same write now succeeds
        assert_eq!(chan.write(&payload).unwrap(), 2);
        assert_eq!(&chan.write_bytes()[..], &payload[..]);
    }

    #[test]
    fn must_accept_multiple_writes_until_buffer_is_full() {
        let mut chan = TBufferChannel::with_capacity(0, 10);

        // first write: everything fits
        let first = [0x01u8, 0x41];
        assert_eq!(chan.write(&first).unwrap(), 2);
        assert_eq!(&chan.write_bytes()[..], &first[..]);

        // second write: everything fits, appended at offset 2
        let second = [0x24u8, 0x41, 0x32, 0x33, 0x11, 0x98, 0xAF];
        assert_eq!(chan.write(&second).unwrap(), 7);
        assert_eq!(&chan.write_bytes()[2..], &second[..]);

        // third write: only one byte of space is left
        let third = [0xBFu8, 0xDA, 0x98];
        assert_eq!(chan.write(&third).unwrap(), 1);
        assert_eq!(&chan.write_bytes()[9..], &third[..1]);

        // fourth write: the buffer is full, nothing is accepted
        let fourth = [0xBFu8, 0xAA, 0xFD];
        assert_eq!(chan.write(&fourth).unwrap(), 0);

        // the buffer holds the accepted bytes, in write order
        let expected: Vec<u8> = first
            .iter()
            .chain(second.iter())
            .chain(third[..1].iter())
            .cloned()
            .collect();
        assert_eq!(chan.write_bytes(), expected);
    }

    #[test]
    fn must_empty_read_buffer() {
        let payload = [0x01u8];
        let mut chan = TBufferChannel::with_capacity(1, 0);

        assert_eq!(chan.set_readable_bytes(&payload), 1);
        assert_eq!(&chan.read_bytes()[..], &payload[..]);

        chan.empty_read_buffer();
        assert!(chan.read_bytes().is_empty());
    }

    #[test]
    fn must_allow_readable_bytes_to_be_set_after_read_buffer_emptied() {
        let mut chan = TBufferChannel::with_capacity(1, 0);

        let first = [0x01u8];
        assert_eq!(chan.set_readable_bytes(&first), 1);
        assert_eq!(&chan.read_bytes()[..], &first[..]);

        chan.empty_read_buffer();
        assert!(chan.read_bytes().is_empty());

        let second = [0x02u8];
        assert_eq!(chan.set_readable_bytes(&second), 1);
        assert_eq!(&chan.read_bytes()[..], &second[..]);
    }

    #[test]
    fn must_accept_multiple_reads_until_all_bytes_read() {
        let source = [0xFFu8, 0xEE, 0xDD, 0xCC, 0xBB, 0x00, 0x1A, 0x2B, 0x3C, 0x4D];
        let mut chan = TBufferChannel::with_capacity(10, 0);

        // load the bytes that will be read back
        assert_eq!(chan.set_readable_bytes(&source), 10);
        assert_eq!(&chan.read_bytes()[..], &source[..]);

        let mut collected: Vec<u8> = Vec::with_capacity(source.len());

        // first read: fills the whole destination
        let mut dest = [0u8; 5];
        let n = chan.read(&mut dest).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&dest[..], &source[..5]);
        collected.extend_from_slice(&dest[..n]);

        // second read: fills the whole destination again
        let mut dest = [0u8; 4];
        let n = chan.read(&mut dest).unwrap();
        assert_eq!(n, 4);
        assert_eq!(&dest[..], &source[5..9]);
        collected.extend_from_slice(&dest[..n]);

        // third read: only one byte remains, so only `dest[..n]` is meaningful
        let mut dest = [0u8; 3];
        let n = chan.read(&mut dest).unwrap();
        assert_eq!(n, 1);
        assert_eq!(&dest[..n], &source[9..]);
        collected.extend_from_slice(&dest[..n]);

        // fourth read: nothing is left
        let mut dest = [0u8; 10];
        let n = chan.read(&mut dest).unwrap();
        assert_eq!(n, 0);
        collected.extend_from_slice(&dest[..n]);

        // the pieces add up to the original bytes
        assert_eq!(&collected[..], &source[..]);
    }

    #[test]
    fn must_allow_reads_to_succeed_after_read_buffer_replenished() {
        let mut chan = TBufferChannel::with_capacity(3, 0);

        // load the first batch of readable bytes
        let first_batch = [0x02u8, 0xAB, 0x33];
        assert_eq!(chan.set_readable_bytes(&first_batch), 3);
        assert_eq!(&chan.read_bytes()[..], &first_batch[..]);

        let mut dest = [0u8; 4];

        // drain the read buffer
        assert_eq!(chan.read(&mut dest).unwrap(), 3);
        assert_eq!(&chan.read_bytes()[..], &dest[..3]);

        // a read from the drained buffer yields zero bytes ...
        assert_eq!(chan.read(&mut dest).unwrap(), 0);

        // ... and leaves the destination untouched
        assert_eq!(dest, [0x02, 0xAB, 0x33, 0x00]);

        // replenish the read buffer with a second batch
        let second_batch = [0x91u8, 0xAA];
        assert_eq!(chan.set_readable_bytes(&second_batch), 2);
        assert_eq!(&chan.read_bytes()[..], &second_batch[..]);

        // reads succeed again
        assert_eq!(chan.read(&mut dest).unwrap(), 2);
        assert_eq!(&chan.read_bytes()[..], &dest[..2]);
    }
}