blob: cf959be8bed64be0a03f2b580627ac8034392710 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
Allen George8b96bfb2016-11-02 08:01:08 -040019use std::cmp;
20use std::io;
Allen Georgeb7084cb2017-12-13 07:34:49 -050021use std::io::{Read, Write};
Allen George8b96bfb2016-11-02 08:01:08 -040022
Allen George0e22c362017-01-30 07:15:00 -050023use super::{TReadTransport, TReadTransportFactory, TWriteTransport, TWriteTransportFactory};
Hasnain Lakhani42d0b712025-07-17 19:57:05 -070024use crate::TConfiguration;
Allen George8b96bfb2016-11-02 08:01:08 -040025
/// Size, in bytes, of the read buffer allocated when the caller does not
/// request a specific capacity.
const READ_CAPACITY: usize = 4 * 1024;

/// Capacity, in bytes, of the write buffer allocated when the caller does not
/// request a specific capacity.
const WRITE_CAPACITY: usize = 4 * 1024;
Allen George8b96bfb2016-11-02 08:01:08 -040031
Allen George0e22c362017-01-30 07:15:00 -050032/// Transport that reads framed messages.
Allen George8b96bfb2016-11-02 08:01:08 -040033///
Allen George0e22c362017-01-30 07:15:00 -050034/// A `TFramedReadTransport` maintains a fixed-size internal read buffer.
35/// On a call to `TFramedReadTransport::read(...)` one full message - both
36/// fixed-length header and bytes - is read from the wrapped channel and
37/// buffered. Subsequent read calls are serviced from the internal buffer
38/// until it is exhausted, at which point the next full message is read
39/// from the wrapped channel.
Allen George8b96bfb2016-11-02 08:01:08 -040040///
41/// # Examples
42///
Allen George0e22c362017-01-30 07:15:00 -050043/// Create and use a `TFramedReadTransport`.
Allen George8b96bfb2016-11-02 08:01:08 -040044///
45/// ```no_run
Allen George0e22c362017-01-30 07:15:00 -050046/// use std::io::Read;
47/// use thrift::transport::{TFramedReadTransport, TTcpChannel};
Allen George8b96bfb2016-11-02 08:01:08 -040048///
Allen George0e22c362017-01-30 07:15:00 -050049/// let mut c = TTcpChannel::new();
50/// c.open("localhost:9090").unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040051///
Allen George0e22c362017-01-30 07:15:00 -050052/// let mut t = TFramedReadTransport::new(c);
Allen George8b96bfb2016-11-02 08:01:08 -040053///
Allen George8b96bfb2016-11-02 08:01:08 -040054/// t.read(&mut vec![0u8; 1]).unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040055/// ```
Allen George0e22c362017-01-30 07:15:00 -050056#[derive(Debug)]
57pub struct TFramedReadTransport<C>
58where
59 C: Read,
60{
Allen Georgeb7084cb2017-12-13 07:34:49 -050061 buf: Vec<u8>,
Allen George0e22c362017-01-30 07:15:00 -050062 pos: usize,
63 cap: usize,
64 chan: C,
Hasnain Lakhani42d0b712025-07-17 19:57:05 -070065 config: TConfiguration,
Allen George8b96bfb2016-11-02 08:01:08 -040066}
67
Allen George0e22c362017-01-30 07:15:00 -050068impl<C> TFramedReadTransport<C>
69where
70 C: Read,
71{
Allen Georgeb7084cb2017-12-13 07:34:49 -050072 /// Create a `TFramedReadTransport` with a default-sized
73 /// internal read buffer that wraps the given `TIoChannel`.
Allen George0e22c362017-01-30 07:15:00 -050074 pub fn new(channel: C) -> TFramedReadTransport<C> {
75 TFramedReadTransport::with_capacity(READ_CAPACITY, channel)
Allen George8b96bfb2016-11-02 08:01:08 -040076 }
77
Allen Georgeb7084cb2017-12-13 07:34:49 -050078 /// Create a `TFramedTransport` with an internal read buffer
79 /// of size `read_capacity` that wraps the given `TIoChannel`.
Allen George0e22c362017-01-30 07:15:00 -050080 pub fn with_capacity(read_capacity: usize, channel: C) -> TFramedReadTransport<C> {
81 TFramedReadTransport {
Allen Georgeb7084cb2017-12-13 07:34:49 -050082 buf: vec![0; read_capacity], // FIXME: do I actually have to do this?
Allen George0e22c362017-01-30 07:15:00 -050083 pos: 0,
84 cap: 0,
85 chan: channel,
Hasnain Lakhani42d0b712025-07-17 19:57:05 -070086 config: TConfiguration::default(),
Allen George8b96bfb2016-11-02 08:01:08 -040087 }
88 }
89}
90
Allen George0e22c362017-01-30 07:15:00 -050091impl<C> Read for TFramedReadTransport<C>
92where
93 C: Read,
94{
Allen George8b96bfb2016-11-02 08:01:08 -040095 fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
Allen George0e22c362017-01-30 07:15:00 -050096 if self.cap - self.pos == 0 {
Hasnain Lakhani42d0b712025-07-17 19:57:05 -070097 let frame_size_bytes = self.chan.read_i32::<BigEndian>()?;
98
99 if frame_size_bytes < 0 {
100 return Err(io::Error::new(
101 io::ErrorKind::InvalidData,
102 format!("Negative frame size: {}", frame_size_bytes),
103 ));
104 }
105
106 let message_size = frame_size_bytes as usize;
107
108 if let Some(max_frame) = self.config.max_frame_size() {
109 if message_size > max_frame {
110 return Err(io::Error::new(
111 io::ErrorKind::InvalidData,
112 format!(
113 "Frame size {} exceeds maximum allowed size of {}",
114 message_size, max_frame
115 ),
116 ));
117 }
118 }
Allen Georgeb7084cb2017-12-13 07:34:49 -0500119
120 let buf_capacity = cmp::max(message_size, READ_CAPACITY);
121 self.buf.resize(buf_capacity, 0);
122
Allen George0e22c362017-01-30 07:15:00 -0500123 self.chan.read_exact(&mut self.buf[..message_size])?;
Cameron Martinda54fc82025-01-12 08:55:45 +0000124 self.cap = message_size;
Allen Georgeb7084cb2017-12-13 07:34:49 -0500125 self.pos = 0;
Allen George8b96bfb2016-11-02 08:01:08 -0400126 }
127
Allen George0e22c362017-01-30 07:15:00 -0500128 let nread = cmp::min(b.len(), self.cap - self.pos);
129 b[..nread].clone_from_slice(&self.buf[self.pos..self.pos + nread]);
130 self.pos += nread;
Allen George8b96bfb2016-11-02 08:01:08 -0400131
132 Ok(nread)
133 }
134}
135
/// Factory for creating instances of `TFramedReadTransport`.
///
/// The factory carries no state.
#[derive(Default)]
pub struct TFramedReadTransportFactory;

