blob: c3e632457e12d777c0c0f9d057991c92c9da6599 [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::convert::From;
19use std::io;
20use std::io::{ErrorKind, Read, Write};
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +010021use std::net::{Shutdown, TcpStream, ToSocketAddrs};
Allen George8b96bfb2016-11-02 08:01:08 -040022
tokcumf0336412022-03-30 11:39:08 +020023#[cfg(unix)]
24use std::os::unix::net::UnixStream;
25
Allen George0e22c362017-01-30 07:15:00 -050026use super::{ReadHalf, TIoChannel, WriteHalf};
Allen Georgeb0d14132020-03-29 11:48:55 -040027use crate::{new_transport_error, TransportErrorKind};
Allen George8b96bfb2016-11-02 08:01:08 -040028
Allen George0e22c362017-01-30 07:15:00 -050029/// Bidirectional TCP/IP channel.
Allen George8b96bfb2016-11-02 08:01:08 -040030///
31/// # Examples
32///
Allen George0e22c362017-01-30 07:15:00 -050033/// Create a `TTcpChannel`.
Allen George8b96bfb2016-11-02 08:01:08 -040034///
35/// ```no_run
36/// use std::io::{Read, Write};
Allen George0e22c362017-01-30 07:15:00 -050037/// use thrift::transport::TTcpChannel;
Allen George8b96bfb2016-11-02 08:01:08 -040038///
Allen George0e22c362017-01-30 07:15:00 -050039/// let mut c = TTcpChannel::new();
40/// c.open("localhost:9090").unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040041///
42/// let mut buf = vec![0u8; 4];
Allen George0e22c362017-01-30 07:15:00 -050043/// c.read(&mut buf).unwrap();
44/// c.write(&vec![0, 1, 2]).unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040045/// ```
46///
Allen George0e22c362017-01-30 07:15:00 -050047/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
Allen George8b96bfb2016-11-02 08:01:08 -040048///
49/// ```no_run
50/// use std::io::{Read, Write};
51/// use std::net::TcpStream;
Allen George0e22c362017-01-30 07:15:00 -050052/// use thrift::transport::TTcpChannel;
Allen George8b96bfb2016-11-02 08:01:08 -040053///
54/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
Tdxdxozf274d982023-09-26 12:52:05 +080055/// stream.set_nodelay(true).unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040056///
Allen George0e22c362017-01-30 07:15:00 -050057/// // no need to call c.open() since we've already connected above
58/// let mut c = TTcpChannel::with_stream(stream);
Allen George8b96bfb2016-11-02 08:01:08 -040059///
60/// let mut buf = vec![0u8; 4];
Allen George0e22c362017-01-30 07:15:00 -050061/// c.read(&mut buf).unwrap();
62/// c.write(&vec![0, 1, 2]).unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040063/// ```
Allen George0e22c362017-01-30 07:15:00 -050064#[derive(Debug, Default)]
65pub struct TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -040066 stream: Option<TcpStream>,
67}
68
Allen George0e22c362017-01-30 07:15:00 -050069impl TTcpChannel {
70 /// Create an uninitialized `TTcpChannel`.
Allen George8b96bfb2016-11-02 08:01:08 -040071 ///
Allen George0e22c362017-01-30 07:15:00 -050072 /// The returned instance must be opened using `TTcpChannel::open(...)`
Allen George8b96bfb2016-11-02 08:01:08 -040073 /// before it can be used.
Allen George0e22c362017-01-30 07:15:00 -050074 pub fn new() -> TTcpChannel {
75 TTcpChannel { stream: None }
Allen George8b96bfb2016-11-02 08:01:08 -040076 }
77
Allen George0e22c362017-01-30 07:15:00 -050078 /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
Allen George8b96bfb2016-11-02 08:01:08 -040079 ///
80 /// The passed-in stream is assumed to have been opened before being wrapped
Allen George0e22c362017-01-30 07:15:00 -050081 /// by the created `TTcpChannel` instance.
82 pub fn with_stream(stream: TcpStream) -> TTcpChannel {
Allen Georgeef7a1892018-12-16 18:01:37 -050083 TTcpChannel {
84 stream: Some(stream),
85 }
Allen George8b96bfb2016-11-02 08:01:08 -040086 }
87
Marcin Pajkowski98ce2c82019-11-05 00:20:15 +010088 /// Connect to `remote_address`, which should implement `ToSocketAddrs` trait.
Allen Georgeb0d14132020-03-29 11:48:55 -040089 pub fn open<A: ToSocketAddrs>(&mut self, remote_address: A) -> crate::Result<()> {
Allen George8b96bfb2016-11-02 08:01:08 -040090 if self.stream.is_some() {
Allen Georgeef7a1892018-12-16 18:01:37 -050091 Err(new_transport_error(
92 TransportErrorKind::AlreadyOpen,
93 "tcp connection previously opened",
94 ))
Allen George8b96bfb2016-11-02 08:01:08 -040095 } else {
96 match TcpStream::connect(&remote_address) {
97 Ok(s) => {
Tdxdxozf274d982023-09-26 12:52:05 +080098 s.set_nodelay(true)?;
Allen George8b96bfb2016-11-02 08:01:08 -040099 self.stream = Some(s);
100 Ok(())
101 }
102 Err(e) => Err(From::from(e)),
103 }
104 }
105 }
106
Allen George0e22c362017-01-30 07:15:00 -0500107 /// Shut down this channel.
Allen George8b96bfb2016-11-02 08:01:08 -0400108 ///
109 /// Both send and receive halves are closed, and this instance can no
110 /// longer be used to communicate with another endpoint.
Allen Georgeb0d14132020-03-29 11:48:55 -0400111 pub fn close(&mut self) -> crate::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -0500112 self.if_set(|s| s.shutdown(Shutdown::Both))
113 .map_err(From::from)
Allen George8b96bfb2016-11-02 08:01:08 -0400114 }
115
116 fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
Allen George0e22c362017-01-30 07:15:00 -0500117 where
118 F: FnMut(&mut TcpStream) -> io::Result<T>,
Allen George8b96bfb2016-11-02 08:01:08 -0400119 {
Allen George8b96bfb2016-11-02 08:01:08 -0400120 if let Some(ref mut s) = self.stream {
121 stream_operation(s)
122 } else {
Allen Georgeef7a1892018-12-16 18:01:37 -0500123 Err(io::Error::new(
124 ErrorKind::NotConnected,
125 "tcp endpoint not connected",
126 ))
Allen George8b96bfb2016-11-02 08:01:08 -0400127 }
128 }
129}
130
Allen George0e22c362017-01-30 07:15:00 -0500131impl TIoChannel for TTcpChannel {
Allen Georgeb0d14132020-03-29 11:48:55 -0400132 fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
Allen George0e22c362017-01-30 07:15:00 -0500133 where
134 Self: Sized,
135 {
136 let mut s = self;
137
138 s.stream
139 .as_mut()
140 .and_then(|s| s.try_clone().ok())
Allen Georgeef7a1892018-12-16 18:01:37 -0500141 .map(|cloned| {
142 let read_half = ReadHalf::new(TTcpChannel {
143 stream: s.stream.take(),
144 });
145 let write_half = WriteHalf::new(TTcpChannel {
146 stream: Some(cloned),
147 });
148 (read_half, write_half)
149 })
150 .ok_or_else(|| {
151 new_transport_error(
152 TransportErrorKind::Unknown,
153 "cannot clone underlying tcp stream",
154 )
155 })
Allen George0e22c362017-01-30 07:15:00 -0500156 }
157}
158
159impl Read for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400160 fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
161 self.if_set(|s| s.read(b))
162 }
163}
164
Allen George0e22c362017-01-30 07:15:00 -0500165impl Write for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400166 fn write(&mut self, b: &[u8]) -> io::Result<usize> {
Allen Georgecf7ba4c2017-12-11 11:44:11 -0500167 self.if_set(|s| s.write(b))
Allen George8b96bfb2016-11-02 08:01:08 -0400168 }
169
170 fn flush(&mut self) -> io::Result<()> {
171 self.if_set(|s| s.flush())
172 }
173}
tokcumf0336412022-03-30 11:39:08 +0200174
175#[cfg(unix)]
176impl TIoChannel for UnixStream {
177 fn split(self) -> crate::Result<(ReadHalf<Self>, WriteHalf<Self>)>
178 where
179 Self: Sized,
180 {
181 let socket_rx = self.try_clone().unwrap();
182
183 Ok((ReadHalf::new(self), WriteHalf::new(socket_rx)))
184 }
185}