blob: 2b5733f490b735c5aa52db85884a613d0a00cee8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Types used to send and receive bytes over an I/O channel.
//!
//! The core types are the `TReadTransport`, `TWriteTransport` and the
//! `TIoChannel` traits, through which `TInputProtocol` or
//! `TOutputProtocol` can receive and send primitives over the wire. While
//! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives
//! the types in this module understand only bytes.
use std::io;
use std::io::{Read, Write};
use std::ops::{Deref, DerefMut};
#[cfg(test)]
macro_rules! assert_eq_transport_num_written_bytes {
($transport:ident, $num_written_bytes:expr) => {{
assert_eq!($transport.channel.write_bytes().len(), $num_written_bytes);
}};
}
#[cfg(test)]
macro_rules! assert_eq_transport_written_bytes {
($transport:ident, $expected_bytes:ident) => {{
assert_eq!($transport.channel.write_bytes(), &$expected_bytes);
}};
}
mod buffered;
mod framed;
mod mem;
mod socket;
pub use self::buffered::{
TBufferedReadTransport, TBufferedReadTransportFactory, TBufferedWriteTransport,
TBufferedWriteTransportFactory,
};
pub use self::framed::{
TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
TFramedWriteTransportFactory,
};
pub use self::mem::TBufferChannel;
pub use self::socket::TTcpChannel;
/// Identifies a transport used by a `TInputProtocol` to receive bytes.
pub trait TReadTransport: Read {}
/// Helper type used by a server to create `TReadTransport` instances for
/// accepted client connections.
pub trait TReadTransportFactory {
/// Create a `TTransport` that wraps a channel over which bytes are to be read.
fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>;
}
/// Identifies a transport used by `TOutputProtocol` to send bytes.
pub trait TWriteTransport: Write {}
/// Helper type used by a server to create `TWriteTransport` instances for
/// accepted client connections.
pub trait TWriteTransportFactory {
/// Create a `TTransport` that wraps a channel over which bytes are to be sent.
fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>;
}
impl<T> TReadTransport for T where T: Read {}
impl<T> TWriteTransport for T where T: Write {}
// FIXME: implement the Debug trait for boxed transports
impl<T> TReadTransportFactory for Box<T>
where
T: TReadTransportFactory + ?Sized,
{
fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send> {
(**self).create(channel)
}
}
impl<T> TWriteTransportFactory for Box<T>
where
T: TWriteTransportFactory + ?Sized,
{
fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send> {
(**self).create(channel)
}
}
/// Identifies a splittable bidirectional I/O channel used to send and receive bytes.
pub trait TIoChannel: Read + Write {
/// Split the channel into a readable half and a writable half, where the
/// readable half implements `io::Read` and the writable half implements
/// `io::Write`. Returns `None` if the channel was not initialized, or if it
/// cannot be split safely.
///
/// Returned halves may share the underlying OS channel or buffer resources.
/// Implementations **should ensure** that these two halves can be safely
/// used independently by concurrent threads.
fn split(
self,
) -> crate::Result<(
crate::transport::ReadHalf<Self>,
crate::transport::WriteHalf<Self>,
)>
where
Self: Sized;
}
/// The readable half of an object returned from `TIoChannel::split`.
#[derive(Debug)]
pub struct ReadHalf<C>
where
C: Read,
{
handle: C,
}
/// The writable half of an object returned from `TIoChannel::split`.
#[derive(Debug)]
pub struct WriteHalf<C>
where
C: Write,
{
handle: C,
}
impl<C> ReadHalf<C>
where
C: Read,
{
/// Create a `ReadHalf` associated with readable `handle`
pub fn new(handle: C) -> ReadHalf<C> {
ReadHalf { handle }
}
}
impl<C> WriteHalf<C>
where
C: Write,
{
/// Create a `WriteHalf` associated with writable `handle`
pub fn new(handle: C) -> WriteHalf<C> {
WriteHalf { handle }
}
}
impl<C> Read for ReadHalf<C>
where
C: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.handle.read(buf)
}
}
impl<C> Write for WriteHalf<C>
where
C: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.handle.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.handle.flush()
}
}
impl<C> Deref for ReadHalf<C>
where
C: Read,
{
type Target = C;
fn deref(&self) -> &Self::Target {
&self.handle
}
}
impl<C> DerefMut for ReadHalf<C>
where
C: Read,
{
fn deref_mut(&mut self) -> &mut C {
&mut self.handle
}
}
impl<C> Deref for WriteHalf<C>
where
C: Write,
{
type Target = C;
fn deref(&self) -> &Self::Target {
&self.handle
}
}
impl<C> DerefMut for WriteHalf<C>
where
C: Write,
{
fn deref_mut(&mut self) -> &mut C {
&mut self.handle
}
}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use super::*;
#[test]
fn must_create_usable_read_channel_from_concrete_read_type() {
let r = Cursor::new([0, 1, 2]);
let _ = TBufferedReadTransport::new(r);
}
#[test]
fn must_create_usable_read_channel_from_boxed_read() {
let r: Box<dyn Read> = Box::new(Cursor::new([0, 1, 2]));
let _ = TBufferedReadTransport::new(r);
}
#[test]
fn must_create_usable_write_channel_from_concrete_write_type() {
let w = vec![0u8; 10];
let _ = TBufferedWriteTransport::new(w);
}
#[test]
fn must_create_usable_write_channel_from_boxed_write() {
let w: Box<dyn Write> = Box::new(vec![0u8; 10]);
let _ = TBufferedWriteTransport::new(w);
}
#[test]
fn must_create_usable_read_transport_from_concrete_read_transport() {
let r = Cursor::new([0, 1, 2]);
let mut t = TBufferedReadTransport::new(r);
takes_read_transport(&mut t)
}
#[test]
fn must_create_usable_read_transport_from_boxed_read() {
let r = Cursor::new([0, 1, 2]);
let mut t: Box<dyn TReadTransport> = Box::new(TBufferedReadTransport::new(r));
takes_read_transport(&mut t)
}
#[test]
fn must_create_usable_write_transport_from_concrete_write_transport() {
let w = vec![0u8; 10];
let mut t = TBufferedWriteTransport::new(w);
takes_write_transport(&mut t)
}
#[test]
fn must_create_usable_write_transport_from_boxed_write() {
let w = vec![0u8; 10];
let mut t: Box<dyn TWriteTransport> = Box::new(TBufferedWriteTransport::new(w));
takes_write_transport(&mut t)
}
fn takes_read_transport<R>(t: &mut R)
where
R: TReadTransport,
{
t.bytes();
}
fn takes_write_transport<W>(t: &mut W)
where
W: TWriteTransport,
{
t.flush().unwrap();
}
}