blob: 16b59ef216a3670db0354ab2c2faf12ff7ff12fc [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::convert::From;
19use std::io;
20use std::io::{ErrorKind, Read, Write};
21use std::net::{Shutdown, TcpStream};
22use std::ops::Drop;
23
Allen George0e22c362017-01-30 07:15:00 -050024use {TransportErrorKind, new_transport_error};
25use super::{ReadHalf, TIoChannel, WriteHalf};
Allen George8b96bfb2016-11-02 08:01:08 -040026
Allen George0e22c362017-01-30 07:15:00 -050027/// Bidirectional TCP/IP channel.
Allen George8b96bfb2016-11-02 08:01:08 -040028///
29/// # Examples
30///
Allen George0e22c362017-01-30 07:15:00 -050031/// Create a `TTcpChannel`.
Allen George8b96bfb2016-11-02 08:01:08 -040032///
33/// ```no_run
34/// use std::io::{Read, Write};
Allen George0e22c362017-01-30 07:15:00 -050035/// use thrift::transport::TTcpChannel;
Allen George8b96bfb2016-11-02 08:01:08 -040036///
Allen George0e22c362017-01-30 07:15:00 -050037/// let mut c = TTcpChannel::new();
38/// c.open("localhost:9090").unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040039///
40/// let mut buf = vec![0u8; 4];
Allen George0e22c362017-01-30 07:15:00 -050041/// c.read(&mut buf).unwrap();
42/// c.write(&vec![0, 1, 2]).unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040043/// ```
44///
Allen George0e22c362017-01-30 07:15:00 -050045/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
Allen George8b96bfb2016-11-02 08:01:08 -040046///
47/// ```no_run
48/// use std::io::{Read, Write};
49/// use std::net::TcpStream;
Allen George0e22c362017-01-30 07:15:00 -050050/// use thrift::transport::TTcpChannel;
Allen George8b96bfb2016-11-02 08:01:08 -040051///
52/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040053///
Allen George0e22c362017-01-30 07:15:00 -050054/// // no need to call c.open() since we've already connected above
55/// let mut c = TTcpChannel::with_stream(stream);
Allen George8b96bfb2016-11-02 08:01:08 -040056///
57/// let mut buf = vec![0u8; 4];
Allen George0e22c362017-01-30 07:15:00 -050058/// c.read(&mut buf).unwrap();
59/// c.write(&vec![0, 1, 2]).unwrap();
Allen George8b96bfb2016-11-02 08:01:08 -040060/// ```
Allen George0e22c362017-01-30 07:15:00 -050061#[derive(Debug, Default)]
62pub struct TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -040063 stream: Option<TcpStream>,
64}
65
Allen George0e22c362017-01-30 07:15:00 -050066impl TTcpChannel {
67 /// Create an uninitialized `TTcpChannel`.
Allen George8b96bfb2016-11-02 08:01:08 -040068 ///
Allen George0e22c362017-01-30 07:15:00 -050069 /// The returned instance must be opened using `TTcpChannel::open(...)`
Allen George8b96bfb2016-11-02 08:01:08 -040070 /// before it can be used.
Allen George0e22c362017-01-30 07:15:00 -050071 pub fn new() -> TTcpChannel {
72 TTcpChannel { stream: None }
Allen George8b96bfb2016-11-02 08:01:08 -040073 }
74
Allen George0e22c362017-01-30 07:15:00 -050075 /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
Allen George8b96bfb2016-11-02 08:01:08 -040076 ///
77 /// The passed-in stream is assumed to have been opened before being wrapped
Allen George0e22c362017-01-30 07:15:00 -050078 /// by the created `TTcpChannel` instance.
79 pub fn with_stream(stream: TcpStream) -> TTcpChannel {
80 TTcpChannel { stream: Some(stream) }
Allen George8b96bfb2016-11-02 08:01:08 -040081 }
82
83 /// Connect to `remote_address`, which should have the form `host:port`.
84 pub fn open(&mut self, remote_address: &str) -> ::Result<()> {
85 if self.stream.is_some() {
Allen George0e22c362017-01-30 07:15:00 -050086 Err(
87 new_transport_error(
88 TransportErrorKind::AlreadyOpen,
89 "tcp connection previously opened",
90 ),
91 )
Allen George8b96bfb2016-11-02 08:01:08 -040092 } else {
93 match TcpStream::connect(&remote_address) {
94 Ok(s) => {
95 self.stream = Some(s);
96 Ok(())
97 }
98 Err(e) => Err(From::from(e)),
99 }
100 }
101 }
102
Allen George0e22c362017-01-30 07:15:00 -0500103 /// Shut down this channel.
Allen George8b96bfb2016-11-02 08:01:08 -0400104 ///
105 /// Both send and receive halves are closed, and this instance can no
106 /// longer be used to communicate with another endpoint.
107 pub fn close(&mut self) -> ::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -0500108 self.if_set(|s| s.shutdown(Shutdown::Both))
109 .map_err(From::from)
Allen George8b96bfb2016-11-02 08:01:08 -0400110 }
111
112 fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
Allen George0e22c362017-01-30 07:15:00 -0500113 where
114 F: FnMut(&mut TcpStream) -> io::Result<T>,
Allen George8b96bfb2016-11-02 08:01:08 -0400115 {
116
117 if let Some(ref mut s) = self.stream {
118 stream_operation(s)
119 } else {
Allen George0e22c362017-01-30 07:15:00 -0500120 Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),)
Allen George8b96bfb2016-11-02 08:01:08 -0400121 }
122 }
123}
124
Allen George0e22c362017-01-30 07:15:00 -0500125impl TIoChannel for TTcpChannel {
126 fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
127 where
128 Self: Sized,
129 {
130 let mut s = self;
131
132 s.stream
133 .as_mut()
134 .and_then(|s| s.try_clone().ok())
135 .map(
136 |cloned| {
137 (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } },
138 WriteHalf { handle: TTcpChannel { stream: Some(cloned) } })
139 },
140 )
141 .ok_or_else(
142 || {
143 new_transport_error(
144 TransportErrorKind::Unknown,
145 "cannot clone underlying tcp stream",
146 )
147 },
148 )
149 }
150}
151
152impl Read for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400153 fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
154 self.if_set(|s| s.read(b))
155 }
156}
157
Allen George0e22c362017-01-30 07:15:00 -0500158impl Write for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400159 fn write(&mut self, b: &[u8]) -> io::Result<usize> {
160 self.if_set(|s| s.write(b))
161 }
162
163 fn flush(&mut self) -> io::Result<()> {
164 self.if_set(|s| s.flush())
165 }
166}
167
Allen George0e22c362017-01-30 07:15:00 -0500168// FIXME: Do I have to implement the Drop trait? TcpStream closes the socket on drop.
169impl Drop for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400170 fn drop(&mut self) {
171 if let Err(e) = self.close() {
Allen George0e22c362017-01-30 07:15:00 -0500172 warn!("error while closing socket: {:?}", e)
Allen George8b96bfb2016-11-02 08:01:08 -0400173 }
174 }
175}