// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
| 17 | |
| 18 | use std::convert::From; |
| 19 | use std::io; |
| 20 | use std::io::{ErrorKind, Read, Write}; |
| 21 | use std::net::{Shutdown, TcpStream}; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 22 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 23 | use {TransportErrorKind, new_transport_error}; |
| 24 | use super::{ReadHalf, TIoChannel, WriteHalf}; |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 25 | |
/// A TCP/IP channel that can be both read from and written to.
///
/// The channel holds an optional `TcpStream`. While no stream is held the
/// channel counts as unopened, and I/O attempted through it fails with an
/// `io::ErrorKind::NotConnected` error.
///
/// # Examples
///
/// Build an unopened `TTcpChannel` and connect it afterwards.
///
/// ```no_run
/// use std::io::{Read, Write};
/// use thrift::transport::TTcpChannel;
///
/// let mut c = TTcpChannel::new();
/// c.open("localhost:9090").unwrap();
///
/// let mut buf = vec![0u8; 4];
/// c.read(&mut buf).unwrap();
/// c.write(&vec![0, 1, 2]).unwrap();
/// ```
///
/// Build a `TTcpChannel` around a `TcpStream` that is already connected.
///
/// ```no_run
/// use std::io::{Read, Write};
/// use std::net::TcpStream;
/// use thrift::transport::TTcpChannel;
///
/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
///
/// // no need to call c.open() since we've already connected above
/// let mut c = TTcpChannel::with_stream(stream);
///
/// let mut buf = vec![0u8; 4];
/// c.read(&mut buf).unwrap();
/// c.write(&vec![0, 1, 2]).unwrap();
/// ```
#[derive(Debug, Default)]
pub struct TTcpChannel {
    // `None` until a stream is supplied via `open` or `with_stream`.
    stream: Option<TcpStream>,
}
| 64 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 65 | impl TTcpChannel { |
| 66 | /// Create an uninitialized `TTcpChannel`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 67 | /// |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 68 | /// The returned instance must be opened using `TTcpChannel::open(...)` |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 69 | /// before it can be used. |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 70 | pub fn new() -> TTcpChannel { |
| 71 | TTcpChannel { stream: None } |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 72 | } |
| 73 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 74 | /// Create a `TTcpChannel` that wraps an existing `TcpStream`. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 75 | /// |
| 76 | /// The passed-in stream is assumed to have been opened before being wrapped |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 77 | /// by the created `TTcpChannel` instance. |
| 78 | pub fn with_stream(stream: TcpStream) -> TTcpChannel { |
| 79 | TTcpChannel { stream: Some(stream) } |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 80 | } |
| 81 | |
| 82 | /// Connect to `remote_address`, which should have the form `host:port`. |
| 83 | pub fn open(&mut self, remote_address: &str) -> ::Result<()> { |
| 84 | if self.stream.is_some() { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 85 | Err( |
| 86 | new_transport_error( |
| 87 | TransportErrorKind::AlreadyOpen, |
| 88 | "tcp connection previously opened", |
| 89 | ), |
| 90 | ) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 91 | } else { |
| 92 | match TcpStream::connect(&remote_address) { |
| 93 | Ok(s) => { |
| 94 | self.stream = Some(s); |
| 95 | Ok(()) |
| 96 | } |
| 97 | Err(e) => Err(From::from(e)), |
| 98 | } |
| 99 | } |
| 100 | } |
| 101 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 102 | /// Shut down this channel. |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 103 | /// |
| 104 | /// Both send and receive halves are closed, and this instance can no |
| 105 | /// longer be used to communicate with another endpoint. |
| 106 | pub fn close(&mut self) -> ::Result<()> { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 107 | self.if_set(|s| s.shutdown(Shutdown::Both)) |
| 108 | .map_err(From::from) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 109 | } |
| 110 | |
| 111 | fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T> |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 112 | where |
| 113 | F: FnMut(&mut TcpStream) -> io::Result<T>, |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 114 | { |
| 115 | |
| 116 | if let Some(ref mut s) = self.stream { |
| 117 | stream_operation(s) |
| 118 | } else { |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 119 | Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"),) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 120 | } |
| 121 | } |
| 122 | } |
| 123 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 124 | impl TIoChannel for TTcpChannel { |
| 125 | fn split(self) -> ::Result<(ReadHalf<Self>, WriteHalf<Self>)> |
| 126 | where |
| 127 | Self: Sized, |
| 128 | { |
| 129 | let mut s = self; |
| 130 | |
| 131 | s.stream |
| 132 | .as_mut() |
| 133 | .and_then(|s| s.try_clone().ok()) |
| 134 | .map( |
| 135 | |cloned| { |
| 136 | (ReadHalf { handle: TTcpChannel { stream: s.stream.take() } }, |
| 137 | WriteHalf { handle: TTcpChannel { stream: Some(cloned) } }) |
| 138 | }, |
| 139 | ) |
| 140 | .ok_or_else( |
| 141 | || { |
| 142 | new_transport_error( |
| 143 | TransportErrorKind::Unknown, |
| 144 | "cannot clone underlying tcp stream", |
| 145 | ) |
| 146 | }, |
| 147 | ) |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | impl Read for TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 152 | fn read(&mut self, b: &mut [u8]) -> io::Result<usize> { |
| 153 | self.if_set(|s| s.read(b)) |
| 154 | } |
| 155 | } |
| 156 | |
Allen George | 0e22c36 | 2017-01-30 07:15:00 -0500 | [diff] [blame] | 157 | impl Write for TTcpChannel { |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 158 | fn write(&mut self, b: &[u8]) -> io::Result<usize> { |
Allen George | cf7ba4c | 2017-12-11 11:44:11 -0500 | [diff] [blame^] | 159 | self.if_set(|s| s.write(b)) |
Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 160 | } |
| 161 | |
| 162 | fn flush(&mut self) -> io::Result<()> { |
| 163 | self.if_set(|s| s.flush()) |
| 164 | } |
| 165 | } |