Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | use std::convert::From; |
| 19 | use std::io; |
| 20 | use std::io::{ErrorKind, Read, Write}; |
| 21 | use std::net::{Shutdown, TcpStream}; |
| 22 | use std::ops::Drop; |
| 23 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 24 | use {TransportErrorKind, new_transport_error}; |
| 25 | use super::{ReadHalf, TIoChannel, WriteHalf}; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 26 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 27 | /// Bidirectional TCP/IP channel. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 28 | /// |
| 29 | /// # Examples |
| 30 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 31 | /// Create a `TTcpChannel`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 32 | /// |
| 33 | /// ```no_run |
| 34 | /// use std::io::{Read, Write}; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 35 | /// use thrift::transport::TTcpChannel; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 36 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 37 | /// let mut c = TTcpChannel::new(); |
| 38 | /// c.open("localhost:9090").unwrap(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 39 | /// |
| 40 | /// let mut buf = vec![0u8; 4]; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 41 | /// c.read(&mut buf).unwrap(); |
| 42 | /// c.write(&vec![0, 1, 2]).unwrap(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 43 | /// ``` |
| 44 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 45 | /// Create a `TTcpChannel` by wrapping an existing `TcpStream`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 46 | /// |
| 47 | /// ```no_run |
| 48 | /// use std::io::{Read, Write}; |
| 49 | /// use std::net::TcpStream; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 50 | /// use thrift::transport::TTcpChannel; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 51 | /// |
| 52 | /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 53 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 54 | /// // no need to call c.open() since we've already connected above |
| 55 | /// let mut c = TTcpChannel::with_stream(stream); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 56 | /// |
| 57 | /// let mut buf = vec![0u8; 4]; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 58 | /// c.read(&mut buf).unwrap(); |
| 59 | /// c.write(&vec![0, 1, 2]).unwrap(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 60 | /// ``` |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 61 | #[derive(Debug, Default)] |
| 62 | pub struct TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 63 | stream: Option<TcpStream>, |
| 64 | } |
| 65 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 66 | impl TTcpChannel { |
| 67 | /// Create an uninitialized `TTcpChannel`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 68 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 69 | /// The returned instance must be opened using `TTcpChannel::open(...)` |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 70 | /// before it can be used. |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 71 | pub fn new() -> TTcpChannel { |
| 72 | TTcpChannel { stream: None } |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 73 | } |
| 74 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 75 | /// Create a `TTcpChannel` that wraps an existing `TcpStream`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 76 | /// |
| 77 | /// The passed-in stream is assumed to have been opened before being wrapped |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 78 | /// by the created `TTcpChannel` instance. |
| 79 | pub fn with_stream(stream: TcpStream) -> TTcpChannel { |
| 80 | TTcpChannel { stream: Some(stream) } |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 81 | } |
| 82 | |
| 83 | /// Connect to `remote_address`, which should have the form `host:port`. |
| 84 | pub fn open(&mut self, remote_address: &str) -> ::Result<()> { |
| 85 | if self.stream.is_some() { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 86 | Err( |
| 87 | new_transport_error( |
| 88 | TransportErrorKind::AlreadyOpen, |
| 89 | "tcp connection previously opened", |
| 90 | ), |
| 91 | ) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 92 | } else { |
| 93 | match TcpStream::connect(&remote_address) { |
| 94 | Ok(s) => { |
| 95 | self.stream = Some(s); |
| 96 | Ok(()) |
| 97 | } |
| 98 | Err(e) => Err(From::from(e)), |
| 99 | } |
| 100 | } |
| 101 | } |
| 102 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 103 | /// Shut down this channel. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 104 | /// |
| 105 | /// Both send and receive halves are closed, and this instance can no |
| 106 | /// longer be used to communicate with another endpoint. |
| 107 | pub fn close(&mut self) -> ::Result<()> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 108 | self.if_set(|s| s.shutdown(Shutdown::Both)) |
| 109 | .map_err(From::from) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 110 | } |
| 111 | |
| 112 | fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T> |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 113 | where |
| 114 | F: FnMut(&mut TcpStream) -> io::Result<T>, |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 115 | { |
| 116 | |
| 117 | if let Some(ref mut s) = self.stream { |
| 118 | stream_operation(s) |
| 119 | } else { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 120 | Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 121 | } |
| 122 | } |
| 123 | } |
| 124 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 125 | impl TIoChannel for TTcpChannel { |
| 126 | fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)> |
| 127 | where |
| 128 | Self: Sized, |
| 129 | { |
| 130 | let mut s = self; |
| 131 | |
| 132 | s.stream |
| 133 | .as_mut() |
| 134 | .and_then(|s| s.try_clone().ok()) |
| 135 | .map( |
| 136 | |cloned| { |
| 137 | (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } }, |
| 138 | WriteHalf { handle: TTcpChannel { stream: Some(cloned) } }) |
| 139 | }, |
| 140 | ) |
| 141 | .ok_or_else( |
| 142 | || { |
| 143 | new_transport_error( |
| 144 | TransportErrorKind::Unknown, |
| 145 | "cannot clone underlying tcp stream", |
| 146 | ) |
| 147 | }, |
| 148 | ) |
| 149 | } |
| 150 | } |
| 151 | |
| 152 | impl Read for TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 153 | fn read(&mut self, b: &mut [u8]) -> io::Result<usize> { |
| 154 | self.if_set(|s| s.read(b)) |
| 155 | } |
| 156 | } |
| 157 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 158 | impl Write for TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 159 | fn write(&mut self, b: &[u8]) -> io::Result<usize> { |
| 160 | self.if_set(|s| s.write(b)) |
| 161 | } |
| 162 | |
| 163 | fn flush(&mut self) -> io::Result<()> { |
| 164 | self.if_set(|s| s.flush()) |
| 165 | } |
| 166 | } |
| 167 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 168 | // FIXME: Do I have to implement the Drop trait? TcpStream closes the socket on drop. |
| 169 | impl Drop for TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 170 | fn drop(&mut self) { |
| 171 | if let Err(e) = self.close() { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame^] | 172 | warn!("error while closing socket: {:?}", e) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 173 | } |
| 174 | } |
| 175 | } |