Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 1 | // Licensed to the Apache Software Foundation (ASF) under one |
| 2 | // or more contributor license agreements. See the NOTICE file |
| 3 | // distributed with this work for additional information |
| 4 | // regarding copyright ownership. The ASF licenses this file |
| 5 | // to you under the Apache License, Version 2.0 (the |
| 6 | // "License"); you may not use this file except in compliance |
| 7 | // with the License. You may obtain a copy of the License at |
| 8 | // |
| 9 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | // |
| 11 | // Unless required by applicable law or agreed to in writing, |
| 12 | // software distributed under the License is distributed on an |
| 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | // KIND, either express or implied. See the License for the |
| 15 | // specific language governing permissions and limitations |
| 16 | // under the License. |
| 17 | |
| 18 | use std::convert::From; |
| 19 | use std::io; |
| 20 | use std::io::{ErrorKind, Read, Write}; |
Marcin Pajkowski | 98ce2c8 | 2019-11-05 00:20:15 +0100 | [diff] [blame^] | 21 | use std::net::{Shutdown, TcpStream, ToSocketAddrs}; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 22 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 23 | use super::{ReadHalf, TIoChannel, WriteHalf}; |
Allen George | ef7a189 | 2018-12-16 18:01:37 -0500 | [diff] [blame] | 24 | use {new_transport_error, TransportErrorKind}; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 25 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 26 | /// Bidirectional TCP/IP channel. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 27 | /// |
| 28 | /// # Examples |
| 29 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 30 | /// Create a `TTcpChannel`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 31 | /// |
| 32 | /// ```no_run |
| 33 | /// use std::io::{Read, Write}; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 34 | /// use thrift::transport::TTcpChannel; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 35 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 36 | /// let mut c = TTcpChannel::new(); |
| 37 | /// c.open("localhost:9090").unwrap(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 38 | /// |
| 39 | /// let mut buf = vec![0u8; 4]; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 40 | /// c.read(&mut buf).unwrap(); |
| 41 | /// c.write(&vec![0, 1, 2]).unwrap(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 42 | /// ``` |
| 43 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 44 | /// Create a `TTcpChannel` by wrapping an existing `TcpStream`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 45 | /// |
| 46 | /// ```no_run |
| 47 | /// use std::io::{Read, Write}; |
| 48 | /// use std::net::TcpStream; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 49 | /// use thrift::transport::TTcpChannel; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 50 | /// |
| 51 | /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 52 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 53 | /// // no need to call c.open() since we've already connected above |
| 54 | /// let mut c = TTcpChannel::with_stream(stream); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 55 | /// |
| 56 | /// let mut buf = vec![0u8; 4]; |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 57 | /// c.read(&mut buf).unwrap(); |
| 58 | /// c.write(&vec![0, 1, 2]).unwrap(); |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 59 | /// ``` |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 60 | #[derive(Debug, Default)] |
| 61 | pub struct TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 62 | stream: Option<TcpStream>, |
| 63 | } |
| 64 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 65 | impl TTcpChannel { |
| 66 | /// Create an uninitialized `TTcpChannel`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 67 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 68 | /// The returned instance must be opened using `TTcpChannel::open(...)` |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 69 | /// before it can be used. |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 70 | pub fn new() -> TTcpChannel { |
| 71 | TTcpChannel { stream: None } |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 72 | } |
| 73 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 74 | /// Create a `TTcpChannel` that wraps an existing `TcpStream`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 75 | /// |
| 76 | /// The passed-in stream is assumed to have been opened before being wrapped |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 77 | /// by the created `TTcpChannel` instance. |
| 78 | pub fn with_stream(stream: TcpStream) -> TTcpChannel { |
Allen George | ef7a189 | 2018-12-16 18:01:37 -0500 | [diff] [blame] | 79 | TTcpChannel { |
| 80 | stream: Some(stream), |
| 81 | } |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 82 | } |
| 83 | |
Marcin Pajkowski | 98ce2c8 | 2019-11-05 00:20:15 +0100 | [diff] [blame^] | 84 | /// Connect to `remote_address`, which should implement `ToSocketAddrs` trait. |
| 85 | pub fn open<A: ToSocketAddrs>(&mut self, remote_address: A) -> ::Result<()> { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 86 | if self.stream.is_some() { |
Allen George | ef7a189 | 2018-12-16 18:01:37 -0500 | [diff] [blame] | 87 | Err(new_transport_error( |
| 88 | TransportErrorKind::AlreadyOpen, |
| 89 | "tcp connection previously opened", |
| 90 | )) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 91 | } else { |
| 92 | match TcpStream::connect(&remote_address) { |
| 93 | Ok(s) => { |
| 94 | self.stream = Some(s); |
| 95 | Ok(()) |
| 96 | } |
| 97 | Err(e) => Err(From::from(e)), |
| 98 | } |
| 99 | } |
| 100 | } |
| 101 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 102 | /// Shut down this channel. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 103 | /// |
| 104 | /// Both send and receive halves are closed, and this instance can no |
| 105 | /// longer be used to communicate with another endpoint. |
| 106 | pub fn close(&mut self) -> ::Result<()> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 107 | self.if_set(|s| s.shutdown(Shutdown::Both)) |
| 108 | .map_err(From::from) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 109 | } |
| 110 | |
| 111 | fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T> |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 112 | where |
| 113 | F: FnMut(&mut TcpStream) -> io::Result<T>, |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 114 | { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 115 | if let Some(ref mut s) = self.stream { |
| 116 | stream_operation(s) |
| 117 | } else { |
Allen George | ef7a189 | 2018-12-16 18:01:37 -0500 | [diff] [blame] | 118 | Err(io::Error::new( |
| 119 | ErrorKind::NotConnected, |
| 120 | "tcp endpoint not connected", |
| 121 | )) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 122 | } |
| 123 | } |
| 124 | } |
| 125 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 126 | impl TIoChannel for TTcpChannel { |
| 127 | fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)> |
| 128 | where |
| 129 | Self: Sized, |
| 130 | { |
| 131 | let mut s = self; |
| 132 | |
| 133 | s.stream |
| 134 | .as_mut() |
| 135 | .and_then(|s| s.try_clone().ok()) |
Allen George | ef7a189 | 2018-12-16 18:01:37 -0500 | [diff] [blame] | 136 | .map(|cloned| { |
| 137 | let read_half = ReadHalf::new(TTcpChannel { |
| 138 | stream: s.stream.take(), |
| 139 | }); |
| 140 | let write_half = WriteHalf::new(TTcpChannel { |
| 141 | stream: Some(cloned), |
| 142 | }); |
| 143 | (read_half, write_half) |
| 144 | }) |
| 145 | .ok_or_else(|| { |
| 146 | new_transport_error( |
| 147 | TransportErrorKind::Unknown, |
| 148 | "cannot clone underlying tcp stream", |
| 149 | ) |
| 150 | }) |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 151 | } |
| 152 | } |
| 153 | |
| 154 | impl Read for TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 155 | fn read(&mut self, b: &mut [u8]) -> io::Result<usize> { |
| 156 | self.if_set(|s| s.read(b)) |
| 157 | } |
| 158 | } |
| 159 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 160 | impl Write for TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 161 | fn write(&mut self, b: &[u8]) -> io::Result<usize> { |
Allen George | cf7ba4c | 2017-12-11 11:44:11 -0500 | [diff] [blame] | 162 | self.if_set(|s| s.write(b)) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 163 | } |
| 164 | |
| 165 | fn flush(&mut self) -> io::Result<()> { |
| 166 | self.if_set(|s| s.flush()) |
| 167 | } |
| 168 | } |