impl TFramedReadTransportFactory {
    /// Create a `TFramedReadTransportFactory`.
    pub fn new() -> Self {
        Self
    }
}
145
146impl TReadTransportFactory for TFramedReadTransportFactory {
147 /// Create a `TFramedReadTransport`.
Danny Browning77d96c12019-08-21 13:41:07 -0600148 fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send> {
Allen George0e22c362017-01-30 07:15:00 -0500149 Box::new(TFramedReadTransport::new(channel))
150 }
151}
/// Transport that writes framed messages.
///
/// A `TFramedWriteTransport` accumulates every write in an internal buffer
/// that grows as needed. Nothing reaches the wrapped channel until
/// `TFramedWriteTransport::flush()` is called; a flush writes a fixed-length
/// header holding the number of buffered bytes, followed by the bytes
/// themselves.
///
/// # Examples
///
/// Create and use a `TFramedWriteTransport`.
///
/// ```no_run
/// use std::io::Write;
/// use thrift::transport::{TFramedWriteTransport, TTcpChannel};
///
/// let mut c = TTcpChannel::new();
/// c.open("localhost:9090").unwrap();
///
/// let mut t = TFramedWriteTransport::new(c);
///
/// t.write(&[0x00]).unwrap();
/// t.flush().unwrap();
/// ```
#[derive(Debug)]
pub struct TFramedWriteTransport<C: Write> {
    // Bytes written since the last successful flush (the pending frame payload).
    buf: Vec<u8>,
    // Wrapped channel that completed frames are written to.
    channel: C,
}
184
185impl<C> TFramedWriteTransport<C>
186where
187 C: Write,
188{
Allen Georgeb7084cb2017-12-13 07:34:49 -0500189 /// Create a `TFramedWriteTransport` with default-sized internal
190 /// write buffer that wraps the given `TIoChannel`.
Allen George0e22c362017-01-30 07:15:00 -0500191 pub fn new(channel: C) -> TFramedWriteTransport<C> {
192 TFramedWriteTransport::with_capacity(WRITE_CAPACITY, channel)
193 }
194
Allen Georgeb7084cb2017-12-13 07:34:49 -0500195 /// Create a `TFramedWriteTransport` with an internal write buffer
196 /// of size `write_capacity` that wraps the given `TIoChannel`.
Allen George0e22c362017-01-30 07:15:00 -0500197 pub fn with_capacity(write_capacity: usize, channel: C) -> TFramedWriteTransport<C> {
198 TFramedWriteTransport {
Allen Georgeb7084cb2017-12-13 07:34:49 -0500199 buf: Vec::with_capacity(write_capacity),
200 channel,
Allen George0e22c362017-01-30 07:15:00 -0500201 }
202 }
203}
204
205impl<C> Write for TFramedWriteTransport<C>
206where
207 C: Write,
208{
Allen George8b96bfb2016-11-02 08:01:08 -0400209 fn write(&mut self, b: &[u8]) -> io::Result<usize> {
Allen Georgeb7084cb2017-12-13 07:34:49 -0500210 let current_capacity = self.buf.capacity();
211 let available_space = current_capacity - self.buf.len();
212 if b.len() > available_space {
213 let additional_space = cmp::max(b.len() - available_space, current_capacity);
214 self.buf.reserve(additional_space);
Allen George8b96bfb2016-11-02 08:01:08 -0400215 }
216
Allen Georgeb7084cb2017-12-13 07:34:49 -0500217 self.buf.extend_from_slice(b);
218 Ok(b.len())
Allen George8b96bfb2016-11-02 08:01:08 -0400219 }
220
221 fn flush(&mut self) -> io::Result<()> {
Allen Georgeb7084cb2017-12-13 07:34:49 -0500222 let message_size = self.buf.len();
Allen George8b96bfb2016-11-02 08:01:08 -0400223
224 if let 0 = message_size {
225 return Ok(());
226 } else {
Allen Georgeef7a1892018-12-16 18:01:37 -0500227 self.channel.write_i32::<BigEndian>(message_size as i32)?;
Allen George8b96bfb2016-11-02 08:01:08 -0400228 }
229
Allen Georgeb7084cb2017-12-13 07:34:49 -0500230 // will spin if the underlying channel can't be written to
Allen George8b96bfb2016-11-02 08:01:08 -0400231 let mut byte_index = 0;
Allen Georgeb7084cb2017-12-13 07:34:49 -0500232 while byte_index < message_size {
233 let nwrite = self.channel.write(&self.buf[byte_index..message_size])?;
234 byte_index = cmp::min(byte_index + nwrite, message_size);
Allen George8b96bfb2016-11-02 08:01:08 -0400235 }
236
Allen Georgeb7084cb2017-12-13 07:34:49 -0500237 let buf_capacity = cmp::min(self.buf.capacity(), WRITE_CAPACITY);
238 self.buf.resize(buf_capacity, 0);
239 self.buf.clear();
240
Allen George0e22c362017-01-30 07:15:00 -0500241 self.channel.flush()
Allen George8b96bfb2016-11-02 08:01:08 -0400242 }
243}
244
/// Factory for creating instances of `TFramedWriteTransport`.
///
/// The factory carries no state.
#[derive(Default)]
pub struct TFramedWriteTransportFactory;

