| Allen George | 8b96bfb | 2016-11-02 08:01:08 -0400 | [diff] [blame] | 1 | // Licensed to the Apache Software Foundation (ASF) under one | 
|  | 2 | // or more contributor license agreements. See the NOTICE file | 
|  | 3 | // distributed with this work for additional information | 
|  | 4 | // regarding copyright ownership. The ASF licenses this file | 
|  | 5 | // to you under the Apache License, Version 2.0 (the | 
|  | 6 | // "License"); you may not use this file except in compliance | 
|  | 7 | // with the License. You may obtain a copy of the License at | 
|  | 8 | // | 
|  | 9 | //   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | 10 | // | 
|  | 11 | // Unless required by applicable law or agreed to in writing, | 
|  | 12 | // software distributed under the License is distributed on an | 
|  | 13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | 14 | // KIND, either express or implied. See the License for the | 
|  | 15 | // specific language governing permissions and limitations | 
|  | 16 | // under the License. | 
|  | 17 |  | 
|  | 18 | use std::convert::From; | 
|  | 19 | use std::io; | 
|  | 20 | use std::io::{ErrorKind, Read, Write}; | 
|  | 21 | use std::net::{Shutdown, TcpStream}; | 
|  | 22 | use std::ops::Drop; | 
|  | 23 |  | 
|  | 24 | use ::{TransportError, TransportErrorKind}; | 
|  | 25 |  | 
|  | 26 | /// Communicate with a Thrift service over a TCP socket. | 
|  | 27 | /// | 
|  | 28 | /// # Examples | 
|  | 29 | /// | 
|  | 30 | /// Create a `TTcpTransport`. | 
|  | 31 | /// | 
|  | 32 | /// ```no_run | 
|  | 33 | /// use std::io::{Read, Write}; | 
|  | 34 | /// use thrift::transport::TTcpTransport; | 
|  | 35 | /// | 
|  | 36 | /// let mut t = TTcpTransport::new(); | 
|  | 37 | /// t.open("localhost:9090").unwrap(); | 
|  | 38 | /// | 
|  | 39 | /// let mut buf = vec![0u8; 4]; | 
|  | 40 | /// t.read(&mut buf).unwrap(); | 
|  | 41 | /// t.write(&vec![0, 1, 2]).unwrap(); | 
|  | 42 | /// ``` | 
|  | 43 | /// | 
|  | 44 | /// Create a `TTcpTransport` by wrapping an existing `TcpStream`. | 
|  | 45 | /// | 
|  | 46 | /// ```no_run | 
|  | 47 | /// use std::io::{Read, Write}; | 
|  | 48 | /// use std::net::TcpStream; | 
|  | 49 | /// use thrift::transport::TTcpTransport; | 
|  | 50 | /// | 
|  | 51 | /// let stream = TcpStream::connect("127.0.0.1:9189").unwrap(); | 
|  | 52 | /// let mut t = TTcpTransport::with_stream(stream); | 
|  | 53 | /// | 
|  | 54 | /// // no need to call t.open() since we've already connected above | 
|  | 55 | /// | 
|  | 56 | /// let mut buf = vec![0u8; 4]; | 
|  | 57 | /// t.read(&mut buf).unwrap(); | 
|  | 58 | /// t.write(&vec![0, 1, 2]).unwrap(); | 
|  | 59 | /// ``` | 
|  | 60 | #[derive(Default)] | 
|  | 61 | pub struct TTcpTransport { | 
|  | 62 | stream: Option<TcpStream>, | 
|  | 63 | } | 
|  | 64 |  | 
|  | 65 | impl TTcpTransport { | 
|  | 66 | /// Create an uninitialized `TTcpTransport`. | 
|  | 67 | /// | 
|  | 68 | /// The returned instance must be opened using `TTcpTransport::open(...)` | 
|  | 69 | /// before it can be used. | 
|  | 70 | pub fn new() -> TTcpTransport { | 
|  | 71 | TTcpTransport { stream: None } | 
|  | 72 | } | 
|  | 73 |  | 
|  | 74 | /// Create a `TTcpTransport` that wraps an existing `TcpStream`. | 
|  | 75 | /// | 
|  | 76 | /// The passed-in stream is assumed to have been opened before being wrapped | 
|  | 77 | /// by the created `TTcpTransport` instance. | 
|  | 78 | pub fn with_stream(stream: TcpStream) -> TTcpTransport { | 
|  | 79 | TTcpTransport { stream: Some(stream) } | 
|  | 80 | } | 
|  | 81 |  | 
|  | 82 | /// Connect to `remote_address`, which should have the form `host:port`. | 
|  | 83 | pub fn open(&mut self, remote_address: &str) -> ::Result<()> { | 
|  | 84 | if self.stream.is_some() { | 
|  | 85 | Err(::Error::Transport(TransportError::new(TransportErrorKind::AlreadyOpen, | 
|  | 86 | "transport previously opened"))) | 
|  | 87 | } else { | 
|  | 88 | match TcpStream::connect(&remote_address) { | 
|  | 89 | Ok(s) => { | 
|  | 90 | self.stream = Some(s); | 
|  | 91 | Ok(()) | 
|  | 92 | } | 
|  | 93 | Err(e) => Err(From::from(e)), | 
|  | 94 | } | 
|  | 95 | } | 
|  | 96 | } | 
|  | 97 |  | 
|  | 98 | /// Shutdown this transport. | 
|  | 99 | /// | 
|  | 100 | /// Both send and receive halves are closed, and this instance can no | 
|  | 101 | /// longer be used to communicate with another endpoint. | 
|  | 102 | pub fn close(&mut self) -> ::Result<()> { | 
|  | 103 | self.if_set(|s| s.shutdown(Shutdown::Both)).map_err(From::from) | 
|  | 104 | } | 
|  | 105 |  | 
|  | 106 | fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T> | 
|  | 107 | where F: FnMut(&mut TcpStream) -> io::Result<T> | 
|  | 108 | { | 
|  | 109 |  | 
|  | 110 | if let Some(ref mut s) = self.stream { | 
|  | 111 | stream_operation(s) | 
|  | 112 | } else { | 
|  | 113 | Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected")) | 
|  | 114 | } | 
|  | 115 | } | 
|  | 116 | } | 
|  | 117 |  | 
|  | 118 | impl Read for TTcpTransport { | 
|  | 119 | fn read(&mut self, b: &mut [u8]) -> io::Result<usize> { | 
|  | 120 | self.if_set(|s| s.read(b)) | 
|  | 121 | } | 
|  | 122 | } | 
|  | 123 |  | 
|  | 124 | impl Write for TTcpTransport { | 
|  | 125 | fn write(&mut self, b: &[u8]) -> io::Result<usize> { | 
|  | 126 | self.if_set(|s| s.write(b)) | 
|  | 127 | } | 
|  | 128 |  | 
|  | 129 | fn flush(&mut self) -> io::Result<()> { | 
|  | 130 | self.if_set(|s| s.flush()) | 
|  | 131 | } | 
|  | 132 | } | 
|  | 133 |  | 
|  | 134 | // Do I have to implement the Drop trait? TcpStream closes the socket on drop. | 
|  | 135 | impl Drop for TTcpTransport { | 
|  | 136 | fn drop(&mut self) { | 
|  | 137 | if let Err(e) = self.close() { | 
|  | 138 | warn!("error while closing socket transport: {:?}", e) | 
|  | 139 | } | 
|  | 140 | } | 
|  | 141 | } |