blob: 9f2b8ba31e71c24027d19bbec484574a2c3a439c [file] [log] [blame]
Allen George8b96bfb2016-11-02 08:01:08 -04001// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::convert::From;
19use std::io;
20use std::io::{ErrorKind, Read, Write};
21use std::net::{Shutdown, TcpStream};
22use std::ops::Drop;
23
24use ::{TransportError, TransportErrorKind};
25
26/// Communicate with a Thrift service over a TCP socket.
27///
28/// # Examples
29///
30/// Create a `TTcpTransport`.
31///
32/// ```no_run
33/// use std::io::{Read, Write};
34/// use thrift::transport::TTcpTransport;
35///
36/// let mut t = TTcpTransport::new();
37/// t.open("localhost:9090").unwrap();
38///
39/// let mut buf = vec![0u8; 4];
40/// t.read(&mut buf).unwrap();
41/// t.write(&vec![0, 1, 2]).unwrap();
42/// ```
43///
44/// Create a `TTcpTransport` by wrapping an existing `TcpStream`.
45///
46/// ```no_run
47/// use std::io::{Read, Write};
48/// use std::net::TcpStream;
49/// use thrift::transport::TTcpTransport;
50///
51/// let stream = TcpStream::connect("127.0.0.1:9189").unwrap();
52/// let mut t = TTcpTransport::with_stream(stream);
53///
54/// // no need to call t.open() since we've already connected above
55///
56/// let mut buf = vec![0u8; 4];
57/// t.read(&mut buf).unwrap();
58/// t.write(&vec![0, 1, 2]).unwrap();
59/// ```
60#[derive(Default)]
61pub struct TTcpTransport {
62 stream: Option<TcpStream>,
63}
64
65impl TTcpTransport {
66 /// Create an uninitialized `TTcpTransport`.
67 ///
68 /// The returned instance must be opened using `TTcpTransport::open(...)`
69 /// before it can be used.
70 pub fn new() -> TTcpTransport {
71 TTcpTransport { stream: None }
72 }
73
74 /// Create a `TTcpTransport` that wraps an existing `TcpStream`.
75 ///
76 /// The passed-in stream is assumed to have been opened before being wrapped
77 /// by the created `TTcpTransport` instance.
78 pub fn with_stream(stream: TcpStream) -> TTcpTransport {
79 TTcpTransport { stream: Some(stream) }
80 }
81
82 /// Connect to `remote_address`, which should have the form `host:port`.
83 pub fn open(&mut self, remote_address: &str) -> ::Result<()> {
84 if self.stream.is_some() {
85 Err(::Error::Transport(TransportError::new(TransportErrorKind::AlreadyOpen,
86 "transport previously opened")))
87 } else {
88 match TcpStream::connect(&remote_address) {
89 Ok(s) => {
90 self.stream = Some(s);
91 Ok(())
92 }
93 Err(e) => Err(From::from(e)),
94 }
95 }
96 }
97
98 /// Shutdown this transport.
99 ///
100 /// Both send and receive halves are closed, and this instance can no
101 /// longer be used to communicate with another endpoint.
102 pub fn close(&mut self) -> ::Result<()> {
103 self.if_set(|s| s.shutdown(Shutdown::Both)).map_err(From::from)
104 }
105
106 fn if_set<F, T>(&mut self, mut stream_operation: F) -> io::Result<T>
107 where F: FnMut(&mut TcpStream) -> io::Result<T>
108 {
109
110 if let Some(ref mut s) = self.stream {
111 stream_operation(s)
112 } else {
113 Err(io::Error::new(ErrorKind::NotConnected, "tcp endpoint not connected"))
114 }
115 }
116}
117
118impl Read for TTcpTransport {
119 fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
120 self.if_set(|s| s.read(b))
121 }
122}
123
124impl Write for TTcpTransport {
125 fn write(&mut self, b: &[u8]) -> io::Result<usize> {
126 self.if_set(|s| s.write(b))
127 }
128
129 fn flush(&mut self) -> io::Result<()> {
130 self.if_set(|s| s.flush())
131 }
132}
133
134// Do I have to implement the Drop trait? TcpStream closes the socket on drop.
135impl Drop for TTcpTransport {
136 fn drop(&mut self) {
137 if let Err(e) = self.close() {
138 warn!("error while closing socket transport: {:?}", e)
139 }
140 }
141}