impl TFramedWriteTransportFactory {
    /// Create a `TFramedWriteTransportFactory`.
    pub fn new() -> Self {
        Self
    }
}
254
Allen George0e22c362017-01-30 07:15:00 -0500255impl TWriteTransportFactory for TFramedWriteTransportFactory {
256 /// Create a `TFramedWriteTransport`.
Danny Browning77d96c12019-08-21 13:41:07 -0600257 fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send> {
Allen George0e22c362017-01-30 07:15:00 -0500258 Box::new(TFramedWriteTransport::new(channel))
Allen George8b96bfb2016-11-02 08:01:08 -0400259 }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::mem::TBufferChannel;

    // FIXME: test a forced reserve

    /// One complete frame whose payload is the four bytes `00 01 02 03`.
    const FOUR_BYTE_FRAME: [u8; 8] = [
        0x00, 0x00, 0x00, 0x04, /* message size */
        0x00, 0x01, 0x02, 0x03, /* message body */
    ];

    #[test]
    fn must_read_message_smaller_than_initial_buffer_size() {
        let channel = TBufferChannel::with_capacity(10, 10);
        let mut transport = TFramedReadTransport::with_capacity(8, channel);

        transport.chan.set_readable_bytes(&FOUR_BYTE_FRAME);

        let mut out = [0u8; 8];

        // only the 4 payload bytes are handed back
        assert_eq!(transport.read(&mut out).unwrap(), 4);
        assert_eq!(&out[..4], &[0x00, 0x01, 0x02, 0x03]);
    }

    #[test]
    fn must_read_message_greater_than_initial_buffer_size() {
        let channel = TBufferChannel::with_capacity(10, 10);
        let mut transport = TFramedReadTransport::with_capacity(2, channel);

        transport.chan.set_readable_bytes(&FOUR_BYTE_FRAME);

        let mut out = [0u8; 8];

        // only the 4 payload bytes are handed back
        assert_eq!(transport.read(&mut out).unwrap(), 4);
        assert_eq!(&out[..4], &[0x00, 0x01, 0x02, 0x03]);
    }

    #[test]
    fn must_read_multiple_messages_in_sequence_correctly() {
        let channel = TBufferChannel::with_capacity(10, 10);
        let mut transport = TFramedReadTransport::with_capacity(2, channel);

        //
        // 1st message
        //

        transport.chan.set_readable_bytes(&FOUR_BYTE_FRAME);

        let mut first_out = [0u8; 8];

        // only the 4 payload bytes are handed back
        assert_eq!(transport.read(&mut first_out).unwrap(), 4);
        assert_eq!(
            first_out,
            [0x00, 0x01, 0x02, 0x03, 0x00, 0x00, 0x00, 0x00]
        );

        //
        // 2nd message
        //

        let one_byte_frame: [u8; 5] = [
            0x00, 0x00, 0x00, 0x01, /* message size */
            0x04, /* message body */
        ];
        transport.chan.set_readable_bytes(&one_byte_frame);

        let mut second_out = [0u8; 8];

        // only the single payload byte is handed back
        assert_eq!(transport.read(&mut second_out).unwrap(), 1);
        assert_eq!(
            second_out,
            [0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
        );
    }

    #[test]
    fn must_write_message_smaller_than_buffer_size() {
        let channel = TBufferChannel::with_capacity(0, 0);
        let mut transport = TFramedWriteTransport::with_capacity(20, channel);

        let payload = [0u8; 10];

        // all 10 bytes are accepted into the buffer
        assert_eq!(transport.write(&payload).unwrap(), 10);
    }

    #[test]
    fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
        let channel = TBufferChannel::with_capacity(0, 10);
        let mut transport = TFramedWriteTransport::with_capacity(10, channel);

        let expected: [u8; 0] = [];

        assert_eq!(transport.write(&[]).unwrap(), 0);
        assert_eq_transport_written_bytes!(transport, expected);
    }

    #[test]
    fn must_write_to_inner_transport_on_flush() {
        let channel = TBufferChannel::with_capacity(10, 10);
        let mut transport = TFramedWriteTransport::new(channel);

        let payload: [u8; 5] = [0x00, 0x01, 0x02, 0x03, 0x04];
        assert_eq!(transport.write(&payload).unwrap(), 5);
        assert_eq_transport_num_written_bytes!(transport, 0);

        assert!(transport.flush().is_ok());

        let expected_bytes = [
            0x00, 0x00, 0x00, 0x05, /* message size */
            0x00, 0x01, 0x02, 0x03, 0x04, /* message body */
        ];

        assert_eq_transport_written_bytes!(transport, expected_bytes);
    }

    #[test]
    fn must_write_message_greater_than_buffer_size_00() {
        let channel = TBufferChannel::with_capacity(0, 10);

        // IMPORTANT: DO **NOT** CHANGE THE WRITE_CAPACITY OR THE NUMBER OF BYTES TO BE WRITTEN!
        // these lengths were chosen to be just long enough
        // that doubling the capacity is a **worse** choice than
        // simply resizing the buffer to b.len()

        let mut transport = TFramedWriteTransport::with_capacity(1, channel);
        let payload = [0x00, 0x01, 0x02];

        // all 3 bytes are accepted into the buffer
        assert_eq!(transport.write(&payload).unwrap(), 3);
        assert_eq_transport_num_written_bytes!(transport, 0);

        assert!(transport.flush().is_ok());

        let expected_bytes = [
            0x00, 0x00, 0x00, 0x03, /* message size */
            0x00, 0x01, 0x02, /* message body */
        ];

        assert_eq_transport_written_bytes!(transport, expected_bytes);
    }

    #[test]
    fn must_write_message_greater_than_buffer_size_01() {
        let channel = TBufferChannel::with_capacity(0, 10);

        // IMPORTANT: DO **NOT** CHANGE THE WRITE_CAPACITY OR THE NUMBER OF BYTES TO BE WRITTEN!
        // these lengths were chosen to be just long enough
        // that doubling the capacity is a **better** choice than
        // simply resizing the buffer to b.len()

        let mut transport = TFramedWriteTransport::with_capacity(2, channel);
        let payload = [0x00, 0x01, 0x02];

        // all 3 bytes are accepted into the buffer
        assert_eq!(transport.write(&payload).unwrap(), 3);
        assert_eq_transport_num_written_bytes!(transport, 0);

        assert!(transport.flush().is_ok());

        let expected_bytes = [
            0x00, 0x00, 0x00, 0x03, /* message size */
            0x00, 0x01, 0x02, /* message body */
        ];

        assert_eq_transport_written_bytes!(transport, expected_bytes);
    }

    #[test]
    fn must_return_error_if_nothing_can_be_written_to_inner_transport_on_flush() {
        let channel = TBufferChannel::with_capacity(0, 0);
        let mut transport = TFramedWriteTransport::with_capacity(1, channel);

        let payload = [0u8; 10];

        // all 10 bytes are accepted into the buffer
        assert_eq!(transport.write(&payload).unwrap(), 10);

        // the flush must fail because the underlying channel has no room at all
        let outcome = transport.flush();
        assert!(outcome.is_err());
    }

    #[test]
    fn must_write_successfully_after_flush() {
        // IMPORTANT: write capacity *MUST* be greater
        // than message sizes used in this test + 4-byte frame header
        let channel = TBufferChannel::with_capacity(0, 10);
        let mut transport = TFramedWriteTransport::with_capacity(5, channel);

        // write and flush
        let first_message: [u8; 5] = [0x00, 0x01, 0x02, 0x03, 0x04];
        assert_eq!(transport.write(&first_message).unwrap(), 5);
        assert!(transport.flush().is_ok());

        let mut expected: Vec<u8> = vec![0x00, 0x00, 0x00, 0x05]; // message size
        expected.extend_from_slice(&first_message);

        // check the flushed bytes
        assert_eq!(transport.channel.write_bytes(), expected);

        // reset our underlying transport
        transport.channel.empty_write_buffer();

        let second_message: [u8; 3] = [0x05, 0x06, 0x07];
        assert_eq!(transport.write(&second_message).unwrap(), 3);
        assert!(transport.flush().is_ok());

        let mut expected: Vec<u8> = vec![0x00, 0x00, 0x00, 0x03]; // message size
        expected.extend_from_slice(&second_message);

        // check the flushed bytes
        assert_eq!(transport.channel.write_bytes(), expected);
    }
}