// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::convert::From;
19use std::io;
20use std::io::{ErrorKind, Read, Write};
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +010021use std::net::{Shutdown, TcpStream, ToSocketAddrs};
Allen George8b96bfb2016-11-02 08:01:08 -040022
tokcumf0336412022-03-30 11:39:08 +020023#[cfg(unix)]
24use std::os::unix::net::UnixStream;
25
Allen George0e22c362017-01-30 07:15:00 -050026use super::{ReadHalf, TIoChannel, WriteHalf};
Allen Georgeb0d14132020-03-29 11:48:55 -040027use crate::{new_transport_error, TransportErrorKind};
Allen George8b96bfb2016-11-02 08:01:08 -040028
/// Bidirectional TCP/IP channel.
///
/// Wraps an optional `TcpStream`. The stream is absent for a channel created
/// with `TTcpChannel::new()` (or `Default`) until `TTcpChannel::open(...)`
/// succeeds, and present from the start when the channel is created with
/// `TTcpChannel::with_stream(...)`.
///
/// # Examples
///
/// Create a `TTcpChannel`.
///
/// ```no_run
/// use std::io::{Read, Write};
/// use thrift::transport::TTcpChannel;
///
/// let mut c = TTcpChannel::new();
/// c.open("localhost:9090").unwrap();
///
/// let mut buf = vec![0u8; 4];
/// c.read(&mut buf).unwrap();
/// c.write(&vec![0, 1, 2]).unwrap();
/// ```
///
/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
///
/// ```no_run
/// use std::io::{Read, Write};
/// use std::net::TcpStream;
/// use thrift::transport::TTcpChannel;
///
/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
///
/// // no need to call c.open() since we've already connected above
/// let mut c = TTcpChannel::with_stream(stream);
///
/// let mut buf = vec![0u8; 4];
/// c.read(&mut buf).unwrap();
/// c.write(&vec![0, 1, 2]).unwrap();
/// ```
#[derive(Debug, Default)]
pub struct TTcpChannel {
    // Underlying connection; `None` until the channel has been opened or
    // was constructed around an already-connected stream.
    stream: Option<TcpStream>,
}
67
Allen George0e22c362017-01-30 07:15:00 -050068impl TTcpChannel {
69 /// Create an uninitialized `TTcpChannel`.
Allen George8b96bfb2016-11-02 08:01:08 -040070 ///
Allen George0e22c362017-01-30 07:15:00 -050071 /// The returned instance must be opened using `TTcpChannel::open(...)`
Allen George8b96bfb2016-11-02 08:01:08 -040072 /// before it can be used.
Allen George0e22c362017-01-30 07:15:00 -050073 pub fn new() -> TTcpChannel {
74 TTcpChannel { stream: None }
Allen George8b96bfb2016-11-02 08:01:08 -040075 }
76
Allen George0e22c362017-01-30 07:15:00 -050077 /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
Allen George8b96bfb2016-11-02 08:01:08 -040078 ///
79 /// The passed-in stream is assumed to have been opened before being wrapped
Allen George0e22c362017-01-30 07:15:00 -050080 /// by the created `TTcpChannel` instance.
81 pub fn with_stream(stream: TcpStream) -> TTcpChannel {
Allen Georgeef7a1892018-12-16 18:01:37 -050082 TTcpChannel {
83 stream: Some(stream),
84 }
Allen George8b96bfb2016-11-02 08:01:08 -040085 }
86
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +010087 /// Connect to `remote_address`, which should implement `ToSocketAddrs` trait.
Allen Georgeb0d14132020-03-29 11:48:55 -040088 pub fn open<A: ToSocketAddrs>(&mut self, remote_address: A) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -040089 if self.stream.is_some() {
Allen Georgeef7a1892018-12-16 18:01:37 -050090 Err(new_transport_error(
91 TransportErrorKind::AlreadyOpen,
92 "tcp connection previously opened",
93 ))
Allen George8b96bfb2016-11-02 08:01:08 -040094 } else {
95 match TcpStream::connect(&remote_address) {
96 Ok(s) => {
97 self.stream = Some(s);
98 Ok(())
99 }
100 Err(e) => Err(From::from(e)),
101 }
102 }
103 }
104
Allen George0e22c362017-01-30 07:15:00 -0500105 /// Shut down this channel.
Allen George8b96bfb2016-11-02 08:01:08 -0400106 ///
107 /// Both send and receive halves are closed, and this instance can no
108 /// longer be used to communicate with another endpoint.
Allen Georgeb0d14132020-03-29 11:48:55 -0400109 pub fn close(&mut self) -> crate::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -0500110 self.if_set(|s| s.shutdown(Shutdown::Both))
111 .map_err(From::from)
Allen George8b96bfb2016-11-02 08:01:08 -0400112 }
113
114 fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
Allen George0e22c362017-01-30 07:15:00 -0500115 where
116 F: FnMut(&mut TcpStream) -> io::Result<T>,
Allen George8b96bfb2016-11-02 08:01:08 -0400117 {
Allen George8b96bfb2016-11-02 08:01:08 -0400118 if let Some(ref mut s) = self.stream {
119 stream_operation(s)
120 } else {
Allen Georgeef7a1892018-12-16 18:01:37 -0500121 Err(io::Error::new(
122 ErrorKind::NotConnected,
123 "tcp endpoint not connected",
124 ))
Allen George8b96bfb2016-11-02 08:01:08 -0400125 }
126 }
127}
128
Allen George0e22c362017-01-30 07:15:00 -0500129impl TIoChannel for TTcpChannel {
Allen Georgeb0d14132020-03-29 11:48:55 -0400130 fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
Allen George0e22c362017-01-30 07:15:00 -0500131 where
132 Self: Sized,
133 {
134 let mut s = self;
135
136 s.stream
137 .as_mut()
138 .and_then(|s| s.try_clone().ok())
Allen Georgeef7a1892018-12-16 18:01:37 -0500139 .map(|cloned| {
140 let read_half = ReadHalf::new(TTcpChannel {
141 stream: s.stream.take(),
142 });
143 let write_half = WriteHalf::new(TTcpChannel {
144 stream: Some(cloned),
145 });
146 (read_half, write_half)
147 })
148 .ok_or_else(|| {
149 new_transport_error(
150 TransportErrorKind::Unknown,
151 "cannot clone underlying tcp stream",
152 )
153 })
Allen George0e22c362017-01-30 07:15:00 -0500154 }
155}
156
157impl Read for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400158 fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
159 self.if_set(|s| s.read(b))
160 }
161}
162
Allen George0e22c362017-01-30 07:15:00 -0500163impl Write for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400164 fn write(&mut self, b: &[u8]) -> io::Result<usize> {
Allen Georgecf7ba4c2017-12-11 11:44:11 -0500165 self.if_set(|s| s.write(b))
Allen George8b96bfb2016-11-02 08:01:08 -0400166 }
167
168 fn flush(&mut self) -> io::Result<()> {
169 self.if_set(|s| s.flush())
170 }
171}
tokcumf0336412022-03-30 11:39:08 +0200172
173#[cfg(unix)]
174impl TIoChannel for UnixStream {
175 fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
176 where
177 Self: Sized,
178 {
179 let socket_rx = self.try_clone().unwrap();
180
181 Ok((ReadHalf::new(self), WriteHalf::new(socket_rx)))
182 }
183}