blob: a6f780ac8c895eb4c8120e1961436283f3634970 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::convert::From;
19use std::io;
20use std::io::{ErrorKind, Read, Write};
21use std::net::{Shutdown, TcpStream};
Allen George8b96bfb2016-11-02 08:01:08 -040022
Allen George0e22c362017-01-30 07:15:00 -050023use {TransportErrorKind, new_transport_error};
24use super::{ReadHalf, TIoChannel, WriteHalf};
Allen George8b96bfb2016-11-02 08:01:08 -040025
/// Bidirectional TCP/IP channel.
///
/// The channel either holds a `TcpStream` or holds nothing at all; I/O on a
/// channel that holds nothing fails with `ErrorKind::NotConnected`.
///
/// # Examples
///
/// Create a `TTcpChannel` and connect it.
///
/// ```no_run
/// use std::io::{Read, Write};
/// use thrift::transport::TTcpChannel;
///
/// let mut c = TTcpChannel::new();
/// c.open("localhost:9090").unwrap();
///
/// let mut buf = vec![0u8; 4];
/// c.read(&mut buf).unwrap();
/// c.write(&[0, 1, 2]).unwrap();
/// ```
///
/// Create a `TTcpChannel` by wrapping an existing `TcpStream`.
///
/// ```no_run
/// use std::io::{Read, Write};
/// use std::net::TcpStream;
/// use thrift::transport::TTcpChannel;
///
/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
///
/// // no need to call c.open() since we've already connected above
/// let mut c = TTcpChannel::with_stream(stream);
///
/// let mut buf = vec![0u8; 4];
/// c.read(&mut buf).unwrap();
/// c.write(&[0, 1, 2]).unwrap();
/// ```
#[derive(Debug, Default)]
pub struct TTcpChannel {
    /// Underlying socket; `None` until the channel is opened or handed a
    /// stream at construction time.
    stream: Option<TcpStream>,
}
64
Allen George0e22c362017-01-30 07:15:00 -050065impl TTcpChannel {
66 /// Create an uninitialized `TTcpChannel`.
Allen George8b96bfb2016-11-02 08:01:08 -040067 ///
Allen George0e22c362017-01-30 07:15:00 -050068 /// The returned instance must be opened using `TTcpChannel::open(...)`
Allen George8b96bfb2016-11-02 08:01:08 -040069 /// before it can be used.
Allen George0e22c362017-01-30 07:15:00 -050070 pub fn new() -> TTcpChannel {
71 TTcpChannel { stream: None }
Allen George8b96bfb2016-11-02 08:01:08 -040072 }
73
Allen George0e22c362017-01-30 07:15:00 -050074 /// Create a `TTcpChannel` that wraps an existing `TcpStream`.
Allen George8b96bfb2016-11-02 08:01:08 -040075 ///
76 /// The passed-in stream is assumed to have been opened before being wrapped
Allen George0e22c362017-01-30 07:15:00 -050077 /// by the created `TTcpChannel` instance.
78 pub fn with_stream(stream: TcpStream) -> TTcpChannel {
79 TTcpChannel { stream: Some(stream) }
Allen George8b96bfb2016-11-02 08:01:08 -040080 }
81
82 /// Connect to `remote_address`, which should have the form `host:port`.
83 pub fn open(&mut self, remote_address: &str) -> ::Result<()> {
84 if self.stream.is_some() {
Allen George0e22c362017-01-30 07:15:00 -050085 Err(
86 new_transport_error(
87 TransportErrorKind::AlreadyOpen,
88 "tcp connection previously opened",
89 ),
90 )
Allen George8b96bfb2016-11-02 08:01:08 -040091 } else {
92 match TcpStream::connect(&remote_address) {
93 Ok(s) => {
94 self.stream = Some(s);
95 Ok(())
96 }
97 Err(e) => Err(From::from(e)),
98 }
99 }
100 }
101
Allen George0e22c362017-01-30 07:15:00 -0500102 /// Shut down this channel.
Allen George8b96bfb2016-11-02 08:01:08 -0400103 ///
104 /// Both send and receive halves are closed, and this instance can no
105 /// longer be used to communicate with another endpoint.
106 pub fn close(&mut self) -> ::Result<()> {
Allen George0e22c362017-01-30 07:15:00 -0500107 self.if_set(|s| s.shutdown(Shutdown::Both))
108 .map_err(From::from)
Allen George8b96bfb2016-11-02 08:01:08 -0400109 }
110
111 fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
Allen George0e22c362017-01-30 07:15:00 -0500112 where
113 F: FnMut(&mut TcpStream) -> io::Result<T>,
Allen George8b96bfb2016-11-02 08:01:08 -0400114 {
115
116 if let Some(ref mut s) = self.stream {
117 stream_operation(s)
118 } else {
Allen George0e22c362017-01-30 07:15:00 -0500119 Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),)
Allen George8b96bfb2016-11-02 08:01:08 -0400120 }
121 }
122}
123
Allen George0e22c362017-01-30 07:15:00 -0500124impl TIoChannel for TTcpChannel {
125 fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)>
126 where
127 Self: Sized,
128 {
129 let mut s = self;
130
131 s.stream
132 .as_mut()
133 .and_then(|s| s.try_clone().ok())
134 .map(
135 |cloned| {
136 (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } },
137 WriteHalf { handle: TTcpChannel { stream: Some(cloned) } })
138 },
139 )
140 .ok_or_else(
141 || {
142 new_transport_error(
143 TransportErrorKind::Unknown,
144 "cannot clone underlying tcp stream",
145 )
146 },
147 )
148 }
149}
150
151impl Read for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400152 fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
153 self.if_set(|s| s.read(b))
154 }
155}
156
Allen George0e22c362017-01-30 07:15:00 -0500157impl Write for TTcpChannel {
Allen George8b96bfb2016-11-02 08:01:08 -0400158 fn write(&mut self, b: &[u8]) -> io::Result<usize> {
Allen Georgecf7ba4c2017-12-11 11:44:11 -0500159 self.if_set(|s| s.write(b))
Allen George8b96bfb2016-11-02 08:01:08 -0400160 }
161
162 fn flush(&mut self) -> io::Result<()> {
163 self.if_set(|s| s.flush())
164 }
165}